This guide demonstrates how to integrate HoneyHive tracing with Marqo, a tensor search engine, to monitor and optimize your RAG (Retrieval Augmented Generation) applications.
Next, set up the connection to your Marqo instance:
Copy
Ask AI
import os

import marqo
import requests

# NOTE: Marqo server needs to be running locally on port 8882, or you need to
# set the MARQO_URL environment variable to point to your Marqo server.
# For local development, you can run Marqo in Docker with:
#   docker run -p 8882:8882 marqoai/marqo:latest
MARQO_URL = os.environ.get("MARQO_URL", "http://localhost:8882")  # Default Marqo URL


def is_marqo_available():
    """Return True if the Marqo server answers its health endpoint, else False."""
    try:
        response = requests.get(f"{MARQO_URL}/health", timeout=2)
        return response.status_code == 200
    except requests.RequestException:
        # Connection refused / timeout / DNS failure all mean "not available".
        return False


# If the Marqo server is not available, print a warning and continue with
# mock behaviour so the rest of the example remains runnable.
marqo_available = is_marqo_available()
if not marqo_available:
    print(f"""WARNING: Marqo server is not available at {MARQO_URL}
To run this example properly, you need to start a Marqo server:
    docker run -p 8882:8882 marqoai/marqo:latest
Or set the MARQO_URL environment variable to point to a running Marqo server.
Continuing with mock functionality for demonstration purposes.""")

# Initialize Marqo client only when the server is reachable.
if marqo_available:
    client = marqo.Client(url=MARQO_URL)
else:
    # Create a mock client for demonstration
    client = None

# Name of the Marqo index used throughout this guide.
INDEX_NAME = "honeyhive_marqo_demo"
Use the @trace decorator to monitor index creation:
Copy
Ask AI
from honeyhive import trace


@trace
def create_marqo_index():
    """Create a Marqo index if it doesn't exist."""
    # Without a live server there is nothing to create — just report the mock.
    if not marqo_available:
        print("[MOCK] Creating index (simulated)")
        return
    try:
        # Gather the names of every index currently on the server.
        existing = {entry["indexName"] for entry in client.get_indexes().get("results", [])}
        if INDEX_NAME in existing:
            print(f"Index {INDEX_NAME} already exists")
        else:
            # Create the index with simpler settings based on documentation
            client.create_index(INDEX_NAME, model="hf/e5-base-v2")
            print(f"Created index: {INDEX_NAME}")
    except Exception as e:
        print(f"Error creating index: {e}")
        raise
@trace
def add_documents_to_marqo(documents):
    """Add documents to the Marqo index."""
    # Skip the real call when no server is reachable.
    if not marqo_available:
        print(f"[MOCK] Adding {len(documents)} documents to index (simulated)")
        return
    try:
        index = client.index(INDEX_NAME)
        # "text" is the field Marqo vectorizes, per the documentation's format.
        index.add_documents(documents=documents, tensor_fields=["text"])
        print(f"Added {len(documents)} documents to index")
    except Exception as e:
        print(f"Error adding documents: {e}")
        raise
Here’s a complete example of a RAG pipeline using Marqo and HoneyHive tracing:
Copy
Ask AI
import os
import sys

import marqo
import requests
from openai import OpenAI

from honeyhive import HoneyHiveTracer, trace

# API keys: prefer environment variables; the inline placeholders remain as
# backward-compatible defaults for readers who edit the script directly.
HONEYHIVE_API_KEY = os.environ.get("HONEYHIVE_API_KEY", "your honeyhive api key")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "your openai api key")

# NOTE: Marqo server needs to be running locally on port 8882, or you need to
# set the MARQO_URL environment variable to point to your Marqo server.
# For local development, you can run Marqo in Docker with:
#   docker run -p 8882:8882 marqoai/marqo:latest
MARQO_URL = os.environ.get("MARQO_URL", "http://localhost:8882")  # Default Marqo URL

# Initialize OpenAI client
openai_client = OpenAI(api_key=OPENAI_API_KEY)

# Initialize HoneyHive tracer
HoneyHiveTracer.init(
    api_key=HONEYHIVE_API_KEY,
    project="your project name",
    source="dev",
)


def is_marqo_available():
    """Return True if the Marqo server answers its health endpoint, else False."""
    try:
        response = requests.get(f"{MARQO_URL}/health", timeout=2)
        return response.status_code == 200
    except requests.RequestException:
        return False


# If the Marqo server is not available, print a warning and fall back to mocks.
marqo_available = is_marqo_available()
if not marqo_available:
    print(f"""WARNING: Marqo server is not available at {MARQO_URL}
To run this example properly, you need to start a Marqo server:
    docker run -p 8882:8882 marqoai/marqo:latest
Or set the MARQO_URL environment variable to point to a running Marqo server.
Continuing with mock functionality for demonstration purposes.""")

# Initialize Marqo client if server is available
if marqo_available:
    client = marqo.Client(url=MARQO_URL)
else:
    # Create a mock client for demonstration
    client = None

# Define the index name
INDEX_NAME = "honeyhive_marqo_demo"


@trace
def create_marqo_index():
    # Implementation as shown above
    pass


@trace
def add_documents_to_marqo(documents):
    # Implementation as shown above
    pass


@trace
def search_marqo(query, limit=3):
    # Implementation as shown above
    pass


@trace
def generate_response(query, context):
    """Generate a response using OpenAI based on the retrieved context."""
    try:
        # Join the text of every retrieved document into one context string.
        context_text = "\n\n".join([doc.get("text", "") for doc in context])

        # Build the grounded prompt for the chat model.
        prompt = f"""
        Answer the following question based on the provided context:

        Context:
        {context_text}

        Question: {query}

        Answer:
        """

        # Call OpenAI API
        response = openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[
                {"role": "system", "content": "You are a helpful assistant that answers questions based on the provided context."},
                {"role": "user", "content": prompt},
            ],
            temperature=0.3,
            max_tokens=500,
        )
        return response.choices[0].message.content
    except Exception as e:
        print(f"Error generating response: {e}")
        raise


@trace
def rag_pipeline(query, documents=None):
    """
    Run the complete RAG pipeline with Marqo and HoneyHive tracing.

    Args:
        query: The user query
        documents: Optional list of documents to add to the index

    Returns:
        The generated response
    """
    # Create index if needed
    create_marqo_index()

    # Add documents if provided
    if documents:
        add_documents_to_marqo(documents)

    # Search for relevant documents
    results = search_marqo(query)

    # Generate response
    return generate_response(query, results)


def main():
    """Index sample documents, then answer a sample query through the pipeline."""
    documents = [
        {"text": "Marqo is a tensor search engine that makes it easy to build search into your applications.", "_id": "1"},
        {"text": "HoneyHive provides tracing and monitoring for AI applications.", "_id": "2"},
        {"text": "Retrieval Augmented Generation (RAG) combines retrieval systems with generative models.", "_id": "3"},
        {"text": "Vector databases store embeddings which are numerical representations of data.", "_id": "4"},
        {"text": "OpenTelemetry is an observability framework for cloud-native software.", "_id": "5"},
    ]

    # Sample query
    query = "How can HoneyHive help with RAG applications?"

    # Run the RAG pipeline
    response = rag_pipeline(query, documents)
    print("\n=== Generated Response ===")
    print(response)


if __name__ == "__main__":
    main()
Experiment with different embedding models in Marqo
Add custom metrics to your traces
Implement A/B testing of different RAG configurations
Explore HoneyHive’s evaluation capabilities for your RAG pipeline
By integrating HoneyHive with Marqo, you gain valuable insights into your vector search operations and can optimize your RAG pipeline for better performance and accuracy.