Qdrant RAG with HoneyHive Tracing

Learn how to integrate Qdrant with HoneyHive for vector database monitoring and tracing in RAG applications.

Qdrant

Qdrant is an open-source vector database optimized for storing and searching high-dimensional vectors. By integrating Qdrant with HoneyHive, you can:

  • Track vector database operations
  • Monitor embedding quality and relevance
  • Analyze retrieval performance in your RAG pipelines
  • Identify opportunities for optimization

Prerequisites

  • A HoneyHive account and API key
  • Python 3.8+
  • Basic understanding of vector databases and RAG pipelines

Installation

Install the required packages:

pip install qdrant-client openai honeyhive

Basic Integration Example

The following example demonstrates a complete RAG pipeline with HoneyHive tracing for Qdrant operations. We’ll break down each component step by step.

Initialize Clients and Setup

First, set up the necessary clients and configuration for HoneyHive, OpenAI, and Qdrant:

from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct
from openai import OpenAI
import os
from honeyhive.tracer import HoneyHiveTracer
from honeyhive.tracer.custom import trace  # for custom span annotation

# Initialize the OpenAI client
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", "your_openai_api_key"))

# Initialize HoneyHive Tracer
HoneyHiveTracer.init(
    api_key=os.getenv("HONEYHIVE_API_KEY", "your_honeyhive_api_key"),
    project="qdrant-rag-example",  # Your project name in HoneyHive
    session_name="qdrant-integration-demo"  # Optional session identifier
)

Connect to Qdrant

You can connect to Qdrant in two ways: self-hosted (local) or cloud-hosted (Qdrant Cloud):

# Option 1: Self-Hosted Qdrant (Local)
# To run Qdrant locally, you need to have Docker installed and run the following command:
# docker pull qdrant/qdrant
# docker run -p 6333:6333 -p 6334:6334 -v "$(pwd)/qdrant_storage:/qdrant/storage" qdrant/qdrant

# Connect to local Qdrant
client = QdrantClient(url="http://localhost:6333")
print("Connected to local Qdrant instance")

# Option 2: Qdrant Cloud (uncomment to use)
# QDRANT_HOST = "your-cluster-id.eu-central.aws.cloud.qdrant.io"  # Replace with your cluster host
# QDRANT_API_KEY = os.getenv("QDRANT_API_KEY", "your_qdrant_api_key")  # Replace with your API key
# client = QdrantClient(host=QDRANT_HOST, api_key=QDRANT_API_KEY)
# print("Connected to Qdrant Cloud")

Create a Collection

Create a collection to store document embeddings:

collection_name = "documents"

# Check if collection exists, if not create it
if not client.collection_exists(collection_name):
    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
    )
    print(f"Collection '{collection_name}' created")
else:
    print(f"Collection '{collection_name}' already exists")

Define Embedding Function with Tracing

Create a function to generate embeddings with HoneyHive tracing:

@trace()
def embed_text(text: str) -> list:
    """Generate embeddings for a text using OpenAI's API."""
    response = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=text
    )
    return response.data[0].embedding

The @trace decorator logs information about the embedding process, including the model used and performance metrics.
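
As a quick sanity check, you can embed a sample string and confirm that the vector length matches the collection's configured size (text-embedding-ada-002 produces 1536-dimensional vectors):

# Sanity check: the embedding dimension must match the collection's vector size
sample_vector = embed_text("Hello, Qdrant!")
print(len(sample_vector))  # expected: 1536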

Insert Documents with Tracing

Create a function to insert documents into Qdrant with tracing:

@trace()
def insert_documents(docs):
    """Insert documents into Qdrant collection."""
    points = []
    for idx, doc in enumerate(docs):
        vector = embed_text(doc)
        points.append(PointStruct(
            id=idx,  # Qdrant point IDs must be unsigned integers or UUIDs
            vector=vector,
            payload={"text": doc}
        ))
    
    # Upsert points to Qdrant
    client.upsert(
        collection_name=collection_name,
        points=points
    )
    return len(points)

# Sample documents
documents = [
    "Qdrant is a vector database optimized for storing and searching high-dimensional vectors.",
    "HoneyHive provides observability for AI applications, including RAG pipelines.",
    "Retrieval-Augmented Generation (RAG) combines retrieval systems with generative models.",
    "Vector databases like Qdrant are essential for efficient similarity search in RAG systems.",
    "OpenAI's embedding models convert text into high-dimensional vectors for semantic search."
]

# Insert documents
num_inserted = insert_documents(documents)
print(f"Inserted {num_inserted} documents into Qdrant")

The @trace decorator logs information about the document insertion process, including the number of documents inserted.
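
To confirm the points were written, you can ask Qdrant for an exact point count of the collection (an optional check):

# Optional: verify how many points the collection now holds
count_result = client.count(collection_name=collection_name, exact=True)
print(f"Points in collection: {count_result.count}")  # expected: 5 for the sample documents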

Retrieve Documents with Tracing

Create a function to retrieve relevant documents from Qdrant with tracing:

@trace()
def get_relevant_docs(query: str, top_k: int = 3) -> list:
    """Retrieve relevant documents for a query."""
    # Embed the query
    q_vector = embed_text(query)
    
    # Search in Qdrant for similar vectors
    search_results = client.search(
        collection_name=collection_name,
        query_vector=q_vector,
        limit=top_k,
        with_payload=True  # ensure we get stored payload (text)
    )
    
    # Extract the text payload from each result
    docs = []
    for point in search_results:
        docs.append({
            "text": point.payload.get("text"),
            "score": point.score  # similarity score
        })
    
    return docs

The @trace decorator logs information about the retrieval process, including the number of results and the embedding model used.
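
For example, retrieving the top results for a sample question (any query string works here) lets you inspect the similarity scores directly:

# Example retrieval; run after the sample documents have been inserted
results = get_relevant_docs("What is a vector database?", top_k=2)
for doc in results:
    print(f"{doc['score']:.4f}  {doc['text']}")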

Generate Response with Tracing

Create a function to generate a response using OpenAI with tracing:

@trace()
def answer_query(query: str, relevant_docs: list = None) -> str:
    """Generate an answer for a query using retrieved documents."""
    # Get relevant documents (unless the caller has already retrieved them)
    if relevant_docs is None:
        relevant_docs = get_relevant_docs(query)
    
    # Format context from retrieved documents
    context = "\n\n".join([f"Document {i+1} (Score: {doc['score']:.4f}):\n{doc['text']}" 
                          for i, doc in enumerate(relevant_docs)])
    
    # Create prompt with context and query
    prompt = f"""Answer the question based on the following context:

Context:
{context}

Question: {query}

Answer:"""
    
    # Generate answer using OpenAI
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.3
    )
    
    answer = completion.choices[0].message.content
    return answer

The @trace decorator logs information about the response generation process, including the model used and the prompt template.
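
You can call the function on its own to exercise the generation step end to end (the question below is just an example):

# Example: generate an answer for a single question
print(answer_query("What does HoneyHive provide?"))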

Complete RAG Pipeline with Tracing

Create a function to run the complete RAG pipeline with tracing:

@trace()
def rag_pipeline(query: str) -> dict:
    """End-to-end RAG pipeline."""
    # Get relevant documents
    relevant_docs = get_relevant_docs(query)
    
    # Generate answer, reusing the documents retrieved above
    answer = answer_query(query, relevant_docs)
    
    # Return both the answer and the retrieved documents
    return {
        "query": query,
        "answer": answer,
        "retrieved_documents": relevant_docs
    }

The @trace decorator logs information about the entire RAG pipeline, including the query, retrieved documents, and generated answer.
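
Running the pipeline with a sample question ties the pieces together; the complete example below includes the same kind of test:

# Example run of the full pipeline
result = rag_pipeline("What is Qdrant used for?")
print(result["answer"])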

Advanced Usage: Batch Processing

For larger document sets, you can use batch processing to improve performance:

@trace()
def batch_insert_documents(documents, batch_size=10):
    """Insert documents in batches."""
    total_inserted = 0
    
    for i in range(0, len(documents), batch_size):
        batch = documents[i:i+batch_size]
        points = []
        
        for idx, doc in enumerate(batch):
            vector = embed_text(doc)
            points.append(PointStruct(
                id=i + idx,  # Integer IDs stay unique across batches
                vector=vector,
                payload={"text": doc}
            ))
        
        client.upsert(
            collection_name=collection_name,
            points=points
        )
        
        total_inserted += len(points)
        print(f"Inserted batch {i//batch_size + 1}, total: {total_inserted} documents")
    
    return total_inserted

# Example usage of batch processing
# large_document_set = [f"Document {i}" for i in range(100)]
# batch_insert_documents(large_document_set, batch_size=20)

Complete Example

Here’s a complete example that demonstrates the entire RAG pipeline with Qdrant and HoneyHive tracing:

# Import Libraries
from qdrant_client import QdrantClient
from qdrant_client.models import VectorParams, Distance, PointStruct
from openai import OpenAI
import os
from honeyhive.tracer import HoneyHiveTracer
from honeyhive.tracer.custom import trace

# Initialize the OpenAI client
openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY", "your_openai_api_key"))

# Initialize HoneyHive Tracer
HoneyHiveTracer.init(
    api_key=os.getenv("HONEYHIVE_API_KEY", "your_honeyhive_api_key"),
    project="qdrant-rag-example",
    session_name="qdrant-integration-demo"
)

# Connect to Qdrant
client = QdrantClient(url="http://localhost:6333")

# Create Collection
collection_name = "documents"
if not client.collection_exists(collection_name):
    client.create_collection(
        collection_name=collection_name,
        vectors_config=VectorParams(size=1536, distance=Distance.COSINE)
    )

# Define Embedding Function
@trace()
def embed_text(text: str) -> list:
    response = openai_client.embeddings.create(
        model="text-embedding-ada-002",
        input=text
    )
    return response.data[0].embedding

# Sample Documents
documents = [
    "Qdrant is a vector database optimized for storing and searching high-dimensional vectors.",
    "HoneyHive provides observability for AI applications, including RAG pipelines.",
    "Retrieval-Augmented Generation (RAG) combines retrieval systems with generative models.",
    "Vector databases like Qdrant are essential for efficient similarity search in RAG systems.",
    "OpenAI's embedding models convert text into high-dimensional vectors for semantic search."
]

# Insert Documents
@trace()
def insert_documents(docs):
    points = []
    for idx, doc in enumerate(docs):
        vector = embed_text(doc)
        points.append(PointStruct(
            id=idx,  # Qdrant point IDs must be unsigned integers or UUIDs
            vector=vector,
            payload={"text": doc}
        ))
    
    client.upsert(
        collection_name=collection_name,
        points=points
    )
    return len(points)

num_inserted = insert_documents(documents)

# Define Retrieval Function
@trace()
def get_relevant_docs(query: str, top_k: int = 3) -> list:
    q_vector = embed_text(query)
    
    search_results = client.search(
        collection_name=collection_name,
        query_vector=q_vector,
        limit=top_k,
        with_payload=True
    )
    
    docs = []
    for point in search_results:
        docs.append({
            "text": point.payload.get("text"),
            "score": point.score
        })
    
    return docs

# Define Answer Generation Function
@trace()
def answer_query(query: str, relevant_docs: list = None) -> str:
    if relevant_docs is None:
        relevant_docs = get_relevant_docs(query)
    
    context = "\n\n".join([f"Document {i+1} (Score: {doc['score']:.4f}):\n{doc['text']}" 
                          for i, doc in enumerate(relevant_docs)])
    
    prompt = f"""Answer the question based on the following context:

Context:
{context}

Question: {query}

Answer:"""
    
    completion = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
            {"role": "user", "content": prompt}
        ],
        temperature=0.3
    )
    
    answer = completion.choices[0].message.content
    return answer

# Complete RAG Pipeline
@trace()
def rag_pipeline(query: str) -> dict:
    relevant_docs = get_relevant_docs(query)
    answer = answer_query(query, relevant_docs)
    
    return {
        "query": query,
        "answer": answer,
        "retrieved_documents": relevant_docs
    }

# Test the RAG Pipeline
query = "What is Qdrant used for?"
result = rag_pipeline(query)

print(f"Query: {result['query']}")
print(f"Answer: {result['answer']}")
print("\nRetrieved Documents:")
for i, doc in enumerate(result['retrieved_documents']):
    print(f"Document {i+1} (Score: {doc['score']:.4f}): {doc['text']}")

Viewing Traces in HoneyHive

After running your RAG pipeline with Qdrant, you can view the traces in the HoneyHive UI:

  1. Navigate to your project in the HoneyHive dashboard
  2. Click on the “Traces” tab to see all the traces from your RAG pipeline
  3. Click on a specific trace to see detailed information about each step in the pipeline
  4. Analyze the performance of your vector operations, embeddings, and retrieval processes

With HoneyHive, you can easily monitor and optimize your Qdrant-powered RAG pipeline, ensuring that it delivers the best possible results for your users.

To learn more, visit the Qdrant documentation and the HoneyHive documentation.