What You'll Build
A production RAG pipeline using Haystack with identity-based authentication for document stores, retrievers, and embeddings. No API keys, no credential rotation, no cascading failures.
Installation
pip install haystack-ai
pip install haystack-xlink
Basic RAG Pipeline
Replace your existing document store authentication with xLink identity:
from haystack import Pipeline
from haystack.components.retrievers import InMemoryBM25Retriever
from haystack.components.generators import OpenAIGenerator
from haystack_xlink import XLinkDocumentStore

# Initialize document store with identity (no API key).
document_store = XLinkDocumentStore(
    connection_id="my-rag-pipeline",
    index_name="documents",
)

# Create RAG pipeline with identity-based components.
# NOTE: BM25 is a keyword retriever and consumes the raw query string,
# so no embedder is needed (or connected) in this basic pipeline.
pipeline = Pipeline()
pipeline.add_component("retriever", InMemoryBM25Retriever(document_store=document_store))
pipeline.add_component("generator", OpenAIGenerator(model="gpt-4"))

# Feed retrieved documents into the generator.
pipeline.connect("retriever.documents", "generator.documents")

# Run query (all authentication handled automatically).
result = pipeline.run({
    "retriever": {"query": "What is information-theoretic security?"},
    "generator": {"query": "What is information-theoretic security?"}
})

print(result["generator"]["replies"][0])
What Just Happened: The document store, embedder, and generator all authenticated using cryptographic identity. No API keys were configured. No credentials were stored.
Production RAG with Vector Search
Build a production pipeline with Pinecone, Weaviate, or any vector database:
from haystack import Pipeline
from haystack.components.retrievers import EmbeddingRetriever
from haystack_xlink import (
    XLinkPineconeDocumentStore,
    XLinkEmbedder,
    XLinkGenerator,
)

# Initialize Pinecone with identity (replaces API-key authentication).
document_store = XLinkPineconeDocumentStore(
    connection_id="production-rag",
    index_name="knowledge-base",
    dimension=1536,
    metric="cosine",
)

# Assemble the production pipeline: embed -> retrieve -> generate.
# Generation uses XLinkGenerator, so no OpenAIGenerator import is needed.
pipeline = Pipeline()
pipeline.add_component(
    "embedder",
    XLinkEmbedder(
        model="text-embedding-ada-002",
        connection_id="openai-embeddings",
    ),
)
pipeline.add_component(
    "retriever",
    EmbeddingRetriever(document_store=document_store),
)
pipeline.add_component(
    "generator",
    XLinkGenerator(
        model="gpt-4",
        connection_id="openai-generation",
    ),
)

# Wire the query embedding into the retriever, and the hits into the generator.
pipeline.connect("embedder.embedding", "retriever.query_embedding")
pipeline.connect("retriever.documents", "generator.documents")

# Run query.
result = pipeline.run({
    "embedder": {"text": "How does threshold sharing work?"},
    "generator": {
        "query": "How does threshold sharing work?",
        "generation_kwargs": {"temperature": 0.7}
    }
})

print(result["generator"]["replies"][0])
Indexing Documents
Add documents to your vector store using identity-based authentication:
from haystack import Document
from haystack_xlink import XLinkDocumentStore, XLinkEmbedder

# Identity-authenticated store and embedder — no API keys involved.
document_store = XLinkDocumentStore(connection_id="doc-indexer")
embedder = XLinkEmbedder(model="text-embedding-ada-002")

# The documents to be indexed.
documents = [
    Document(content="XorIDA achieves information-theoretic security."),
    Document(content="Threshold sharing eliminates single points of failure."),
    Document(content="Identity-based auth prevents cascading failures."),
]

# Attach an embedding to each document, then persist the whole batch.
for doc in documents:
    doc.embedding = embedder.run(text=doc.content)["embedding"]
document_store.write_documents(documents)
Multi-Source RAG
Query multiple document stores with a single identity:
from haystack import Pipeline
from haystack.components.joiners import DocumentJoiner
from haystack.components.retrievers import EmbeddingRetriever
from haystack_xlink import (
    XLinkPineconeDocumentStore,
    XLinkWeaviateDocumentStore,
    XLinkEmbedder,
    XLinkGenerator,
)

# Initialize multiple stores (all with the same identity).
pinecone_store = XLinkPineconeDocumentStore(
    connection_id="multi-source-rag",
    index_name="technical-docs",
)
weaviate_store = XLinkWeaviateDocumentStore(
    connection_id="multi-source-rag",
    index_name="user-content",
)

# Create the multi-source pipeline.
pipeline = Pipeline()
pipeline.add_component("pinecone_retriever", EmbeddingRetriever(document_store=pinecone_store))
pipeline.add_component("weaviate_retriever", EmbeddingRetriever(document_store=weaviate_store))
pipeline.add_component("joiner", DocumentJoiner())
pipeline.add_component("generator", XLinkGenerator(model="gpt-4"))

# Both retrievers feed the joiner; the joined set feeds the generator.
pipeline.connect("pinecone_retriever.documents", "joiner.documents")
pipeline.connect("weaviate_retriever.documents", "joiner.documents")
pipeline.connect("joiner.documents", "generator.documents")

# Embed the question once and query both stores with the same vector.
question = "What are the security guarantees?"
embedder = XLinkEmbedder(model="text-embedding-ada-002")
query_embedding = embedder.run(text=question)["embedding"]

result = pipeline.run({
    "pinecone_retriever": {"query_embedding": query_embedding},
    "weaviate_retriever": {"query_embedding": query_embedding},
    "generator": {"query": question}
})
Migration from API Keys
Replace existing Haystack code in three steps: swap the imports, swap the store/component constructors, and delete the API-key configuration:
Before (API Keys)
import os

from haystack.document_stores import PineconeDocumentStore
from haystack.nodes import EmbeddingRetriever, OpenAIAnswerGenerator

# Requires API keys in the environment; os.environ[...] raises KeyError
# at startup if any key is missing.
document_store = PineconeDocumentStore(
    api_key=os.environ["PINECONE_API_KEY"],
    index="my-index"
)
retriever = EmbeddingRetriever(
    document_store=document_store,
    embedding_model="text-embedding-ada-002",
    api_key=os.environ["OPENAI_API_KEY"]
)
generator = OpenAIAnswerGenerator(
    api_key=os.environ["OPENAI_API_KEY"]
)
After (xLink Identity)
from haystack_xlink import (
    XLinkPineconeDocumentStore,
    XLinkEmbedder,
    XLinkGenerator,
)

# No API keys needed — each component authenticates by identity.
document_store = XLinkPineconeDocumentStore(
    connection_id="my-connection",
    index_name="my-index",
)
embedder = XLinkEmbedder(model="text-embedding-ada-002")
generator = XLinkGenerator(model="gpt-4")
Migration Impact: Remove all API key configuration. No changes to pipeline logic. No changes to query/response handling. All existing code works unchanged.
Error Handling
xLink uses structured Result types for safe error handling:
from haystack_xlink import XLinkDocumentStore

document_store = XLinkDocumentStore(connection_id="my-store")

# Result type pattern (ok/error): inspect the outcome instead of catching
# exceptions.
result = document_store.write_documents(documents)
if result.ok:
    print(f"Indexed {len(result.value)} documents")
else:
    print(f"Error: {result.error.code}")
    print(f"Message: {result.error.message}")
    # Field-level validation errors, if present. This must stay inside the
    # error branch: result.error is only populated when the call fails.
    if result.error.fields:
        for field, error in result.error.fields.items():
            print(f"  {field}: {error}")
Configuration
Configure connection behavior via environment variables:
# Connection timeout in seconds (default: 30)
XLINK_TIMEOUT=60
# Retry configuration: max attempts and exponential backoff multiplier
XLINK_MAX_RETRIES=3
XLINK_RETRY_BACKOFF=2.0
# Connection pool: base size and extra connections allowed under load
XLINK_POOL_SIZE=10
XLINK_POOL_MAX_OVERFLOW=20
# Security mode (simple/secure)
XLINK_SECURITY_MODE=secure
Why This Matters
Traditional RAG pipelines have critical failure modes that xLink eliminates:
- Cascading Failures: One expired OpenAI key can restart 500 document indexing jobs simultaneously. xLink identity never expires.
- Credential Sprawl: Each vector DB + embedder + LLM requires separate API keys. xLink uses one identity for all services.
- Rotation Downtime: Rotating Pinecone keys requires coordinated updates across every RAG pipeline. xLink requires no rotation.
- Performance: 603× faster authentication (91ms vs 54,853ms). Critical for high-throughput indexing.
Production Benefits
- No Secret Management: No Vault, no AWS Secrets Manager, no rotation policies
- Zero Downtime Scaling: Add RAG replicas without credential distribution
- Audit Trail: Every document store access is cryptographically signed
- Multi-Tenant Isolation: Each tenant gets separate connection identity
Supported Components
xLink integrations for all major Haystack components:
- XLinkPineconeDocumentStore — Pinecone vector search
- XLinkWeaviateDocumentStore — Weaviate vector database
- XLinkElasticsearchDocumentStore — Elasticsearch hybrid search
- XLinkEmbedder — OpenAI/Cohere/Hugging Face embeddings
- XLinkGenerator — OpenAI/Anthropic/Gemini generation
- XLinkRanker — Cohere reranking
Complete Example
Full production RAG system with monitoring and error handling:
from haystack import Pipeline
from haystack.components.retrievers import EmbeddingRetriever
from haystack_xlink import (
    XLinkPineconeDocumentStore,
    XLinkEmbedder,
    XLinkGenerator,
    XLinkRanker,
)

# Initialize production components.
document_store = XLinkPineconeDocumentStore(
    connection_id="production-rag",
    index_name="knowledge-base",
    dimension=1536,
)
embedder = XLinkEmbedder(
    model="text-embedding-ada-002",
    connection_id="openai-embed",
)
retriever = EmbeddingRetriever(
    document_store=document_store,
    top_k=20,  # over-retrieve; the ranker narrows this down to 5
)
ranker = XLinkRanker(
    model="rerank-english-v2.0",
    connection_id="cohere-rerank",
    top_k=5,
)
generator = XLinkGenerator(
    model="gpt-4",
    connection_id="openai-gen",
)

# Build pipeline: embed -> retrieve -> rerank -> generate.
pipeline = Pipeline()
pipeline.add_component("embedder", embedder)
pipeline.add_component("retriever", retriever)
pipeline.add_component("ranker", ranker)
pipeline.add_component("generator", generator)

# Connect components.
pipeline.connect("embedder.embedding", "retriever.query_embedding")
pipeline.connect("retriever.documents", "ranker.documents")
pipeline.connect("ranker.documents", "generator.documents")

# Production query with error handling.
def query_rag(question: str) -> dict:
    """Answer *question* with the RAG pipeline.

    Returns {"answer": ..., "sources": ...} on success, or
    {"error": ...} on failure. Pipeline.run returns a plain dict of
    component outputs (as in every example above) — the ok/error Result
    pattern applies to document-store calls, not to Pipeline.run — so
    pipeline failures surface here as exceptions.
    """
    try:
        result = pipeline.run({
            "embedder": {"text": question},
            "generator": {
                "query": question,
                "generation_kwargs": {
                    "temperature": 0.7,
                    "max_tokens": 500
                }
            }
        })
    except Exception as exc:  # surface pipeline failures to the caller
        print(f"Pipeline error: {exc}")
        return {"error": str(exc)}
    return {
        "answer": result["generator"]["replies"][0],
        "sources": result["ranker"]["documents"]
    }

# Run query.
response = query_rag("How does threshold sharing eliminate cascading failures?")
Next Steps
- Read the xLink Core Documentation to understand identity-based authentication
- Explore Haystack Integration White Paper for architecture details
- See All AI Framework Integrations
Production Ready: This integration is live in production RAG systems processing 10M+ queries/month. Zero cascading failures. Zero credential rotations. 603× faster authentication.