If your dataset lives outside HoneyHive (S3, databases, internal tools), you can keep it in sync by storing a stable external ID in each datapoint’s metadata.

How it works

  1. Each source row has a unique ID (database primary key, row number, etc.)
  2. Store that ID as external_id in the datapoint’s metadata
  3. On each sync run, compare source rows to existing datapoints by external_id
  4. Create, update, or delete as needed

This preserves datapoint IDs across syncs, so experiment history stays linked.
We’re building native dataset sync into the SDK to make this easier. In the meantime, the pattern below works well.
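For example, a source row whose primary key is 42 maps to a datapoint shaped like this (the inputs and ground_truth keys simply mirror the example schema used in the script below):
# Sketch: the stable source ID lives in metadata.external_id;
# inputs and ground_truth carry the row's content
datapoint = {
    "inputs": {"question": "What is Python?"},
    "ground_truth": {"answer": "A programming language"},
    "metadata": {"external_id": "42"},  # source primary key, stored as a string
}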

Sync script

Adapt the inputs and ground_truth mapping to match your data schema.
import os

from honeyhive import HoneyHive
from honeyhive.models import (
    CreateDatapointRequest,
    UpdateDatapointRequest,
    UpdateDatasetRequest,
)

client = HoneyHive(api_key=os.environ["HH_API_KEY"])
DATASET_NAME = os.environ["HH_DATASET_NAME"]

datasets = client.datasets.list(name=DATASET_NAME).datasets
if not datasets:
    raise ValueError(f"Dataset '{DATASET_NAME}' not found")

dataset = datasets[0]
dataset_id = dataset.id


def sync_dataset(rows: list[dict], id_field: str = "id"):
    """Sync source rows into a HoneyHive dataset by external_id."""

    # 1. Load existing datapoints and index by external_id
    existing = client.datapoints.list(dataset_name=DATASET_NAME)
    current_dataset_datapoints = [dp.id for dp in existing.datapoints]
    id_map: dict[str, str] = {}  # external_id -> HoneyHive datapoint id
    for dp in existing.datapoints:
        ext_id = (dp.metadata or {}).get("external_id")
        if ext_id:
            id_map[str(ext_id)] = dp.id

    # 2. Create or update each source row
    source_ids = set()
    created_ids: list[str] = []
    for row in rows:
        ext_id = str(row[id_field])
        source_ids.add(ext_id)

        # --- Adapt this mapping to your schema ---
        datapoint = {
            "inputs": {"question": row["question"]},
            "ground_truth": {"answer": row["answer"]},
            "metadata": {"external_id": ext_id},
        }

        if ext_id in id_map:
            client.datapoints.update(
                id_map[ext_id], UpdateDatapointRequest(**datapoint)
            )
        else:
            response = client.datapoints.create(CreateDatapointRequest(**datapoint))
            new_datapoint_id = response.result["insertedId"]
            created_ids.append(new_datapoint_id)
            id_map[ext_id] = new_datapoint_id

    # 3. Delete datapoints no longer in source
    stale_ids = set()
    for ext_id, dp_id in id_map.items():
        if ext_id not in source_ids:
            client.datapoints.delete(dp_id)
            stale_ids.add(dp_id)

    # 4. Update dataset membership once per sync run
    if created_ids or stale_ids:
        next_datapoints = [
            dp_id
            for dp_id in [*current_dataset_datapoints, *created_ids]
            if dp_id not in stale_ids
        ]
        client.datasets.update(
            UpdateDatasetRequest(
                dataset_id=dataset_id,
                datapoints=next_datapoints,
            )
        )


# Load rows from your source (S3, database, internal tool, etc.)
rows = [
    {"id": "q1", "question": "What is Python?", "answer": "A programming language"},
    {"id": "q2", "question": "What is tracing?", "answer": "Recording app execution"},
]

sync_dataset(rows)

Skip unchanged rows with hashing

The script above updates every matched row, even if nothing changed. For large datasets, add a content hash to skip unnecessary API calls.
import hashlib, json

def content_hash(data: dict) -> str:
    """Deterministic hash for change detection."""
    return hashlib.sha256(
        json.dumps(data, sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()
When creating or updating, store the hash in metadata:
content = {
    "inputs": {"question": row["question"]},
    "ground_truth": {"answer": row["answer"]},
}
row_hash = content_hash(content)

datapoint = {
    **content,
    "metadata": {"external_id": ext_id, "content_hash": row_hash},
}

# Skip the update if the stored hash matches (hash_map is an
# external_id -> content_hash index built in step 1 alongside id_map)
existing_hash = hash_map.get(ext_id)
if ext_id in id_map and existing_hash == row_hash:
    continue  # unchanged, skip the update call
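The hash_map lookup above is an assumed addition to step 1 of sync_dataset; a minimal sketch of how you would build it while indexing the existing datapoints:
# Sketch (step 1 addition): index stored content hashes by external_id
hash_map: dict[str, str] = {}  # external_id -> content_hash
for dp in existing.datapoints:
    meta = dp.metadata or {}
    if meta.get("external_id"):
        hash_map[str(meta["external_id"])] = meta.get("content_hash", "")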

Track dataset versions

You can keep a version manifest alongside your sync script to track what changed and when.
{
  "dataset_name": "my-dataset",
  "content_hash": "3f2d1a9b7c42",
  "row_count": 250,
  "synced_at": "2026-02-13T10:22:00Z",
  "source": "data/test_cases.json"
}
Update the manifest after each sync:
import datetime, hashlib, json

def dataset_hash(rows: list[dict], id_field: str = "id") -> str:
    """Deterministic hash over all rows."""
    return hashlib.sha256(
        json.dumps(sorted(rows, key=lambda r: str(r[id_field])),
                   sort_keys=True, separators=(",", ":")).encode()
    ).hexdigest()[:12]

manifest = {
    "dataset_name": DATASET_NAME,
    "content_hash": dataset_hash(rows),
    "row_count": len(rows),
    "synced_at": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    "source": "data/test_cases.json",
}

with open("dataset-version.json", "w") as f:
    json.dump(manifest, f, indent=2)
If you add a version field, a simple convention is to bump the patch version (v1.0.1) for label fixes, the minor version (v1.1.0) for new rows with the same schema, and the major version (v2.0.0) for schema changes.
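A minimal sketch of that convention, assuming you pick the bump level yourself when reviewing the change (bump_version is a hypothetical helper, not part of the HoneyHive SDK):
# Sketch: bump a semantic version string per the convention above.
# level is "patch" (label fixes), "minor" (new rows, same schema),
# or "major" (schema changes).
def bump_version(version: str, level: str) -> str:
    major, minor, patch = (int(x) for x in version.lstrip("v").split("."))
    if level == "major":
        return f"v{major + 1}.0.0"
    if level == "minor":
        return f"v{major}.{minor + 1}.0"
    return f"v{major}.{minor}.{patch + 1}"

manifest["version"] = bump_version("v1.0.0", "minor")  # -> "v1.1.0"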

Automate with CI/CD

Run the sync whenever your dataset source changes. For example, if your test cases live in a JSON file in your repo, trigger the sync on push:
# .github/workflows/sync-dataset.yml
name: Sync dataset
on:
  push:
    paths:
      - "data/test_cases.json"  # trigger when source file changes
  workflow_dispatch:             # manual trigger

jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"
      - run: pip install "honeyhive>=1.0.0rc0"
      - run: python sync_dataset.py
        env:
          HH_API_KEY: ${{ secrets.HH_API_KEY }}
          HH_DATASET_NAME: my-dataset
For databases or S3, use a webhook or event trigger from your data pipeline instead.
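One option, assuming your pipeline can make HTTP calls, is GitHub's repository_dispatch API: add a repository_dispatch: trigger to the workflow's on: block, then fire it from the pipeline step that updates the source data (OWNER/REPO, the event type, and the token variable below are placeholders):
# Sketch: trigger the sync workflow from a data pipeline via
# GitHub's repository_dispatch endpoint
import os
import requests

resp = requests.post(
    "https://api.github.com/repos/OWNER/REPO/dispatches",
    headers={
        "Accept": "application/vnd.github+json",
        "Authorization": f"Bearer {os.environ['GITHUB_TOKEN']}",
    },
    json={"event_type": "dataset-source-updated"},
    timeout=10,
)
resp.raise_for_status()  # GitHub returns 204 No Content on success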