Ghostwritten by Claude Opus 4.5 · Curated by Tom Hundley
This article was written by Claude Opus 4.5 and curated for publication by Tom Hundley.
When your RAG system needs to survive an enterprise production audit.
If LangChain is the Swiss Army knife of AI frameworks and LlamaIndex is the document specialist, Haystack is the enterprise-grade factory floor. Built by deepset, a German AI company founded in 2018, Haystack was designed from the ground up for production deployments in regulated industries.
The framework takes a fundamentally different approach than its competitors: everything is a pipeline. Not a chain. Not an index. A Directed Acyclic Graph (DAG) of connected components with explicit data flow, type checking, and serialization built into the core architecture.
This pipeline-centric design is not an accident. It reflects deepset's experience deploying NLP systems for organizations like Airbus, NVIDIA, Apple, and Meta, where reproducibility, auditability, and reliability matter more than rapid prototyping.
Haystack is the right choice when:
If you are coming from LangChain (Part 2) or LlamaIndex (Part 3), here are the key philosophical differences:
| Aspect | LangChain | LlamaIndex | Haystack |
|---|---|---|---|
| Core abstraction | Chains/Runnables | Indexes/Query Engines | Pipelines (DAG) |
| Design philosophy | Maximum flexibility | Document-centric | Production-first |
| Type safety | Optional | Moderate | Strict |
| Serialization | Add-on | Add-on | Built-in |
| Target audience | Prototypers | Knowledge workers | Enterprise teams |
This is not to say Haystack cannot prototype quickly. It can. But its design decisions optimize for the 99% of an application's lifetime spent in production, not the 1% spent in development.
Haystack 2.x is the current major version, released in early 2024. It is a complete rewrite from Haystack 1.x with a more modular architecture.
# Core installation
pip install haystack-ai
# Common extras for RAG
pip install opensearch-haystack # OpenSearch document store integration
pip install sentence-transformers # Local embeddings
pip install openai # OpenAI integration
Important: The package name is haystack-ai, not haystack or farm-haystack. The older names refer to Haystack 1.x.
# Required for OpenAI-based components
export OPENAI_API_KEY="sk-proj-..."
# Optional: For OpenSearch document store
export OPENSEARCH_HOST="localhost"
export OPENSEARCH_PORT="9200"
export OPENSEARCH_USERNAME="admin"
export OPENSEARCH_PASSWORD="YourComplexPassword123!"
A well-organized Haystack project:
rag-project/
├── pipelines/
│   ├── __init__.py
│   ├── indexing.py          # Document ingestion pipeline
│   └── query.py             # RAG query pipeline
├── components/
│   ├── __init__.py
│   └── custom.py            # Custom components
├── document_stores/
│   ├── __init__.py
│   └── config.py            # Document store configuration
├── data/
│   └── documents/           # Source documents
├── tests/
│   ├── test_indexing.py
│   └── test_query.py
├── config.yaml              # Pipeline configuration
└── main.py
Haystack's architecture revolves around four key abstractions.
Components are the building blocks. Each component does one thing well: embed text, split documents, retrieve from a store, generate responses.
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.generators import OpenAIGenerator
# Each component has typed inputs and outputs
embedder = SentenceTransformersTextEmbedder(
model="sentence-transformers/all-MiniLM-L6-v2"
)
# Components can be used standalone (load the model first when running outside a pipeline)
embedder.warm_up()
result = embedder.run(text="What is RAG?")
print(result["embedding"][:5]) # First 5 dimensions of the vectorComponents have explicit input and output types. This is not just for documentation. Haystack uses these types to validate pipeline connections at construction time, catching errors before runtime.
A Pipeline is a DAG of connected components. Data flows from inputs through components to outputs, with the pipeline managing execution order and data passing.
from haystack import Pipeline
# Create an empty pipeline
pipeline = Pipeline()
# Add components
pipeline.add_component("embedder", embedder)
pipeline.add_component("retriever", retriever)
# Connect them (output -> input)
pipeline.connect("embedder.embedding", "retriever.query_embedding")The connection syntax "component_name.output_name" makes data flow explicit. No magic. No implicit state. Every piece of data has a clear path.
Document stores are where your indexed documents live. They abstract away the underlying storage technology (in-memory, OpenSearch, Elasticsearch, Pinecone, etc.) behind a consistent interface.
from haystack.document_stores.in_memory import InMemoryDocumentStore
# In-memory store for development
document_store = InMemoryDocumentStore()
# All stores share the same interface
document_store.write_documents(documents)
document_store.count_documents()
For production deployments with concurrent requests, Haystack provides asynchronous pipeline execution:
from haystack import AsyncPipeline
async_pipeline = AsyncPipeline()
# ... add components ...
# Run asynchronously
result = await async_pipeline.run_async({"query": "What is RAG?"})
Before you can retrieve documents, you need to process them. Haystack provides a rich set of document converters and splitters.
Haystack can convert documents from various formats:
from haystack.components.converters import (
PyPDFToDocument,
HTMLToDocument,
MarkdownToDocument,
TextFileToDocument
)
# PDF conversion
pdf_converter = PyPDFToDocument()
result = pdf_converter.run(sources=["document.pdf"])
documents = result["documents"]
# HTML conversion with metadata
html_converter = HTMLToDocument()
result = html_converter.run(
sources=["page.html"],
meta={"source": "website", "category": "documentation"}
)
Each converter produces Document objects with content and metadata:
from haystack import Document
# Document structure
doc = Document(
content="The text content...",
meta={
"source": "quarterly_report.pdf",
"page_number": 5,
"department": "finance"
}
)
Raw documents are typically too long for embedding models and retrieval. Splitters break them into chunks:
from haystack.components.preprocessors import DocumentSplitter
# Split by word count with overlap
splitter = DocumentSplitter(
split_by="word",
split_length=200,
split_overlap=20
)
result = splitter.run(documents=documents)
chunks = result["documents"]
For more sophisticated splitting based on sentence boundaries:
from haystack.components.preprocessors import DocumentSplitter
# Sentence-aware splitting
splitter = DocumentSplitter(
split_by="sentence",
split_length=5, # 5 sentences per chunk
split_overlap=1 # 1 sentence overlap
)
As discussed in Part 1, chunking strategy significantly impacts retrieval quality. The "right" parameters depend on your content and use case.
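For a rough sense of what those numbers mean: each chunk advances by split_length minus split_overlap words, so a quick back-of-the-envelope count looks like this (this mirrors the arithmetic only, not the splitter's exact behavior):
import math
words, length, overlap = 1_000, 200, 20
stride = length - overlap  # 180 new words per chunk
approx_chunks = math.ceil(max(words - length, 0) / stride) + 1
print(approx_chunks)  # roughly 6 chunks for a 1,000-word document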
Once split, documents need to be embedded (converted to vectors):
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
# Embed documents with a local model
doc_embedder = SentenceTransformersDocumentEmbedder(
model="sentence-transformers/all-MiniLM-L6-v2"
)
doc_embedder.warm_up()  # load the model before standalone use
result = doc_embedder.run(documents=chunks)
embedded_docs = result["documents"]
# Each document now has an embedding attribute
print(embedded_docs[0].embedding[:5]) # Vector dimensions
For OpenAI embeddings:
from haystack.components.embedders import OpenAIDocumentEmbedder
doc_embedder = OpenAIDocumentEmbedder(
model="text-embedding-3-small"
)
Here is a full indexing pipeline that reads PDFs, splits them, embeds them, and writes to a document store:
from haystack import Pipeline
from haystack.components.converters import PyPDFToDocument
from haystack.components.preprocessors import DocumentCleaner, DocumentSplitter
from haystack.components.embedders import SentenceTransformersDocumentEmbedder
from haystack.components.writers import DocumentWriter
from haystack.document_stores.in_memory import InMemoryDocumentStore
# Initialize document store
document_store = InMemoryDocumentStore()
# Create indexing pipeline
indexing_pipeline = Pipeline()
# Add components
indexing_pipeline.add_component("converter", PyPDFToDocument())
indexing_pipeline.add_component("cleaner", DocumentCleaner())
indexing_pipeline.add_component(
"splitter",
DocumentSplitter(split_by="word", split_length=200, split_overlap=20)
)
indexing_pipeline.add_component(
"embedder",
SentenceTransformersDocumentEmbedder(
model="sentence-transformers/all-MiniLM-L6-v2"
)
)
indexing_pipeline.add_component(
"writer",
DocumentWriter(document_store=document_store)
)
# Connect the pipeline
indexing_pipeline.connect("converter", "cleaner")
indexing_pipeline.connect("cleaner", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")
# Run the pipeline
result = indexing_pipeline.run({"converter": {"sources": ["docs/manual.pdf"]}})
print(f"Indexed {result['writer']['documents_written']} document chunks")Choosing the right document store depends on your deployment requirements.
Fast iteration, no external dependencies, but data is lost on restart:
from haystack.document_stores.in_memory import InMemoryDocumentStore
store = InMemoryDocumentStore(
bm25_algorithm="BM25Plus", # For keyword search
embedding_similarity_function="cosine" # For vector search
)
# Supports both embedding and BM25 retrieval
store.write_documents(documents)
Enterprise-grade search with excellent scaling characteristics:
from haystack_integrations.document_stores.opensearch import OpenSearchDocumentStore
# IMPORTANT: OpenSearch requires complex passwords!
# The password must contain uppercase, lowercase, numbers, and special chars
store = OpenSearchDocumentStore(
hosts="https://localhost:9200",
use_ssl=True,
verify_certs=False,
http_auth=("admin", "YourComplexPassword123!"),
index="rag-documents"
)
Critical Gotcha: Do NOT create the OpenSearch index manually. Haystack creates indices with specific mappings for embeddings. If you create the index yourself, vector search will fail silently. Let Haystack create the index on first write.
Similar to OpenSearch but for Elasticsearch deployments:
from haystack_integrations.document_stores.elasticsearch import ElasticsearchDocumentStore
store = ElasticsearchDocumentStore(
hosts="http://localhost:9200",
index="rag-documents"
)
For purpose-built vector databases:
# Pinecone
from haystack_integrations.document_stores.pinecone import PineconeDocumentStore
store = PineconeDocumentStore(
api_key="your-api-key",
environment="us-west1-gcp",
index="rag-documents",
dimension=384 # Must match your embedding model
)
# Qdrant
from haystack_integrations.document_stores.qdrant import QdrantDocumentStore
store = QdrantDocumentStore(
url="http://localhost:6333",
index="rag-documents",
embedding_dim=384
)
One of Haystack's strengths is that pipelines are storage-agnostic. To switch from development to production:
# Development
if os.environ.get("ENV") == "development":
document_store = InMemoryDocumentStore()
else:
# Production
document_store = OpenSearchDocumentStore(
hosts=os.environ["OPENSEARCH_HOST"],
http_auth=(
os.environ["OPENSEARCH_USER"],
os.environ["OPENSEARCH_PASSWORD"]
)
)
# Same pipeline works with either store
retriever = InMemoryEmbeddingRetriever(document_store=document_store)
# or
retriever = OpenSearchEmbeddingRetriever(document_store=document_store)
Another Critical Gotcha: When switching between embedding models, you MUST delete and recreate your index. Vectors from different models are not compatible. Mixing them produces garbage results without any error message.
Now for the main event: building a complete RAG pipeline.
Query → Text Embedder → Retriever → Prompt Builder → Generator → Response
from haystack import Pipeline
from haystack.components.embedders import SentenceTransformersTextEmbedder
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.document_stores.in_memory import InMemoryDocumentStore
# Assume document_store is already populated from indexing pipeline
document_store = InMemoryDocumentStore()
# Define the prompt template
template = """
Given the following context, answer the question.
If the context doesn't contain the answer, say "I don't have information about that."
Context:
{% for document in documents %}
{{ document.content }}
{% endfor %}
Question: {{ question }}
Answer:
"""
# Build the pipeline
rag_pipeline = Pipeline()
rag_pipeline.add_component(
"embedder",
SentenceTransformersTextEmbedder(
model="sentence-transformers/all-MiniLM-L6-v2"
)
)
rag_pipeline.add_component(
"retriever",
InMemoryEmbeddingRetriever(document_store=document_store, top_k=5)
)
rag_pipeline.add_component(
"prompt_builder",
PromptBuilder(template=template)
)
rag_pipeline.add_component(
"generator",
OpenAIGenerator(model="gpt-4o")
)
# Connect the components
rag_pipeline.connect("embedder.embedding", "retriever.query_embedding")
rag_pipeline.connect("retriever.documents", "prompt_builder.documents")
rag_pipeline.connect("prompt_builder", "generator")
# Run the pipeline
result = rag_pipeline.run({
"embedder": {"text": "What is our vacation policy?"},
"prompt_builder": {"question": "What is our vacation policy?"}
})
print(result["generator"]["replies"][0])Notice the explicit data flow: the query text goes to the embedder, the embedding goes to the retriever, retrieved documents go to the prompt builder, and the built prompt goes to the generator. No magic. No hidden state.
One of Haystack's killer features for debugging and documentation:
# Generate a visual representation
rag_pipeline.draw("rag_pipeline.png")
This creates a DAG diagram showing all components and their connections. Invaluable for debugging complex pipelines and onboarding new team members.
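If you work in notebooks, the same diagram can be rendered inline instead of saved to disk (assuming a Jupyter environment):
rag_pipeline.show()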
Haystack supports multiple retrieval strategies.
Traditional keyword-based search, excellent for exact matches:
from haystack.components.retrievers.in_memory import InMemoryBM25Retriever
bm25_retriever = InMemoryBM25Retriever(
document_store=document_store,
top_k=10
)
result = bm25_retriever.run(query="Error code XJ-445")
Vector similarity search for conceptual matching:
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
embedding_retriever = InMemoryEmbeddingRetriever(
document_store=document_store,
top_k=10
)
# Note: Requires a query embedding, not raw text
result = embedding_retriever.run(query_embedding=embedding_vector)
Combine keyword and semantic search for the best results. In Haystack 2.x you compose this yourself from standard components:
from haystack import Pipeline, Document
from haystack.components.joiners import DocumentJoiner
# Hybrid search pipeline
hybrid_pipeline = Pipeline()
# Add both retrievers
hybrid_pipeline.add_component("text_embedder", text_embedder)
hybrid_pipeline.add_component("embedding_retriever", embedding_retriever)
hybrid_pipeline.add_component("bm25_retriever", bm25_retriever)
hybrid_pipeline.add_component(
"joiner",
DocumentJoiner(join_mode="reciprocal_rank_fusion")
)
# Connect
hybrid_pipeline.connect("text_embedder.embedding", "embedding_retriever.query_embedding")
hybrid_pipeline.connect("embedding_retriever.documents", "joiner.documents")
hybrid_pipeline.connect("bm25_retriever.documents", "joiner.documents")
# Run - query goes to both retrievers
result = hybrid_pipeline.run({
"text_embedder": {"text": query},
"bm25_retriever": {"query": query}
})
The DocumentJoiner with reciprocal_rank_fusion merges results from both retrievers, giving you the best of both worlds: exact keyword matching and semantic understanding.
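Reciprocal rank fusion itself is simple to reason about: each document's fused score is the sum of 1 / (k + rank) over every ranking it appears in, with k conventionally set to 60. A toy sketch (illustrative only; DocumentJoiner's exact implementation may differ):
def reciprocal_rank_fusion(rankings: list[list[str]], k: int = 60) -> dict[str, float]:
    # rankings: lists of document ids, best first; higher fused score = better
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return scores
print(reciprocal_rank_fusion([["a", "b", "c"], ["c", "a", "b"]]))  # "a" wins: ranked #1 and #2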
The most common choice for production:
from haystack.components.generators.chat import OpenAIChatGenerator
from haystack.dataclasses import ChatMessage
generator = OpenAIChatGenerator(
model="gpt-4o",
generation_kwargs={
"temperature": 0.7,
"max_tokens": 1000
}
)
messages = [
ChatMessage.from_system("You are a helpful assistant."),
ChatMessage.from_user("What is RAG?")
]
result = generator.run(messages=messages)
print(result["replies"][0].content)For Azure OpenAI deployments:
from haystack.components.generators.chat import AzureOpenAIChatGenerator
generator = AzureOpenAIChatGenerator(
azure_endpoint="https://your-resource.openai.azure.com",
azure_deployment="gpt-4o",
api_version="2024-02-15-preview"
)
For local or self-hosted models:
from haystack.components.generators import HuggingFaceLocalGenerator
generator = HuggingFaceLocalGenerator(
model="mistralai/Mistral-7B-Instruct-v0.2",
task="text-generation",
generation_kwargs={
"max_new_tokens": 500,
"temperature": 0.7
}
)
The PromptBuilder uses Jinja2 templates for flexible prompt construction:
from haystack.components.builders import PromptBuilder
# Template with conditional logic
template = """
You are a {{ persona }} assistant for {{ company }}.
{% if documents %}
Use the following context to answer:
{% for doc in documents %}
---
Source: {{ doc.meta.source }}
{{ doc.content }}
{% endfor %}
{% else %}
No context was retrieved. Answer based on your knowledge.
{% endif %}
Question: {{ question }}
"""
builder = PromptBuilder(template=template)
result = builder.run(
persona="technical support",
company="Acme Corp",
documents=retrieved_docs,
question="How do I reset my password?"
)
Haystack's pipeline architecture enables sophisticated patterns that are difficult in less structured frameworks.
Route queries to different processing paths based on content:
from haystack.components.routers import ConditionalRouter
# Define routing rules
routes = [
{
"condition": "{{ 'code' in query or 'error' in query }}",
"output": "technical",
"output_name": "technical_queries",
"output_type": str
},
{
"condition": "{{ 'policy' in query or 'vacation' in query }}",
"output": "hr",
"output_name": "hr_queries",
"output_type": str
},
{
"condition": "{{ True }}", # Default route
"output": "general",
"output_name": "general_queries",
"output_type": str
}
]
router = ConditionalRouter(routes=routes)
# Build branching pipeline
pipeline = Pipeline()
pipeline.add_component("router", router)
pipeline.add_component("tech_retriever", tech_retriever)
pipeline.add_component("hr_retriever", hr_retriever)
pipeline.add_component("general_retriever", general_retriever)
# Connect branches
pipeline.connect("router.technical_queries", "tech_retriever.query")
pipeline.connect("router.hr_queries", "hr_retriever.query")
pipeline.connect("router.general_queries", "general_retriever.query")Implement iterative refinement where the model can correct its own answers:
from haystack.components.validators import JsonSchemaValidator
# Pipeline that loops until output is valid
pipeline = Pipeline(max_runs_per_component=3) # Prevent infinite loops
pipeline.add_component("generator", generator)
pipeline.add_component("validator", JsonSchemaValidator(json_schema=schema))
pipeline.add_component("error_handler", error_prompt_builder)
# If validation fails, route back to generator with error message
pipeline.connect("generator.replies", "validator.messages")
pipeline.connect("validator.validation_error", "error_handler.error")
pipeline.connect("error_handler.prompt", "generator.prompt")Implement fallbacks when primary retrieval fails:
from haystack.components.routers import ConditionalRouter
# Check if retrieval returned results
fallback_routes = [
{
"condition": "{{ documents|length > 0 }}",
"output": "{{ documents }}",
"output_name": "has_results",
"output_type": list
},
{
"condition": "{{ True }}",
"output": "{{ query }}",
"output_name": "no_results",
"output_type": str
}
]
# Build pipeline with web search fallback
pipeline = Pipeline()
pipeline.add_component("retriever", primary_retriever)
pipeline.add_component("fallback_check", ConditionalRouter(routes=fallback_routes))
pipeline.add_component("web_search", web_search_component)
pipeline.add_component("joiner", DocumentJoiner())
pipeline.connect("retriever.documents", "fallback_check.documents")
pipeline.connect("fallback_check.has_results", "joiner.documents")
pipeline.connect("fallback_check.no_results", "web_search.query")
pipeline.connect("web_search.documents", "joiner.documents")Haystack supports agent-like patterns with tool use:
from haystack.components.agents import Agent
from haystack.tools import Tool
# Define tools
search_tool = Tool(
name="search_documents",
description="Search the knowledge base for relevant documents",
pipeline=search_pipeline
)
calculator_tool = Tool(
name="calculate",
description="Perform mathematical calculations",
function=calculator_function
)
# Create agent
agent = Agent(
llm=OpenAIChatGenerator(model="gpt-4o"),
tools=[search_tool, calculator_tool],
max_iterations=5
)
result = agent.run("What was our Q3 revenue and how does it compare to Q2?")
Hayhooks converts Haystack pipelines into REST APIs:
pip install hayhooks
# Save your pipeline
rag_pipeline.dump("rag_pipeline.yaml")# Start the Hayhooks server
hayhooks run
# Deploy your pipeline
hayhooks pipeline deploy rag_pipeline.yaml --name rag
Your pipeline is now accessible via HTTP:
curl -X POST http://localhost:8000/rag \
-H "Content-Type: application/json" \
-d '{
"embedder": {"text": "What is our vacation policy?"},
"prompt_builder": {"question": "What is our vacation policy?"}
}'
Hayhooks can expose your pipeline as an OpenAI-compatible API:
hayhooks pipeline deploy rag_pipeline.yaml \
--name rag \
--openai-compatible
This allows existing OpenAI client code to work with your Haystack pipeline:
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1")
response = client.chat.completions.create(
model="rag", # Your pipeline name
messages=[{"role": "user", "content": "What is our vacation policy?"}]
)
Pipelines can be serialized for version control and deployment:
# Save to YAML
rag_pipeline.dump("pipelines/rag_v1.yaml")
# Load from YAML
from haystack import Pipeline
loaded_pipeline = Pipeline.load("pipelines/rag_v1.yaml")
# Serialize to dict (for database storage)
pipeline_dict = rag_pipeline.to_dict()
# Reconstruct from dict
reconstructed = Pipeline.from_dict(pipeline_dict)
For fully managed deployments, deepset offers a cloud platform:
This is particularly relevant for enterprises that need managed infrastructure without the operational overhead.
From production deployments, here are the issues that trip up teams:
OpenSearch requires complex passwords. This will fail:
# FAILS: Password too simple
store = OpenSearchDocumentStore(
http_auth=("admin", "admin")
)
This works:
# Works: Complex password
store = OpenSearchDocumentStore(
http_auth=("admin", "MyP@ssw0rd!2024")
)
The password must contain uppercase, lowercase, numbers, and special characters.
Do NOT create OpenSearch/Elasticsearch indices manually:
# DON'T DO THIS
curl -X PUT "localhost:9200/my-index"
Haystack creates indices with specific mappings for vector fields. Let the DocumentWriter create the index on first write:
# DO THIS - let Haystack create the index
writer = DocumentWriter(document_store=store)
writer.run(documents=docs) # Index created automatically
When you change embedding models, you MUST recreate your index:
# Old embeddings (384 dimensions)
old_embedder = SentenceTransformersDocumentEmbedder(
model="sentence-transformers/all-MiniLM-L6-v2"
)
# New embeddings (768 dimensions)
new_embedder = SentenceTransformersDocumentEmbedder(
model="sentence-transformers/all-mpnet-base-v2"
)
# CRITICAL: Delete and recreate index!
document_store.delete_index()
# Re-run indexing pipeline with new embedder
Mixing vectors from different models produces meaningless similarity scores.
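One way to catch this early is a quick dimension check before writing new vectors. A hedged sketch reusing new_embedder and the document store from above (the probe document and error handling are illustrative):
from haystack import Document
new_embedder.warm_up()
probe = new_embedder.run(documents=[Document(content="dimension probe")])["documents"][0]
existing = document_store.filter_documents()
if existing and existing[0].embedding is not None and len(probe.embedding) != len(existing[0].embedding):
    raise RuntimeError("Embedding dimensions differ - delete the index and re-run indexing")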
Haystack agents can crash on malformed tool outputs. Always wrap tool functions:
def safe_tool_function(input_data):
try:
result = actual_function(input_data)
return {"status": "success", "result": result}
except Exception as e:
return {"status": "error", "message": str(e)}Document embedders load the entire model into memory. For large-scale indexing:
# Process in batches
BATCH_SIZE = 100
for i in range(0, len(documents), BATCH_SIZE):
batch = documents[i:i + BATCH_SIZE]
indexing_pipeline.run({"embedder": {"documents": batch}})Production RAG requires evaluation. Haystack provides built-in components for this.
from haystack.components.evaluators import (
FaithfulnessEvaluator,
ContextRelevanceEvaluator,
SASEvaluator # Semantic Answer Similarity
)
# Evaluate faithfulness (is the answer grounded in context?)
faithfulness = FaithfulnessEvaluator()
result = faithfulness.run(
questions=["What is our vacation policy?"],
contexts=[retrieved_contexts],
responses=["You get 20 days of PTO per year."]
)
print(f"Faithfulness score: {result['score']}")
# Evaluate context relevance
relevance = ContextRelevanceEvaluator()
result = relevance.run(
questions=["What is our vacation policy?"],
contexts=[retrieved_contexts]
)
print(f"Context relevance: {result['score']}")Build an evaluation pipeline that runs alongside your RAG pipeline:
from haystack import Pipeline
eval_pipeline = Pipeline()
eval_pipeline.add_component("faithfulness", FaithfulnessEvaluator())
eval_pipeline.add_component("relevance", ContextRelevanceEvaluator())
eval_pipeline.add_component("similarity", SASEvaluator())
# Run evaluation
eval_result = eval_pipeline.run({
"faithfulness": {
"questions": questions,
"contexts": contexts,
"responses": responses
},
"relevance": {
"questions": questions,
"contexts": contexts
},
"similarity": {
"responses": responses,
"ground_truths": expected_answers
}
})
For more comprehensive evaluation, integrate with RAGAS:
from ragas import evaluate
from ragas.metrics import faithfulness, answer_relevancy, context_precision
# Convert Haystack results to the dataset format RAGAS expects
from datasets import Dataset
ragas_dataset = Dataset.from_dict({
    "question": questions,
    "contexts": [[c] for c in contexts],
    "answer": responses,
    "ground_truth": expected_answers
})
# Run RAGAS evaluation
scores = evaluate(
ragas_dataset,
metrics=[faithfulness, answer_relevancy, context_precision]
)
print(scores)
Here is a production-ready RAG system with Haystack:
"""
Complete Haystack RAG Implementation
Production-ready with error handling, logging, and evaluation
"""
import os
import logging
from pathlib import Path
from haystack import Pipeline, Document
from haystack.document_stores.in_memory import InMemoryDocumentStore
from haystack.components.converters import TextFileToDocument
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.embedders import (
SentenceTransformersDocumentEmbedder,
SentenceTransformersTextEmbedder
)
from haystack.components.retrievers.in_memory import InMemoryEmbeddingRetriever
from haystack.components.builders import PromptBuilder
from haystack.components.generators import OpenAIGenerator
from haystack.components.writers import DocumentWriter
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Configuration
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
LLM_MODEL = "gpt-4o"
CHUNK_SIZE = 200
CHUNK_OVERLAP = 20
TOP_K = 5
class HaystackRAG:
"""Production-ready RAG implementation with Haystack."""
def __init__(self):
self.document_store = InMemoryDocumentStore(
embedding_similarity_function="cosine"
)
self.indexing_pipeline = self._build_indexing_pipeline()
self.query_pipeline = self._build_query_pipeline()
def _build_indexing_pipeline(self) -> Pipeline:
"""Build the document indexing pipeline."""
pipeline = Pipeline()
# Add components
pipeline.add_component(
"converter",
TextFileToDocument()
)
pipeline.add_component(
"splitter",
DocumentSplitter(
split_by="word",
split_length=CHUNK_SIZE,
split_overlap=CHUNK_OVERLAP
)
)
pipeline.add_component(
"embedder",
SentenceTransformersDocumentEmbedder(model=EMBEDDING_MODEL)
)
pipeline.add_component(
"writer",
DocumentWriter(document_store=self.document_store)
)
# Connect the pipeline
pipeline.connect("converter", "splitter")
pipeline.connect("splitter", "embedder")
pipeline.connect("embedder", "writer")
return pipeline
def _build_query_pipeline(self) -> Pipeline:
"""Build the RAG query pipeline."""
template = """
You are a helpful assistant. Answer the question based on the provided context.
If the context does not contain enough information to answer, say so clearly.
Always cite which source documents informed your answer.
Context:
{% for document in documents %}
---
Source: {{ document.meta.get('file_path', 'Unknown') }}
{{ document.content }}
{% endfor %}
Question: {{ question }}
Answer:"""
pipeline = Pipeline()
# Add components
pipeline.add_component(
"embedder",
SentenceTransformersTextEmbedder(model=EMBEDDING_MODEL)
)
pipeline.add_component(
"retriever",
InMemoryEmbeddingRetriever(
document_store=self.document_store,
top_k=TOP_K
)
)
pipeline.add_component(
"prompt_builder",
PromptBuilder(template=template)
)
pipeline.add_component(
"generator",
OpenAIGenerator(
model=LLM_MODEL,
generation_kwargs={"temperature": 0.7}
)
)
# Connect the pipeline
pipeline.connect("embedder.embedding", "retriever.query_embedding")
pipeline.connect("retriever.documents", "prompt_builder.documents")
pipeline.connect("prompt_builder", "generator")
return pipeline
def index_documents(self, file_paths: list[str]) -> int:
"""
Index documents from file paths.
Args:
file_paths: List of paths to text files
Returns:
Number of document chunks indexed
"""
logger.info(f"Indexing {len(file_paths)} files...")
result = self.indexing_pipeline.run({
"converter": {"sources": file_paths}
})
docs_written = result["writer"]["documents_written"]
logger.info(f"Indexed {docs_written} document chunks")
return docs_written
def index_text(self, texts: list[str], metadata: list[dict] = None) -> int:
"""
Index raw text directly.
Args:
texts: List of text strings to index
metadata: Optional metadata for each text
Returns:
Number of document chunks indexed
"""
if metadata is None:
metadata = [{"source": f"text_{i}"} for i in range(len(texts))]
documents = [
Document(content=text, meta=meta)
for text, meta in zip(texts, metadata)
]
# Run only splitter, embedder, writer
splitter = DocumentSplitter(
split_by="word",
split_length=CHUNK_SIZE,
split_overlap=CHUNK_OVERLAP
)
embedder = SentenceTransformersDocumentEmbedder(model=EMBEDDING_MODEL)
writer = DocumentWriter(document_store=self.document_store)
split_docs = splitter.run(documents=documents)["documents"]
embedded_docs = embedder.run(documents=split_docs)["documents"]
result = writer.run(documents=embedded_docs)
logger.info(f"Indexed {result['documents_written']} document chunks")
return result["documents_written"]
def query(self, question: str) -> dict:
"""
Query the RAG system.
Args:
question: The question to answer
Returns:
Dict with 'answer', 'sources', and 'documents'
"""
logger.info(f"Processing query: {question[:50]}...")
result = self.query_pipeline.run({
"embedder": {"text": question},
"prompt_builder": {"question": question}
})
answer = result["generator"]["replies"][0]
documents = result["retriever"]["documents"]
# Extract sources
sources = list(set(
doc.meta.get("file_path", doc.meta.get("source", "Unknown"))
for doc in documents
))
return {
"answer": answer,
"sources": sources,
"documents": documents
}
def save_pipelines(self, directory: str):
"""Save pipelines to YAML for deployment."""
Path(directory).mkdir(parents=True, exist_ok=True)
self.indexing_pipeline.dump(f"{directory}/indexing.yaml")
self.query_pipeline.dump(f"{directory}/query.yaml")
logger.info(f"Pipelines saved to {directory}")
def visualize(self, output_dir: str = "."):
"""Generate pipeline visualizations."""
self.indexing_pipeline.draw(f"{output_dir}/indexing_pipeline.png")
self.query_pipeline.draw(f"{output_dir}/query_pipeline.png")
logger.info(f"Pipeline diagrams saved to {output_dir}")
def main():
"""Example usage of the Haystack RAG system."""
# Initialize
rag = HaystackRAG()
# Sample documents (in production, load from files)
sample_docs = [
"""
Company Vacation Policy
All full-time employees are entitled to 20 days of paid time off (PTO)
per calendar year. PTO accrues at a rate of 1.67 days per month.
Unused PTO can be carried over to the next year, up to a maximum of
5 days. PTO requests must be submitted at least 2 weeks in advance
for periods longer than 3 consecutive days.
""",
"""
Remote Work Guidelines
Employees may work remotely up to 3 days per week with manager approval.
Remote work days must be scheduled in advance and logged in the HR system.
Employees are expected to be available during core hours (10 AM - 3 PM)
regardless of work location. Home office equipment stipends of up to $500
are available for eligible employees.
""",
"""
Expense Reimbursement Policy
Business expenses must be submitted within 30 days of the expense date.
Receipts are required for all expenses over $25. Meals during travel are
reimbursed up to $75 per day. Flights should be booked through the
company travel portal. Personal expenses are not eligible for reimbursement.
"""
]
metadata = [
{"source": "hr_handbook", "section": "vacation"},
{"source": "hr_handbook", "section": "remote_work"},
{"source": "hr_handbook", "section": "expenses"}
]
# Index documents
rag.index_text(sample_docs, metadata)
# Query the system
questions = [
"How many vacation days do employees get?",
"Can I work from home?",
"What is the meal reimbursement limit for travel?"
]
for question in questions:
print(f"\n{'='*60}")
print(f"Question: {question}")
print("="*60)
result = rag.query(question)
print(f"\nAnswer: {result['answer']}")
print(f"\nSources: {', '.join(result['sources'])}")
# Visualize pipelines (optional)
# rag.visualize()
if __name__ == "__main__":
main()
Haystack's pipeline-centric architecture offers distinct advantages for enterprise RAG deployments:
Strengths:
Trade-offs:
When to Choose Haystack:
This article covered Haystack's approach to RAG. Continue with the series:
For production deployments, also explore:
This is Part 4 of the "Building RAG Systems: A Platform-by-Platform Guide" series. Next up: Semantic Kernel: RAG in the Microsoft Ecosystem.