# Sync from External Sources

> Keep a HoneyHive dataset synced with S3, databases, or other external sources

If your dataset lives outside HoneyHive (S3, databases, internal tools), you can keep it in sync by storing a stable external ID in each datapoint's metadata.

## How it works

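1. Each source row has a unique ID that stays the same across runs (a database primary key, for example; a row number only works if rows are never inserted, removed, or reordered)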
2. Store that ID as `external_id` in the datapoint's `metadata`
3. On each sync run, compare source rows to existing datapoints by `external_id`
4. Create, update, or delete as needed

This preserves datapoint IDs across syncs, so experiment history stays linked.
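
The comparison in steps 3 and 4 comes down to set arithmetic on IDs. The sketch below does not use the HoneyHive SDK. `plan_sync` is a hypothetical helper, and `existing` stands in for a mapping from `external_id` to datapoint ID that you would build from the dataset's current datapoints.

```python
def plan_sync(source_ids: set[str], existing: dict[str, str]) -> dict[str, list[str]]:
    """Split external IDs into create, update, and delete buckets.

    source_ids: external IDs present in the source on this run.
    existing:   external_id -> datapoint ID for datapoints already in the dataset.
    """
    existing_ids = set(existing)
    return {
        "create": sorted(source_ids - existing_ids),  # in the source only
        "update": sorted(source_ids & existing_ids),  # in both
        "delete": sorted(existing_ids - source_ids),  # in the dataset only
    }


plan = plan_sync({"q1", "q2", "q4"}, {"q1": "dp_a", "q3": "dp_c"})
print(plan)
# {'create': ['q2', 'q4'], 'update': ['q1'], 'delete': ['q3']}
```

Only the `create` and `delete` buckets change which datapoints belong to the dataset. Rows in `update` keep their existing datapoint ID.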

<Note>
  We're building native dataset sync into the SDK to make this easier. In the meantime, the pattern below works well.
</Note>

## Sync script

Adapt the `inputs` and `ground_truth` mapping to match your data schema.

```python theme={null}
import os

from honeyhive import HoneyHive
from honeyhive.models import (
    CreateDatapointRequest,
    UpdateDatapointRequest,
    UpdateDatasetRequest,
)

client = HoneyHive(api_key=os.environ["HH_API_KEY"])
DATASET_NAME = os.environ["HH_DATASET_NAME"]

datasets = client.datasets.list(name=DATASET_NAME).datasets
if not datasets:
    raise ValueError(f"Dataset '{DATASET_NAME}' not found")

dataset = datasets[0]
dataset_id = dataset.id


def sync_dataset(rows: list[dict], id_field: str = "id"):
    """Sync source rows into a HoneyHive dataset by external_id."""

    # 1. Load existing datapoints and index by external_id
    existing = client.datapoints.list(dataset_name=DATASET_NAME)
    current_dataset_datapoints = [dp.id for dp in existing.datapoints]
    id_map: dict[str, str] = {}  # external_id -> HoneyHive datapoint id
    for dp in existing.datapoints:
        ext_id = (dp.metadata or {}).get("external_id")
        if ext_id:
            id_map[str(ext_id)] = dp.id

    # 2. Create or update each source row
    source_ids = set()
    created_ids: list[str] = []
    for row in rows:
        ext_id = str(row[id_field])
        source_ids.add(ext_id)

        # --- Adapt this mapping to your schema ---
        datapoint = {
            "inputs": {"question": row["question"]},
            "ground_truth": {"answer": row["answer"]},
            "metadata": {"external_id": ext_id},
        }

        if ext_id in id_map:
            client.datapoints.update(
                id_map[ext_id], UpdateDatapointRequest(**datapoint)
            )
        else:
            response = client.datapoints.create(CreateDatapointRequest(**datapoint))
            new_datapoint_id = response.result["insertedId"]
            created_ids.append(new_datapoint_id)
            id_map[ext_id] = new_datapoint_id

    # 3. Delete datapoints no longer in source
    stale_ids = set()
    for ext_id, dp_id in id_map.items():
        if ext_id not in source_ids:
            client.datapoints.delete(dp_id)
            stale_ids.add(dp_id)

    # 4. Update dataset membership once per sync run
    if created_ids or stale_ids:
        next_datapoints = [
            dp_id
            for dp_id in [*current_dataset_datapoints, *created_ids]
            if dp_id not in stale_ids
        ]
        client.datasets.update(
            UpdateDatasetRequest(
                dataset_id=dataset_id,
                datapoints=next_datapoints,
            )
        )


# Load rows from your source (S3, database, internal tool, etc.)
rows = [
    {"id": "q1", "question": "What is Python?", "answer": "A programming language"},
    {"id": "q2", "question": "What is tracing?", "answer": "Recording app execution"},
]

sync_dataset(rows)
```

## Skip unchanged rows with hashing

The script above updates every matched row, even if nothing changed. For large datasets, add a content hash to skip unnecessary API calls.

```python theme={null}
import hashlib, json

def content_hash(data: dict) -> str:
    """Deterministic hash for change detection."""
    return hashlib.sha256(
        json.dumps(data, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()
```

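When creating or updating, store the hash in metadata. To compare against the previous run, also record each datapoint's stored hash while indexing existing datapoints in step 1 (here as `hash_map`, keyed by `external_id`):

```python theme={null}
# In step 1, next to id_map:
#     hash_map[str(ext_id)] = (dp.metadata or {}).get("content_hash")

# In step 2, inside the loop over rows:
content = {
    "inputs": {"question": row["question"]},
    "ground_truth": {"answer": row["answer"]},
}
row_hash = content_hash(content)

# Skip the update if the stored hash matches
if ext_id in id_map and hash_map.get(ext_id) == row_hash:
    continue  # unchanged, skip

datapoint = {
    **content,
    "metadata": {"external_id": ext_id, "content_hash": row_hash},
}
```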
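
Put together, the decision for each row might look like the sketch below. It uses plain dictionaries rather than SDK objects so it can run on its own. `classify_row` and `hash_map` are hypothetical names, with `hash_map` mapping each `external_id` to the `content_hash` stored on the existing datapoint.

```python
import hashlib
import json


def content_hash(data: dict) -> str:
    """Deterministic hash for change detection (same helper as above)."""
    return hashlib.sha256(
        json.dumps(data, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()


def classify_row(ext_id: str, content: dict, hash_map: dict) -> str:
    """Return 'create', 'skip', or 'update' for one source row."""
    if ext_id not in hash_map:
        return "create"
    if hash_map[ext_id] == content_hash(content):
        return "skip"
    return "update"


unchanged = {"inputs": {"question": "What is Python?"}}
edited = {"inputs": {"question": "What is Python 3?"}}
hash_map = {"q1": content_hash(unchanged), "q2": None}  # q2 has no stored hash yet

print(classify_row("q1", unchanged, hash_map))  # skip
print(classify_row("q1", edited, hash_map))     # update
print(classify_row("q2", unchanged, hash_map))  # update
print(classify_row("q3", unchanged, hash_map))  # create
```

A datapoint with no stored hash is treated as changed, so the first hashed run updates it once and later runs can skip it.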

## Track dataset versions

You can keep a version manifest alongside your sync script to track what changed and when.

```json theme={null}
{
  "dataset_name": "my-dataset",
  "content_hash": "3f2d1a9b7c42",
  "row_count": 250,
  "synced_at": "2026-02-13T10:22:00Z",
  "source": "data/test_cases.json"
}
```

Update the manifest after each sync:

```python theme={null}
import datetime, hashlib, json

def dataset_hash(rows: list[dict], id_field: str = "id") -> str:
    """Deterministic hash over all rows."""
    return hashlib.sha256(
        json.dumps(sorted(rows, key=lambda r: str(r[id_field])),
                   sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()[:12]

manifest = {
    "dataset_name": DATASET_NAME,
    "content_hash": dataset_hash(rows),
    "row_count": len(rows),
    "synced_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    "source": "data/test_cases.json",
}

with open("dataset-version.json", "w") as f:
    json.dump(manifest, f, indent=2)
```
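
One use for the manifest is skipping the sync entirely when the source has not changed. A minimal sketch, assuming the manifest format above; `source_changed` is a hypothetical helper, and the temporary file path is only there to make the demo runnable.

```python
import hashlib
import json
import os
import tempfile


def dataset_hash(rows: list[dict], id_field: str = "id") -> str:
    """Deterministic hash over all rows (same helper as above)."""
    return hashlib.sha256(
        json.dumps(sorted(rows, key=lambda r: str(r[id_field])),
                   sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()[:12]


def source_changed(rows: list[dict], manifest_path: str) -> bool:
    """True if there is no manifest yet or its content_hash differs from the rows."""
    if not os.path.exists(manifest_path):
        return True
    with open(manifest_path) as f:
        previous = json.load(f)
    return previous.get("content_hash") != dataset_hash(rows)


rows = [{"id": "q1", "question": "What is Python?", "answer": "A programming language"}]
path = os.path.join(tempfile.mkdtemp(), "dataset-version.json")

print(source_changed(rows, path))  # True: no manifest yet
with open(path, "w") as f:
    json.dump({"content_hash": dataset_hash(rows)}, f)
print(source_changed(rows, path))  # False: hash matches
```

On a fresh CI runner the previous manifest is not on disk unless you commit it or restore it from a cache or artifact, so the check falls back to running the sync.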

<Tip>
  If you add a `version` field, a simple convention: **patch** (v1.0.1) for label fixes, **minor** (v1.1.0) for new rows with the same schema, **major** (v2.0.0) for schema changes.
</Tip>
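
A small helper can apply that convention. This is a sketch: `bump_version` and its change labels are hypothetical and not part of the HoneyHive SDK.

```python
def bump_version(version: str, change: str) -> str:
    """Bump a 'vMAJOR.MINOR.PATCH' string.

    change: 'label_fix' -> patch, 'new_rows' -> minor, 'schema_change' -> major.
    """
    major, minor, patch = (int(part) for part in version.lstrip("v").split("."))
    if change == "schema_change":
        return f"v{major + 1}.0.0"
    if change == "new_rows":
        return f"v{major}.{minor + 1}.0"
    if change == "label_fix":
        return f"v{major}.{minor}.{patch + 1}"
    raise ValueError(f"Unknown change type: {change!r}")


print(bump_version("v1.0.0", "label_fix"))      # v1.0.1
print(bump_version("v1.0.1", "new_rows"))       # v1.1.0
print(bump_version("v1.1.0", "schema_change"))  # v2.0.0
```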

## Automate with CI/CD

Run the sync whenever your dataset source changes. For example, if your test cases live in a JSON file in your repo, trigger the sync on push:

```yaml theme={null}
# .github/workflows/sync-dataset.yml
name: Sync dataset
on:
  push:
    paths:
      - "data/test_cases.json"  # trigger when source file changes
  workflow_dispatch:             # manual trigger

jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install honeyhive
      - run: python sync_dataset.py
        env:
          HH_API_KEY: ${{ secrets.HH_API_KEY }}
          HH_DATASET_NAME: my-dataset
```

For databases or S3, use a webhook or event trigger from your data pipeline instead.
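
For the JSON-file case, the entry point of `sync_dataset.py` has to load rows and can check them before anything is written. The sketch below assumes the file holds a JSON array of objects. `load_rows` is a hypothetical helper, and the temporary file stands in for `data/test_cases.json`.

```python
import json
import os
import tempfile


def load_rows(path: str, id_field: str = "id") -> list[dict]:
    """Load rows from a JSON array file and fail fast on missing or duplicate IDs."""
    with open(path) as f:
        rows = json.load(f)

    seen: set[str] = set()
    for i, row in enumerate(rows):
        if id_field not in row:
            raise ValueError(f"Row {i} has no '{id_field}' field")
        ext_id = str(row[id_field])  # compare as strings, like the sync script
        if ext_id in seen:
            raise ValueError(f"Duplicate {id_field}: {ext_id}")
        seen.add(ext_id)
    return rows


# Demo with a temporary file standing in for data/test_cases.json
path = os.path.join(tempfile.mkdtemp(), "test_cases.json")
with open(path, "w") as f:
    json.dump(
        [{"id": "q1", "question": "What is Python?", "answer": "A programming language"}],
        f,
    )

rows = load_rows(path)
print(len(rows))  # 1
```

Checking up front matters because, in the sync script above, a duplicate ID would let a later row silently overwrite an earlier one.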

## Related

<CardGroup cols={2}>
  <Card title="Run experiments" icon="flask" href="/v2/introduction/experiments-quickstart">
    Evaluate your AI against the synced dataset
  </Card>

  <Card title="Upload datasets" icon="upload" href="/v2/datasets/import">
    One-time upload via UI or SDK
  </Card>

  <Card title="Export datasets" icon="download" href="/v2/datasets/export">
    Export datapoints for external use
  </Card>
</CardGroup>
