Advanced Production-Grade
RAG Pipeline Implementation
Building enterprise-ready retrieval-augmented generation systems with semantic search, adaptive policies, and self-correction loops
Embedding Models · Self-Correction Loops · MLOps
22 comprehensive sections covering architecture, implementation, and production deployment
What is RAG?
RAG is not a single model but a complete AI application architecture that combines retrieval systems with language models to ground responses in external knowledge.
Three Pillars
- ✓ Multi-stage Retrieval — Hybrid, sparse, dense, and ranked retrieval
- ✓ Adaptive Policies — Context-aware retrieval strategies
- ✓ Self-Correction Loops — Reflection and iterative refinement
Enterprise Risks
Beyond hallucinations, consider:
- ⚠ Permission leakage
- ⚠ Prompt injection attacks
- ⚠ Data poisoning
- ⚠ Unbounded cost/latency
- ⚠ Silent quality regressions
Use Cases
- 📄 Employee Knowledge Work — Internal docs, wikis
- 🤝 Customer Support — FAQs, tickets, logs
- 📊 Structured+Unstructured — Reports, databases, forms
- 🎨 Multimodal Knowledge — PDFs, images, videos
Indexing Plane
Offline: data ingestion, parsing, chunking, embedding, and vector storage. Built once, queried many times.
Serving Plane
Online: query processing, retrieval, reranking, LLM inference, and safety checks. Low-latency, high-throughput.
Full Architecture: Two Planes + Governance
Complete end-to-end RAG architecture with indexing, serving, and governance layers
(Architecture diagram: Indexing Pipeline → Query Pipeline)
Document Ingestion: Connectors + Contracts
Reliable data onboarding with standardized contracts, multi-format support, and three-speed indexing
Canonical Document Contract
Every document must conform to this schema for consistent retrieval and governance:
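A minimal sketch of such a contract as a Python dataclass; the field names (`doc_id`, `acl`, `metadata`, `indexed_at`) are illustrative assumptions, not a canonical definition:

```python
from dataclasses import dataclass, field
from datetime import datetime, timezone

@dataclass
class CanonicalDocument:
    """Illustrative document contract; field names are assumptions."""
    doc_id: str                  # stable, globally unique ID
    source: str                  # connector that produced it (e.g. "confluence")
    content_type: str            # "pdf", "html", "markdown", ...
    text: str                    # extracted plain text
    title: str = ""
    acl: list = field(default_factory=list)       # roles/groups allowed to see it
    metadata: dict = field(default_factory=dict)  # section, page, region, ...
    indexed_at: datetime = field(
        default_factory=lambda: datetime.now(timezone.utc)
    )

# Example instance:
doc = CanonicalDocument(
    doc_id="kb-123", source="confluence",
    content_type="html", text="Returns are accepted within 30 days.",
    title="Product Policy", acl=["support", "sales"],
)
```

Downstream stages (chunking, embedding, governance) can then rely on these fields always being present.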
Connector Types & Data Sources
Enterprise Document Parsing
Apache Tika, Unstructured.io — Parse PDFs, DOCX, images with layout preservation and OCR support
Structured Data
CRM, ERP, Databases — Direct queries or treat as "tool use" for on-demand retrieval; knowledge views
Streaming & CDC
Debezium → Kafka — Real-time event streams from databases; capture inserts, updates, deletes
Web Content
Crawlers with Compliance — Respect robots.txt, rate limits, GDPR; extract HTML/JSON with link tracking
Multimodal Sources
OCR + Image Embeddings — Extract text from images, create vision embeddings; preserve layout
Custom Connectors
Plugin API — Implement standardized interface for proprietary systems, internal APIs, legacy apps
Three-Speed Indexing Model
Batch Rebuilds
Full reindexing of large datasets weekly/monthly; highest throughput, controlled resources. Use for bulk imports, historical data.
Incremental Upserts
Append new chunks, update modified docs via change detection; moderate latency (seconds). Triggered by scheduled jobs or webhooks.
Real-Time Streams
Event-driven CDC or message queue ingestion; sub-second latency for hot data. Use for live chat logs, sensor feeds, user events.
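The change detection behind incremental upserts can be sketched with content hashes; the function names and the doc_id → text / doc_id → hash mappings here are assumptions:

```python
import hashlib

def content_hash(text: str) -> str:
    """Stable fingerprint of a document's extracted text."""
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def plan_upserts(incoming: dict, indexed: dict) -> dict:
    """Compare incoming docs against the stored hash registry and
    classify each as insert / update / skip. `incoming` maps
    doc_id -> text; `indexed` maps doc_id -> stored hash."""
    plan = {"insert": [], "update": [], "skip": [], "delete": []}
    for doc_id, text in incoming.items():
        h = content_hash(text)
        if doc_id not in indexed:
            plan["insert"].append(doc_id)
        elif indexed[doc_id] != h:
            plan["update"].append(doc_id)   # re-chunk + re-embed
        else:
            plan["skip"].append(doc_id)     # no embedding cost
    # Docs that disappeared from the source get tombstoned
    plan["delete"] = [d for d in indexed if d not in incoming]
    return plan
```

Only `insert` and `update` entries incur chunking and embedding cost, which is what keeps incremental runs cheap relative to full rebuilds.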
Element-Aware PDF Parsing
Extract with positional metadata (page, bbox, reading order). Preserve table structure, preserve images. Enables citation anchoring.
Dead Letter Queue Pattern
Send unparseable docs to DLQ for manual inspection; enable retry with fallback parsers or human review. Never silently drop data.
ProductionIngester Example
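A minimal sketch of an ingester applying the dead-letter-queue pattern above (primary parser plus fallbacks, failures captured rather than silently dropped); all class and method names are assumptions:

```python
class DeadLetterQueue:
    """In-memory stand-in for a real DLQ (e.g. a Kafka topic or DB table)."""
    def __init__(self):
        self.items = []
    def send(self, doc_id, error):
        self.items.append({"doc_id": doc_id, "error": str(error)})

class ProductionIngester:
    """Try parsers in order; unparseable docs go to the DLQ
    for retry or human review instead of being dropped."""
    def __init__(self, parsers, dlq):
        self.parsers = parsers   # ordered: primary first, fallbacks after
        self.dlq = dlq
    def ingest(self, doc_id, raw):
        errors = []
        for parser in self.parsers:
            try:
                return parser(raw)
            except Exception as e:
                errors.append(e)
        self.dlq.send(doc_id, errors[-1])  # all parsers failed
        return None

def strict_parser(raw):
    """Toy parser that rejects empty input."""
    if not raw.strip():
        raise ValueError("empty document")
    return raw.strip()

dlq = DeadLetterQueue()
ingester = ProductionIngester([strict_parser], dlq)
```

In production the parser list would be something like `[hi_res_parser, fast_parser, ocr_fallback]`, with the DLQ feeding a review dashboard.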
Chunking Strategies
Chunk for retrieval (findability) and store separate representations for generation (readability)
| Strategy | Description | Best For | Trade-offs |
|---|---|---|---|
| Fixed-Size | Split at token/word boundary | Predictable, simple baseline | May split sentences; low semantic coherence |
| Recursive | Split recursively by delimiters (newline, paragraph, sentence) | Structured documents, code | Still may cut semantically important boundaries |
| Semantic | Embed sentences, split at embedding distance threshold | Narrative text, research papers | Expensive; latency + cost for embedding all chunks |
| Document-Structure | Respect sections, headings, tables, code blocks | Mixed-format documents (PDFs, Markdown) | Requires parser awareness |
| Agentic/LLM | Use LLM to decide breaks and chunk metadata | Complex domain logic, multilingual | High cost and latency; not real-time |
| Sliding Window | Overlapping fixed-size chunks with stride | Preserve local context, boundary queries | Higher storage; redundant retrieval |
| Parent-Child (Sentence-Window) | Store fine-grained chunks; expand with surrounding context at retrieval | Precision + context balance | Requires two-stage retrieval; complex indexing |
SemanticChunker: Embedding Similarity Breakpoints
- • Chunk size: 256–512 tokens (optimal for retrieval + generation trade-off)
- • Overlap: 10–15% to preserve boundary context
- • Metadata inheritance: Propagate doc_id, section, source to every chunk
- • Context enrichment: Prepend section headers or document title to chunk
- • Element-aware parsing: Preserve tables, code blocks, images as intact units in PDFs
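The embedding-distance breakpoint idea behind the SemanticChunker can be sketched as follows; `encode` stands in for any sentence-embedding model (e.g. all-MiniLM-L6-v2), and the threshold logic is a simplified illustration:

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    na = math.sqrt(sum(x * x for x in a))
    nb = math.sqrt(sum(y * y for y in b))
    return dot / (na * nb) if na and nb else 0.0

def semantic_chunks(sentences, encode, threshold=0.5):
    """Start a new chunk whenever the similarity between consecutive
    sentence embeddings drops below `threshold`. `encode` is any
    sentence -> vector function."""
    if not sentences:
        return []
    chunks, current = [], [sentences[0]]
    prev = encode(sentences[0])
    for sent in sentences[1:]:
        emb = encode(sent)
        if cosine(prev, emb) < threshold:
            chunks.append(" ".join(current))  # topic shift: close chunk
            current = []
        current.append(sent)
        prev = emb
    chunks.append(" ".join(current))
    return chunks
```

A production version would also enforce the 256–512-token size bounds and the 10–15% overlap recommended above.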
Recommended: Hybrid Multi-Layer Chunking for Production
No single chunking strategy works for all document types. Production systems use a document-type router that selects the best chunking strategy per document, combined with a parent-child indexing pattern that stores small chunks for precise retrieval but returns larger context windows for generation.
Production Chunking Pipeline
class ProductionChunkingPipeline:
"""Route-aware, parent-child, metadata-enriched."""
def __init__(self):
self.router = DocTypeRouter()
self.parsers = {
"pdf": UnstructuredParser(strategy="hi_res"),
"markdown": MarkdownHeaderSplitter(),
"html": HTMLSectionSplitter(),
"code": ASTChunker(), # tree-sitter
"plaintext": SemanticChunker(),
}
self.semantic = SemanticChunker(
model="all-MiniLM-L6-v2",
max_tokens=384, # child chunk size
threshold=0.5,
)
def process(self, doc: Document) -> list[Chunk]:
# Step 1: Route to parser
doc_type = self.router.classify(doc)
elements = self.parsers[doc_type].parse(doc)
# Step 2: Create parent sections
parents = self.group_into_sections(elements)
# Step 3: Split parents into child chunks
all_chunks = []
for parent in parents:
children = self.semantic.split(parent.text)
for i, child_text in enumerate(children):
chunk = Chunk(
text=child_text,
parent_id=parent.id,
parent_text=parent.text, # stored separately
position=i,
metadata=self.enrich(doc, parent, child_text),
)
all_chunks.append(chunk)
return all_chunks
def enrich(self, doc, parent, text):
"""Prepend context + propagate metadata."""
return {
"doc_id": doc.id,
"source": doc.source,
"section": parent.heading,
"page": parent.page_num,
"doc_type": doc.content_type,
"indexed_at": datetime.utcnow(),
# Prepended for better retrieval:
"enriched_text": (
f"{doc.title} > {parent.heading}\n"
f"{text}"
),
}
Parent-Child Retrieval at Query Time
class ParentChildRetriever:
def search(self, query, top_k=5):
# 1. Search CHILD chunks (precise)
children = self.vector_db.search(
query, top_k=top_k * 3 # over-fetch
)
# 2. Expand to PARENT sections
parent_ids = set(c.parent_id for c in children)
parents = self.doc_store.get_parents(parent_ids)
# 3. Deduplicate + rank parents by
# best child match score
scored = {}
for child in children:
pid = child.parent_id
if pid not in scored or child.score > scored[pid]:
scored[pid] = child.score
ranked = sorted(
parents, key=lambda p: scored[p.id],
reverse=True
)[:top_k]
return ranked  # full parent context
Chunk Size Guide by Document Type
| Doc Type | Child (Search) | Parent (Context) | Strategy |
|---|---|---|---|
| Product docs | 128–256 tok | 512–1024 tok | Heading-based + semantic |
| Legal / Policy | 256–384 tok | 1024–2048 tok | Section-based, keep clauses intact |
| Research papers | 256–512 tok | 1024–2048 tok | Semantic breakpoints |
| FAQ / KB | Whole Q&A pair | Same (no parent) | Question-Answer as unit |
| Code | Function/class | File or module | AST-aware (tree-sitter) |
| Chat logs | Single turn | Full conversation | Turn-based splitting |
| Tables / CSV | Row group | Full table + header | Keep header with every chunk |
Why Parent-Child Wins
Problem: Small chunks retrieve precisely but lose context. Large chunks give context but pollute retrieval with irrelevant text.
Solution: Index small (128–256 tok) for search precision. At retrieval time, expand to the parent section (512–1024 tok) for coherent LLM context. Best of both worlds.
Context Enrichment (Prepending)
Prepend the document title and section heading to each chunk before embedding. This dramatically improves retrieval for ambiguous queries.
# Without enrichment:
"Returns are accepted within 30 days."
# With enrichment:
"Product Policy > Returns & Refunds\n"
"Returns are accepted within 30 days."
# Now retrieves for "return policy" queries
Tools for Production
Parsing: unstructured.io (hi_res), LlamaParse, Docling
Splitting: LangChain RecursiveCharacterTextSplitter, LlamaIndex SentenceWindowNodeParser
Semantic: Sentence Transformers + custom breakpoint
Code: tree-sitter (AST), CodeSplitter
Parent-Child: LlamaIndex ParentDocumentRetriever, custom doc_store + vector_db combo
Embedding Models & Strategies
| Model Family | Type | Notable Capabilities | Operational Considerations | Cost/Latency |
|---|---|---|---|---|
| OpenAI text-embedding-3 | API | small/large variants; dimension shortening; multilingual | Quota limits; regional latency | $0.02/M tokens (small); higher for large |
| Cohere Embed v3/v4 | API | Multilingual + multimodal (text+image); fine-tuning available | Document and query encoding modes | $0.10/1M tokens |
| BGE-M3 | Open-source (HuggingFace) | Multi-lingual multi-function (dense+sparse+multi-vector) | 8192 token context; self-hosted overhead | Free; requires GPU infrastructure |
| Multilingual E5 | Open-source | Strong multilingual; published training/eval methodology | Community-maintained; good reproducibility | Free; 2–5ms per chunk on A100 |
| GTE-Qwen2 (7B) | Open-source | State-of-the-art; 131K context window | Larger model; requires more VRAM | Free; ~20ms/chunk on A100 |
| voyage-3-large | API | Long-context (128K) + code understanding | Premium pricing; excellent for code RAG | $0.15/1M tokens |
| nomic-embed-text-v1.5 | Open-source | Matryoshka embeddings; dimension flexibility | Efficient storage; truncation-stable | Free; 3–4ms latency on CPU |
EmbeddingService: Caching, Rate-Limiting, Batch Processing
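A minimal sketch of such a service with hash-keyed caching and batched backend calls; rate limiting would wrap `embed_batch` and is omitted for brevity, and all names here are assumptions:

```python
import hashlib

class EmbeddingService:
    """Hash-keyed cache + batched backend calls. `embed_batch` is any
    list[str] -> list[vector] callable (an API client or local model)."""
    def __init__(self, embed_batch, batch_size=64):
        self.embed_batch = embed_batch
        self.batch_size = batch_size
        self.cache = {}
        self.backend_calls = 0
    def _key(self, text):
        return hashlib.sha256(text.encode("utf-8")).hexdigest()
    def embed(self, texts):
        # Dedupe while preserving order; only embed cache misses
        missing = [t for t in dict.fromkeys(texts)
                   if self._key(t) not in self.cache]
        for i in range(0, len(missing), self.batch_size):
            batch = missing[i : i + self.batch_size]
            self.backend_calls += 1
            for text, vec in zip(batch, self.embed_batch(batch)):
                self.cache[self._key(text)] = vec
        return [self.cache[self._key(t)] for t in texts]
```

Re-embedding runs are dominated by cache hits, so only new or changed chunks pay API or GPU cost.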
Matryoshka Embeddings
Truncate high-dimensional embeddings to lower dimensions without retraining. Storage shrinks in proportion to the dimensions kept (e.g., truncating 1536 → 512 dims cuts vector storage roughly 3×) at the cost of a slight accuracy loss.
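Truncation plus L2-renormalization is the whole trick; a minimal sketch:

```python
import math

def truncate_embedding(vec, dims):
    """Matryoshka-style truncation: keep the first `dims` components,
    then L2-renormalize so cosine similarity stays meaningful."""
    head = vec[:dims]
    norm = math.sqrt(sum(x * x for x in head))
    return [x / norm for x in head] if norm else head
```

This only works well for models trained with a Matryoshka-style objective (e.g., nomic-embed-text-v1.5 or OpenAI text-embedding-3 with the `dimensions` parameter); truncating an arbitrary embedding model degrades quality much more.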
Fine-Tuning with Contrastive Learning
Train embeddings on domain-specific relevance pairs using triplet loss. Improves domain-specific retrieval by 15–30% with 5K–50K labeled pairs.
Instruction-Tuned Embeddings
Prepend task instructions (e.g., "Retrieve documents relevant to this query: ") to encode queries and documents asymmetrically. Models trained with such prefixes (e.g., E5's "query: " / "passage: " convention) retrieve noticeably better when the same prefixes are used at inference.
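A sketch of asymmetric encoding using E5-style `query: ` / `passage: ` prefixes; `encoder` stands in for any batch embedding function, and the wrapper name is an assumption:

```python
def encode_asymmetric(encoder, queries=None, passages=None):
    """Apply the prefix convention used by instruction-tuned embedders
    (E5-style). `encoder` is any list[str] -> list[vector] callable."""
    out = {}
    if queries:
        out["queries"] = encoder([f"query: {q}" for q in queries])
    if passages:
        out["passages"] = encoder([f"passage: {p}" for p in passages])
    return out
```

The key point is that the same text embeds differently depending on its role, so queries and documents must consistently go through their respective branches at both index and query time.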
Vector Database Selection
FAISS is a similarity search library, not a networked vector database. Production RAG requires distributed, replicable systems.
| Database | Architecture | Key Features | Scaling Model | Ops Burden |
|---|---|---|---|---|
| FAISS | In-memory library | Highest performance; no persistence | Single-node only | High (build/rebuild cycles) |
| Milvus | Distributed (K8s native) | Multi-replica, auto-sharding, metadata filtering | Horizontal (scale nodes) | High (K8s expertise required) |
| Pinecone | Managed SaaS | Serverless, metadata filtering, pod-type scaling | Serverless (auto) | Low (fully managed) |
| Weaviate | Hybrid (vector+BM25) | Combined dense/sparse search, replication controls | Cluster-based | Medium |
| Chroma | Lightweight SQLite/in-memory | Simple API; good for prototypes | Single-node | Low (dev only) |
| Elasticsearch | Existing infra (if already deployed) | Dense vectors + BM25 + analytics | Cluster-based | Medium |
| pgvector | PostgreSQL extension | SQL + vectors; ACID transactions | Postgres replication | Medium |
Qdrant/Milvus Production Config: HNSW + Quantization + Replication
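A sketch of a collection config in Qdrant's style — the field names follow Qdrant's REST API, but verify them against your server version, and treat the numbers as starting points rather than a tuned recommendation:

```python
# Qdrant-style collection config (field names per Qdrant's REST API;
# verify against your server version before use).
collection_config = {
    "vectors": {
        "size": 768,             # must match the embedding model's dims
        "distance": "Cosine",
    },
    "hnsw_config": {
        "m": 16,                 # graph connectivity; higher = better recall
        "ef_construct": 200,     # build-time search breadth
    },
    "quantization_config": {
        "scalar": {
            "type": "int8",      # ~4x memory saving vs float32
            "quantile": 0.99,    # clip outliers before quantizing
            "always_ram": True,  # keep quantized vectors in RAM
        }
    },
    "replication_factor": 3,     # avoid single-point failures
    "on_disk_payload": True,     # metadata on disk, vectors in RAM
}
```

This payload would be passed to the collection-creation endpoint (e.g. via `qdrant_client`); Milvus exposes equivalent knobs under different names.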
Key Design Decisions
- HNSW vs IVF: HNSW faster recall, IVF better for billion-scale; prefer HNSW for sub-100M datasets
- Quantization: 8-bit scalar quantization saves 4x memory with <2% recall loss; essential for cost control
- Namespaces/Partitions: Isolate indices by tenant, project, or time period for multi-tenancy and retention
- Replication: RF=3 minimum for production SLA; prevents single-point failures
- TTL & Garbage Collection: Auto-expire old chunks; configure cleanup policies for cost
- Backup & Point-in-Time Recovery: Daily snapshots; test restore procedures quarterly
Query Transformation — From One Query to Many
Users ask vague, ambiguous, or narrowly worded questions. A single embedding of that raw query often misses relevant chunks. Query transformation rewrites, decomposes, and expands the user's query into multiple targeted search queries — dramatically improving chunk filtering and retrieval quality.
Six Query Transformation Strategies
1. Query Rewriting
LLM rewrites the query to be clearer and more search-friendly. Fixes typos, expands abbreviations, makes implicit context explicit.
# Input: "how 2 fix auth"
# Output: "How to troubleshoot and fix
# authentication errors"
prompt = f"""Rewrite this query to be
clearer for a search engine.
Fix typos, expand abbreviations.
Query: {query}"""When: Always. First step in every pipeline. Cheap and fast (~50ms with Haiku).
2. Multi-Query Expansion
Generate 3–5 diverse reformulations targeting different vocabulary, specificity levels, and perspectives.
# Input: "fix auth errors"
# Output:
# - "authentication failure troubleshoot"
# - "401 403 OAuth token expired"
# - "login session invalid API key"
# - "how to debug access denied"When: Ambiguous or broad queries. Biggest recall improvement (15–30%). See deep-dive in Retrieval section.
3. Step-Back Prompting
Generate a higher-level abstract query to retrieve foundational context, then the specific query for details.
# Input: "why does JWT expire in 15min"
# Step-back: "JWT token lifecycle and
# security best practices"
# Then search BOTH queries:
# → foundational + specific chunks
When: "Why" questions, conceptual queries. Provides background context the LLM needs to reason.
4. HyDE (Hypothetical Document)
Ask the LLM to generate a hypothetical answer, embed THAT, and search for similar real documents. Bridges the query-document embedding gap.
# Input: "fix auth errors"
# LLM generates hypothetical doc:
hypo = "To fix authentication errors,
first check if your OAuth token
has expired. Refresh using the
/auth/refresh endpoint..."
# Embed hypo → search → find real docs
# that are SIMILAR to this answer
When: Technical queries where query language differs from document language. Adds ~300ms latency.
5. Query Decomposition
Break multi-part or complex questions into atomic sub-queries, retrieve for each independently, then merge.
# Input: "compare pricing of Plan A
# vs Plan B and which has
# better support"
# Decompose into:
# Q1: "Plan A pricing details"
# Q2: "Plan B pricing details"
# Q3: "Plan A support features"
# Q4: "Plan B support features"When: Compound questions, comparisons, multi-entity queries. Critical for completeness.
6. Metadata Filter Extraction
Extract structured filters (date, category, product, region) from the query to narrow the search pool BEFORE vector search.
# Input: "2024 return policy for EU"
# Extract:
# - filter: year=2024
# - filter: region=EU
# - query: "return policy"
# → pre-filter chunks THEN embed search
When: Queries with temporal, geographic, or categorical constraints. Dramatically reduces search pool.
Recommended Production Strategy: Adaptive Query Transform
Don't apply all strategies to every query — that's wasteful and slow. Instead, classify the query complexity and apply the minimum transformation needed. Simple factual queries need only rewriting; complex multi-part queries need decomposition + expansion.
AdaptiveQueryTransformer — Production Implementation
class AdaptiveQueryTransformer:
"""Classify query → apply minimum transform.
Simple queries: just rewrite (50ms).
Complex queries: full pipeline (200-400ms)."""
def __init__(self, llm, fast_llm):
self.llm = llm # strong model
self.fast = fast_llm # Haiku / mini
self.classifier = QueryClassifier()
self.cache = TransformCache(ttl=3600)
async def transform(self, query: str) -> TransformResult:
# Check cache first
cached = self.cache.get(query)
if cached:
return cached
# Step 1: Classify query complexity
qtype = self.classifier.classify(query)
# Step 2: Route to appropriate strategy
if qtype == "simple_factual":
# "What's the return policy?" → just rewrite
queries = [await self.rewrite(query)]
filters = self.extract_filters(query)
elif qtype == "ambiguous":
# "fix auth" → rewrite + expand
rewritten = await self.rewrite(query)
expanded = await self.expand(query, n=3)
queries = [rewritten] + expanded
filters = self.extract_filters(query)
elif qtype == "compound":
# "compare A vs B pricing + support"
sub_queries = await self.decompose(query)
queries = sub_queries
filters = self.extract_filters(query)
elif qtype == "conceptual":
# "why does X happen?" → step-back + specific
abstract = await self.step_back(query)
queries = [query, abstract]
filters = {}
elif qtype == "technical":
# Technical jargon → HyDE + expand
hyde_doc = await self.generate_hyde(query)
expanded = await self.expand(query, n=2)
queries = [query] + expanded
hyde_queries = [hyde_doc] # separate embed
filters = self.extract_filters(query)
else: # fallback: rewrite + 2 expansions
queries = [query] + await self.expand(query, 2)
filters = {}
result = TransformResult(
original=query,
queries=queries,
filters=filters,
strategy=qtype,
)
self.cache.set(query, result)
return result
Query Classification — Route to Strategy
| Query Type | Example | Strategy | Latency |
|---|---|---|---|
| Simple factual | "What's the return policy?" | Rewrite only | ~50ms |
| Ambiguous | "fix auth errors" | Rewrite + Expand(3) | ~200ms |
| Compound | "compare A vs B pricing + support" | Decompose into sub-Qs | ~250ms |
| Conceptual | "why does JWT expire?" | Step-back + specific | ~150ms |
| Technical | "CORS preflight 403 nginx" | HyDE + Expand(2) | ~400ms |
| Lookup | "order #12345 status" | Extract ID → direct DB | ~5ms |
Query Classifier Implementation
class QueryClassifier:
"""Fast classifier: embedding + rules.
~5ms. No LLM call needed."""
def classify(self, query: str) -> str:
# Rule-based fast path
if re.match(r"(order|tracking|#)\s*\d+", query):
return "lookup"
if "vs" in query or "compare" in query:
return "compound"
if query.startswith(("why", "how does", "explain")):
return "conceptual"
if len(query.split()) <= 6:
return "ambiguous"
# Embedding-based classifier for rest
emb = self.encoder.encode(query)
pred = self.classifier_model.predict(emb)
return pred # SetFit / fine-tuned
Metadata Filter Extraction — Pre-Filter Before Vector Search
Extract structured constraints from the query to narrow the chunk pool BEFORE embedding search. This dramatically improves precision for queries with temporal, categorical, or entity-specific constraints.
class FilterExtractor:
"""Extract structured filters from query.
Runs in parallel with query expansion."""
def extract(self, query: str) -> dict:
filters = {}
# Temporal: "2024", "last month", "recent"
date = self.parse_date(query)
if date:
filters["date_after"] = date
# Category: "pricing", "support", "API"
category = self.classify_topic(query)
if category:
filters["doc_type"] = category
# Entity: product names, plan names
entities = self.ner.extract(query)
if entities:
filters["entities"] = entities
# Region: "EU", "US", "APAC"
region = self.detect_region(query)
if region:
filters["region"] = region
return filters
# Applied to vector search:
# db.search(query_emb, filters=filters)
# → searches ONLY chunks matching filters
Why this matters:
Without filters, "2024 EU return policy" searches ALL chunks and relies on the embedding to distinguish 2024 EU docs from 2023 US docs. Embeddings are bad at temporal and geographic precision. Pre-filtering narrows the pool from 10M chunks to maybe 50K — making vector search both faster and more accurate.
| Filter Type | Example | Extraction Method |
|---|---|---|
| Temporal | "2024", "this week", "latest" | Regex + dateparser |
| Category | "pricing", "API docs", "FAQ" | Topic classifier |
| Entity | Product names, plan names | NER (spaCy / custom) |
| Region | "EU", "US", "Germany" | Regex + geo lookup |
| Language | Query language detection | langdetect / fasttext |
| Access level | User's role / permissions | Session context (ACL) |
Cumulative retrieval quality:
- Raw single query: Recall@5 = 62%
- + Rewrite: 68% (+6 pts)
- + Multi-Query Expand: 82% (+14 pts)
- + Metadata Filters: 87% (+5 pts)
- + Cross-Encoder Rerank: 94% (+7 pts)
- Total lift: +32 percentage points
Latency Strategy — Generating 5 Queries in <50ms
The naive approach — call an LLM to generate 5 queries — takes 200–400ms. That's unacceptable for real-time voice agents or low-latency search. Here are four production strategies to get multi-query expansion down to <50ms.
Strategy 1: Template-Based Expansion (5ms)
No LLM call at all. Use rule-based templates that generate query variants from the original query using synonym dictionaries, regex patterns, and structural transformations.
class TemplateExpander:
"""Zero-LLM query expansion. ~5ms.
Generates 5 variants using rules."""
def __init__(self):
self.synonyms = SynonymDict.load("domain_synonyms.json")
self.stopwords = set(["the", "a", "is", "how", "do", "i"])  # lowercase to match query.lower()
def expand(self, query: str) -> list[str]:
tokens = query.lower().split()
keywords = [t for t in tokens if t not in self.stopwords]
variants = [query] # always include original
# V1: Synonym swap (most impactful)
for kw in keywords:
if kw in self.synonyms:
syn = self.synonyms[kw][0]
variants.append(query.replace(kw, syn))
break # one swap per variant
# V2: Keyword-only (drop question words)
variants.append(" ".join(keywords))
# V3: Reversed keyword order
variants.append(" ".join(reversed(keywords)))
# V4: Add domain context prefix
variants.append(f"documentation: {query}")
return variants[:5]
# Example:
# Input: "how do I fix auth errors"
# Output: [
# "how do I fix auth errors", # original
# "how do I fix authentication errors", # synonym
# "fix auth errors", # keywords-only
# "errors auth fix", # reversed
# "documentation: how do I fix auth errors" # prefixed
# ]
Pros: Zero latency, zero cost, deterministic. Cons: Limited diversity, no semantic understanding. Best for: First-pass expansion while LLM results are pending.
Strategy 2: Fine-Tuned Small Model (10–30ms)
Distill a large LLM's query expansion capability into a small local model (T5-small, FLAN-T5-base, or a 60M-param custom model). Runs on CPU in 10–30ms.
from transformers import T5ForConditionalGeneration
class LocalQueryExpander:
"""Fine-tuned T5-small for query expansion.
~15ms on CPU. No API call."""
def __init__(self):
self.model = T5ForConditionalGeneration.from_pretrained(
"./models/query-expander-t5-small"
)
self.tokenizer = AutoTokenizer.from_pretrained(
"./models/query-expander-t5-small"
)
def expand(self, query: str, n=5) -> list[str]:
prompt = f"expand query: {query}"
inputs = self.tokenizer(prompt, return_tensors="pt")
outputs = self.model.generate(
**inputs,
num_return_sequences=n,
num_beams=n,
max_new_tokens=64,
do_sample=False,
)
return [
self.tokenizer.decode(o, skip_special_tokens=True)
for o in outputs
]
# Training data: 50K (query, expansion) pairs
# generated by GPT-4/Claude from prod logs.
# Fine-tune T5-small for 3 epochs. ~2hrs on 1 GPU.
Pros: Fast, free at inference, semantic-aware. Cons: Requires training, model maintenance. Best for: High-QPS production systems.
Strategy 3: Pre-Computed Cache (0ms hit / 300ms miss)
Cache LLM-generated expansions by normalized query. First request is slow; all subsequent identical or near-identical queries are instant. Use semantic similarity for fuzzy cache matching.
class SemanticExpansionCache:
"""Cache LLM expansions. 0ms on hit.
Semantic fuzzy matching for near-dupes."""
def __init__(self, redis, encoder, llm):
self.redis = redis # exact cache
self.encoder = encoder # for fuzzy match
self.index = FAISSIndex() # query embedding index
self.llm = llm # fallback generator
async def get_expansions(self, query: str) -> list[str]:
# L1: Exact match (Redis, ~0.1ms)
key = hashlib.md5(query.lower().encode()).hexdigest()
cached = self.redis.get(key)
if cached:
return json.loads(cached)
# L2: Semantic fuzzy match (~2ms)
q_emb = self.encoder.encode(query)
hits = self.index.search(q_emb, top_k=1)
if hits and hits[0].score > 0.95:
# "fix auth errors" ≈ "fix authentication errors"
return json.loads(self.redis.get(hits[0].id))
# L3: Cache miss → generate (async, don't block)
expansions = await self.llm.expand(query)
self.redis.setex(key, 3600, json.dumps(expansions))
self.index.add(q_emb, key)
return expansions
Hit rate: 40–70% for most production systems (users ask similar questions). Semantic matching pushes this to 60–85%.
★ Strategy 4: Hybrid — The Recommended Approach
Combine all three: serve template-generated queries instantly (5ms), check cache for LLM-quality expansions (0ms if hit), and fire-and-forget an async LLM call to upgrade the cache for next time.
class HybridQueryExpander:
"""5ms P95 response. Best quality over time.
Template → Cache → Async LLM backfill."""
def __init__(self):
self.template = TemplateExpander() # 5ms
self.cache = SemanticExpansionCache() # 0ms hit
self.llm = LLMExpander() # 300ms
async def expand(self, query: str) -> list[str]:
# Phase 1: Instant (5ms) — always available
template_variants = self.template.expand(query)
# Phase 2: Cache check (0–2ms)
cached = await self.cache.get(query)
if cached:
# Merge template + cached LLM variants
return self.dedupe(cached + template_variants)[:5]
# Phase 3: Return templates NOW,
# fire async LLM to backfill cache
asyncio.create_task(
self._async_backfill(query)
)
return template_variants # 5ms total
async def _async_backfill(self, query):
"""Runs in background. Next identical
query will get LLM-quality expansions."""
try:
expansions = await self.llm.expand(query)
await self.cache.set(query, expansions)
except Exception:
pass # template fallback is fine
Result: First request gets template variants in 5ms. Second request gets LLM-quality variants from cache in 0ms. No user ever waits for the LLM.
Complete Latency Breakdown — Query Transform Pipeline
| Step | Operation | Latency | Runs | Can Parallelize? |
|---|---|---|---|---|
| Classify | Rule-based + embedding classifier | ~3ms | Always | — |
| Template expand | Synonym swap, keyword extract, prefix | ~2ms | Always | — |
| Cache lookup | Redis exact + FAISS semantic | ~2ms | Always | ✓ parallel with templates |
| Filter extract | Regex + NER for metadata | ~5ms | Always | ✓ parallel with above |
| LLM expand | Haiku/mini generate 5 variants | ~300ms | Cache miss only | Async (fire-and-forget) |
| HyDE generate | Hypothetical doc generation | ~400ms | Technical queries only | Async (fire-and-forget) |
| **Total (hybrid)** | End-to-end query transform | 5–15ms P95 | Always | LLM runs async; result cached for next request |
Warm-Up Strategy
Pre-populate the expansion cache by running your top 10,000 queries from production logs through the LLM expander offline. This gives instant cache hits for the most common queries from day one.
# Offline warm-up script
for query in top_10k_queries:
expansions = await llm.expand(query)
cache.set(query, expansions)
# Run nightly. ~$3 for 10K queries.
Batch LLM Calls
If you must call an LLM synchronously, batch multiple queries into a single request. Generate all 5 variants in one prompt (not 5 separate calls). This cuts 5×300ms to 1×350ms.
# One call, 5 variants:
prompt = f"""Generate 5 diverse search
queries for: "{query}"
Return as JSON array."""
# → 1 API call ≈ 300ms
# NOT: 5 calls × 300ms = 1.5s ❌
Streaming + Speculative
Start retrieval with template queries immediately. If LLM expansions arrive (from cache or async), merge them into the result set before reranking. The LLM expansions enrich, never block.
# Speculative parallel execution
template_results = retrieve(template_qs)
# If LLM expansions arrive in time:
llm_results = retrieve(llm_qs) # bonus
merged = rrf_merge(template_results, llm_results)
# If not: template results alone are fine
① Classify query type (3ms, rule-based)
② Generate template variants (2ms, synonym + keyword)
③ Check semantic cache for LLM variants (2ms, Redis + FAISS) — in parallel with ②
④ Extract metadata filters (5ms, regex + NER) — in parallel with ②③
⑤ If cache miss: fire-and-forget async LLM call to backfill cache for next time
⑥ Return template+cached variants immediately → start retrieval
Total: 5–15ms P95. User never waits for LLM. Quality improves over time as cache fills.
Advanced Retrieval Strategies
Sparse, dense, and hybrid retrieval each encode different failure modes; hybrid retrieval fuses signals.
Retrieval Strategy Patterns
Hybrid Search (Dense + Sparse)
Run BM25 and vector search in parallel; fuse results via Reciprocal Rank Fusion (RRF) or weighted sum.
Multi-Query Expansion
LLM generates 3–5 diverse rephrased queries targeting different aspects of the user's question. Retrieve for each, then merge and deduplicate. Detailed deep-dive below.
HyDE (Hypothetical Document Embeddings)
LLM generates hypothetical document for query; embed it; search nearest neighbors. Bridges intent-execution gap.
Query Routing
Classify query intent; route to specialized indices (e.g., FAQ vs. technical docs). Faster and more precise.
Parent Document Retrieval
Retrieve fine-grained child chunks; expand with parent (full section). Balance precision + context.
Step-Back Prompting
Ask "What high-level concept does this question ask?"; retrieve abstract info first; then detailed.
Metadata Filtering
Pre-filter chunks by date, source, or category before vector search. Reduce retrieval pool; improve relevance.
Contextual Compression
Retrieve top-K; use LLM to extract relevant sentences. Reduce context window; increase token efficiency.
Learned Sparse (SPLADE)
SPLADE-family models: learned sparse vectors; interpretable term weights; combines dense + sparse strengths.
HybridRetriever Example
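A minimal sketch of hybrid retrieval with RRF fusion; `bm25_search` and `dense_search` are stand-ins for real sparse and dense backends, and the class shape is an assumption:

```python
def rrf_fuse(ranked_lists, k=60, top_k=5):
    """Reciprocal Rank Fusion: score(d) = sum over lists of 1/(k + rank)."""
    scores = {}
    for results in ranked_lists:
        for rank, doc_id in enumerate(results):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores, key=scores.get, reverse=True)[:top_k]

class HybridRetriever:
    """Run sparse + dense search and fuse ranked doc-id lists with RRF.
    `bm25_search` / `dense_search`: query -> [doc_id] callables."""
    def __init__(self, bm25_search, dense_search):
        self.bm25 = bm25_search
        self.dense = dense_search
    def search(self, query, top_k=5):
        return rrf_fuse([self.bm25(query), self.dense(query)], top_k=top_k)
```

RRF needs only ranks, not scores, which sidesteps the problem of calibrating BM25 scores against cosine similarities.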
Multi-Query Expansion — Deep Dive
A single user query often captures only one perspective of what they need. Multi-Query Expansion uses an LLM to generate diverse reformulations that target different angles, vocabulary, and levels of specificity — then retrieves for each and merges results. This typically improves Recall@K by 15–30%.
Production MultiQueryExpander
class MultiQueryExpander:
PROMPT = """Generate {n} diverse search queries
for the user question below. Each query should
target a DIFFERENT aspect:
- One using technical terms / error codes
- One using simple plain language
- One asking the "why" behind the issue
- One focused on the solution / fix
User question: {query}
Return as JSON array of strings."""
def __init__(self, llm, n_queries=4):
self.llm = llm
self.n = n_queries
self.cache = QueryExpansionCache(ttl=3600)
async def expand(self, query: str) -> list[str]:
# Check cache first (same query = same expansions)
cached = self.cache.get(query)
if cached:
return cached
result = await self.llm.generate(
self.PROMPT.format(n=self.n, query=query),
model="claude-haiku-4-5-20251001", # fast + cheap
temperature=0.7, # some diversity
)
variants = json.loads(result)
# Always include original query
all_queries = [query] + variants[:self.n]
self.cache.set(query, all_queries)
return all_queries
Multi-Query Retriever with RRF Fusion
class MultiQueryRetriever:
def __init__(self, expander, retriever, reranker):
self.expander = expander
self.retriever = retriever
self.reranker = reranker
async def search(self, query, top_k=5):
# Step 1: Expand query
queries = await self.expander.expand(query)
# Step 2: Parallel retrieval for all variants
all_results = await asyncio.gather(*[
self.retriever.search(q, top_k=top_k * 2)
for q in queries
])
# Step 3: Reciprocal Rank Fusion
fused = self.rrf_merge(all_results, k=60)
# Step 4: Rerank against ORIGINAL query
# (not the variants!)
reranked = self.reranker.rerank(
query=query, # original intent
candidates=fused[:top_k * 3],
top_k=top_k
)
return reranked
def rrf_merge(self, result_lists, k=60):
"""Reciprocal Rank Fusion across all
query variants. Returns docs, best first."""
scores, docs = {}, {}
for results in result_lists:
for rank, doc in enumerate(results):
docs[doc.id] = doc
scores[doc.id] = scores.get(doc.id, 0.0) + 1.0 / (k + rank + 1)
ranked = sorted(scores, key=scores.get, reverse=True)
return [docs[doc_id] for doc_id in ranked]
Expansion Strategies
Synonym expansion: Replace key terms with alternatives ("auth" → "authentication", "login")
Specificity ladder: Abstract ("security issue") + specific ("OAuth 2.0 token expired 401")
Perspective shift: Problem ("auth fails") + solution ("fix authentication") + cause ("why does token expire")
Domain injection: Add domain context ("in Kubernetes" or "for REST API")
When NOT to Use
Exact-match queries: Order ID lookups, SKU searches, specific error codes — expansion adds noise
Low-latency paths: Adds ~200–400ms for LLM expansion. Use only when retrieval quality matters more than speed
Small corpus (<1K docs): Expansion just returns the same docs repeatedly. Not worth the cost
Production Optimizations
Cache expansions: Same query → same variants. 1-hour TTL covers repeated queries
Use cheapest LLM: Haiku/GPT-4o-mini for expansion (~$0.001 per query)
Parallel retrieval: Run all variant searches simultaneously with asyncio.gather
Rerank against original: Always rerank using the ORIGINAL query, not variants — variants help recall, reranking restores precision
Low-Latency Retrieval — Hitting <30ms P95
For real-time chat and voice agents, the entire retrieval pipeline (query → embed → search → filter → return chunks) must complete in under 30ms P95. Here's how production systems achieve this.
Embedding Latency — <5ms
The query embedding step is on the critical path. Every millisecond counts.
# Strategy: Pre-warm + GPU + small model
class FastEmbedder:
def __init__(self):
# Use small model: 384d, ~3ms on GPU
self.model = SentenceTransformer(
"all-MiniLM-L6-v2",
device="cuda"
)
# Pre-warm: run dummy inference
self.model.encode("warmup")
# ONNX quantized for CPU-only deploys:
# self.model = ORTModel("model.onnx")
# → ~5ms on CPU vs ~15ms PyTorch
def encode(self, text: str):
return self.model.encode(
text, normalize_embeddings=True
)
Options: all-MiniLM-L6 (3ms GPU), ONNX quantized (5ms CPU), Matryoshka 256d (2ms, -1% quality), API (10-20ms + network).
Vector Search — <10ms at Scale
HNSW indexes deliver sub-10ms search even at 10M+ vectors. Key: tune ef_search, keep quantized index in RAM.
# Qdrant: tuned for low latency
collection_config = {
"vectors": {
"size": 384, # small = faster
"distance": "Cosine",
},
"hnsw_config": {
"m": 16, # graph density
"ef_construct": 200, # build quality
},
"quantization_config": {
"scalar": {
"type": "int8", # 4x smaller
"always_ram": True, # no disk IO
}
},
"on_disk_payload": True, # metadata on disk
}
# Search params:
search_params = {
"hnsw_ef": 64, # lower = faster (vs 128)
"exact": False, # ANN, not brute force
}
# Result: ~5ms for 10M vectors, int8 quantized
Parallel Hybrid Search
Run BM25 and vector search simultaneously. Both return in ~5ms. RRF merge takes ~1ms. Total hybrid: ~6ms vs 10ms serial.
# Parallel hybrid: 6ms total
dense, sparse = await asyncio.gather(
vector_db.search(q_emb, top_k=50),
bm25_index.search(q_text, top_k=50),
)
fused = rrf_merge(dense, sparse)
# NOT: dense = await ...; sparse = await ...
# That's serial: 5+5 = 10ms ❌
Retrieval Result Cache
Cache the final retrieved chunks by normalized query hash. 30–50% hit rate for production systems. 0ms on hit.
# Redis retrieval cache
key = md5(normalize(query) + user_acl)
cached = redis.get(key)
if cached:
return json.loads(cached) # 0ms
# ACL in key prevents cross-user leakage
# TTL: 15min (balance freshness vs speed)
Connection Pooling
Cold connections to vector DB add 20–50ms. Pool connections and keep them warm. Use gRPC over HTTP for lower overhead.
# Qdrant gRPC connection pool
client = QdrantClient(
url="qdrant:6334",
prefer_grpc=True, # not REST
grpc_options={
"grpc.keepalive_time_ms": 10000,
},
)
# Pre-warm: send dummy search on startup
✓ Small embedding model (384d, GPU or ONNX quantized) — saves 10ms vs large model
✓ Int8 quantized HNSW index, always in RAM — saves 5–20ms vs disk
✓ Parallel BM25 + vector search with asyncio.gather — saves 5ms vs serial
✓ gRPC connection pooling to vector DB — saves 20–50ms cold start
✓ Retrieval result cache with ACL-aware keys (15min TTL) — 0ms on 30–50% of queries
✓ FlashRank fast reranker instead of cross-encoder for first pass — 5ms vs 50ms
✓ Metadata pre-filtering to reduce search pool before vector search
✓ Lower ef_search (64 vs 128) for HNSW — ~2ms savings, <1% recall drop
Reranking & Relevance Scoring
Quality Improvement Pipeline
| Reranker | Type | Latency | Accuracy | Cost / Deployment |
|---|---|---|---|---|
| Cohere Rerank v3.5 | API cross-encoder | 50–150ms | SOTA | $0.001 / 1000 queries |
| Jina Reranker v2 | API | 100–200ms | Excellent | $0.0005 / query |
| cross-encoder/ms-marco | Open-source HF | 5–20ms (A100) | Good (BERT-base) | Free; self-hosted |
| BGE Reranker v2.5 | Open-source HF | 10–30ms (A100) | Very Good | Free; self-hosted |
| RankGPT (LLM-based) | LLM proxy | 200ms–1s | SOTA (model-dependent) | API cost; slow |
| FlashRank (tiny) | Open-source distilled | 2–5ms (CPU) | Acceptable (70–80%) | Free; ultra-fast |
MultiStageReranker: Cascade Strategy
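The cascade strategy named above ships without code in this draft. A minimal sketch, assuming two scoring callables: `fast_scorer` stands in for something FlashRank-like, `accurate_scorer` for a cross-encoder; both names are hypothetical.

```python
from typing import Callable, Sequence

class MultiStageReranker:
    """Cascade: a cheap scorer prunes the pool, an expensive scorer ranks survivors."""
    def __init__(self,
                 fast_scorer: Callable[[str, str], float],
                 accurate_scorer: Callable[[str, str], float],
                 prune_to: int = 20):
        self.fast = fast_scorer          # e.g. FlashRank-style, ~2-5ms
        self.accurate = accurate_scorer  # e.g. cross-encoder, ~10-50ms
        self.prune_to = prune_to

    def rerank(self, query: str, candidates: Sequence[str], top_k: int = 5):
        # Stage 1: fast scoring over the full candidate pool
        pruned = sorted(candidates,
                        key=lambda c: self.fast(query, c),
                        reverse=True)[:self.prune_to]
        # Stage 2: accurate scoring over the pruned pool only
        return sorted(pruned,
                      key=lambda c: self.accurate(query, c),
                      reverse=True)[:top_k]
```

The expensive model only ever sees `prune_to` candidates, which is what keeps the cascade's latency close to the fast scorer's.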
Prompt Engineering & Generation
"Prompting is a contract between retrieval and generation" — context discipline, citations, and answer modes matter.
Production RAG Prompt Template
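No template body appears in this draft, so here is a minimal sketch of what a context-disciplined template can look like. The wording, the `[Source N]` citation convention, and the helper `build_prompt` are illustrative, not a canonical template.

```python
# Sketch of a context-disciplined RAG prompt; wording is illustrative.
RAG_PROMPT = """You are a support assistant. Answer ONLY from the
numbered sources below. If the sources do not contain the answer,
say "I don't have enough information to answer that."

Cite sources inline as [Source N] after each claim.

Sources:
{sources}

Question: {question}

Answer:"""

def build_prompt(question: str, chunks: list[str]) -> str:
    # Number each retrieved chunk so citations can be verified later
    sources = "\n".join(
        f"[Source {i + 1}] {text}" for i, text in enumerate(chunks)
    )
    return RAG_PROMPT.format(sources=sources, question=question)
```

The explicit refusal instruction and the numbered-source convention are what make downstream citation verification and grounding checks tractable.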
RAGGenerator: Streaming, Fallback, Confidence Gating
Streaming (SSE/WebSocket)
Return tokens as they arrive, not end-to-end. Target <500ms TTFT (time-to-first-token). Improves perceived latency and UX.
Citation Extraction
Parse [Source N] references; validate against retrieved chunks. Enable user verification; prevent hallucinated citations.
Fallback Strategy
Route to cheaper/faster model if retrieval confidence is low. Use strong model only when context is rich. Optimize cost/quality.
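The fallback behavior above can be sketched as a single confidence gate. This is a minimal synchronous sketch; `strong_model`, `cheap_model`, and the 0.5 cutoff are illustrative stand-ins, not recommended values.

```python
class RAGGenerator:
    """Confidence-gated generation with model fallback (sketch)."""
    def __init__(self, strong_model, cheap_model, gate: float = 0.5):
        self.strong = strong_model  # e.g. a frontier-model client
        self.cheap = cheap_model    # e.g. a small/fast model client
        self.gate = gate

    def generate(self, prompt: str, retrieval_confidence: float) -> str:
        # Rich context: spend on the strong model.
        # Weak context: a strong model cannot fix bad retrieval,
        # so route to the cheap model instead.
        if retrieval_confidence >= self.gate:
            return self.strong(prompt)
        return self.cheap(prompt)
```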
Self-RAG decides whether retrieval is needed per token; critiques its own outputs. Studies show 10–15% improvements in factuality by selective retrieval. Implement using token-level confidence scores from the LLM.
LLM Orchestration Policies
- Model Routing: Classify query complexity; route simple queries to fast model (GPT-3.5), complex to strong model (GPT-4). Save 50%+ on inference cost.
- Caching Layers: Cache responses by normalized query + context hash. ACL-sensitive keys (per-user); 24-72h TTL. Reduces latency and cost for repeated queries.
- Hallucination Mitigation Toolbox: Use retrieval-augmented verification (CTRL), confidence thresholds, structured output format (JSON schema), and post-generation fact-checking against context.
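The caching policy described above can be sketched as follows. `ResponseCache`, its in-memory store, and the MD5 keying are illustrative (production would use Redis), but the key structure, normalized query plus context hash plus user ACL, follows the text.

```python
import hashlib
import json
import time

class ResponseCache:
    """ACL-aware response cache keyed by normalized query + context hash (sketch)."""
    def __init__(self, ttl_s: int = 24 * 3600):
        self.ttl = ttl_s
        self.store = {}  # swap for Redis in production

    def _key(self, query: str, context_hash: str, user_acl: str) -> str:
        # Normalize whitespace/case so trivially different queries hit
        norm = " ".join(query.lower().split())
        raw = json.dumps([norm, context_hash, user_acl])
        return hashlib.md5(raw.encode()).hexdigest()

    def get(self, query, context_hash, user_acl):
        entry = self.store.get(self._key(query, context_hash, user_acl))
        if entry and time.time() - entry[1] < self.ttl:
            return entry[0]
        return None

    def set(self, query, context_hash, user_acl, response):
        self.store[self._key(query, context_hash, user_acl)] = (response, time.time())
```

Including the user ACL in the key is what prevents one user's cached answer from leaking to another user with different permissions.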
Response Evaluation Layer
In production, the LLM alone is NOT trusted. Every response passes through a parallel evaluation layer — grounding verification, intent alignment, safety moderation, and confidence scoring — all within 50–200ms.
1. Grounding Check — Deep Dive
The grounding check is the single most important validator in a production RAG system. It verifies that every claim in the LLM's response is actually supported by the retrieved context — catching hallucinations before they reach the user.
Tier 1: Embedding Similarity
Fastest check (~5–15ms). Runs on every response. Converts answer + context chunks to embeddings, measures cosine similarity.
from sentence_transformers import SentenceTransformer
import numpy as np
class EmbeddingGrounder:
def __init__(self):
self.model = SentenceTransformer(
"all-MiniLM-L6-v2" # 384d, fast
)
def check(self, answer, chunks):
a_emb = self.model.encode(answer)
c_embs = self.model.encode(
[c.text for c in chunks]
)
# Max similarity across chunks
scores = np.dot(c_embs, a_emb) / (
np.linalg.norm(c_embs, axis=1)
* np.linalg.norm(a_emb)
)
score = float(scores.max())
if score > 0.75:
return Grounded(score)
elif score > 0.5:
return Ambiguous(score) # → T2
else:
return Hallucinated(score)
Tools: FAISS, pgvector, Sentence Transformers, HuggingFace Embeddings, OpenAI text-embedding-3-small
Tier 2: Cross-Encoder / NLI
More accurate (~30–80ms). Only runs on ambiguous T1 results. Uses Natural Language Inference to classify each claim as entailed, neutral, or contradicted by context.
from transformers import pipeline
class NLIGrounder:
def __init__(self):
self.nli = pipeline(
"text-classification",
model="cross-encoder/"
"nli-deberta-v3-large"
)
def check(self, answer, context):
# Split answer into claims
claims = self.extract_claims(answer)
results = []
for claim in claims:
pred = self.nli(
f"{context} [SEP] {claim}"
)
label = pred[0]["label"]
# entailment/neutral/contradiction
results.append((claim, label))
contradictions = [
c for c, l in results
if l == "contradiction"
]
return NLIResult(
grounded=len(contradictions)==0,
flagged_claims=contradictions
)
Models: DeBERTa-v3-large-NLI, cross-encoder/nli-MiniLM, BART-large-MNLI, Cohere Rerank v3.5
Tier 3: LLM-as-Judge
Most flexible (~300–800ms). Only runs on disputed claims from T2. Performs claim-by-claim verification with explicit reasoning.
class LLMGrounder:
PROMPT = """Verify each claim against
the context. For each claim, respond:
SUPPORTED / NOT SUPPORTED / PARTIAL
Context: {context}
Claims to verify:
{claims}
Respond as JSON:
[{{"claim": "...", "verdict": "...",
"evidence": "...", "confidence": 0.0}}]
"""
async def check(self, claims, ctx):
result = await self.llm.generate(
self.PROMPT.format(
context=ctx,
claims="\n".join(claims)
),
model="claude-haiku-4-5-20251001",
# Use cheap fast model
temperature=0,
)
verdicts = json.loads(result)
unsupported = [
v for v in verdicts
if v["verdict"] != "SUPPORTED"
]
return LLMVerdict(
grounded=len(unsupported)==0,
unsupported_claims=unsupported
)
Models: Claude Haiku (cheapest), GPT-4o-mini, Gemini Flash, Llama 3.1 8B (self-hosted)
Production Grounding Service (Cascading)
class ProductionGroundingService:
"""Cascading grounding: fast → accurate → LLM
P95 latency: ~20ms (80% exit at T1)"""
def __init__(self):
self.t1 = EmbeddingGrounder() # ~10ms
self.t2 = NLIGrounder() # ~50ms
self.t3 = LLMGrounder() # ~500ms
self.metrics = GroundingMetrics()
async def verify(self, answer, chunks, query):
# Tier 1: Embedding (always runs)
t1 = self.t1.check(answer, chunks)
self.metrics.record("t1", t1.score)
if t1.score > 0.75:
return GroundingResult(
grounded=True, tier=1,
score=t1.score
)
if t1.score < 0.4:
return GroundingResult(
grounded=False, tier=1,
score=t1.score,
action="regenerate"
)
# Tier 2: NLI (ambiguous zone 0.4–0.75)
t2 = self.t2.check(answer, chunks)
self.metrics.record("t2", t2)
# grounded is True iff no claim contradicts the context
if t2.grounded:
return GroundingResult(
grounded=True, tier=2
)
# Tier 3: LLM judge (disputed claims only)
t3 = await self.t3.check(
t2.flagged_claims,
"\n".join(c.text for c in chunks)
)
self.metrics.record("t3", t3)
return GroundingResult(
grounded=t3.grounded, tier=3,
unsupported=t3.unsupported_claims,
action="regenerate" if not t3.grounded else None
)
Tools & Libraries Comparison
| Tool | Type | Latency | Best For |
|---|---|---|---|
| Sentence Transformers | Embedding | ~5ms | T1 — fast similarity |
| FAISS | Vector index | ~1ms | Batch embedding lookup |
| pgvector | Postgres ext | ~5ms | SQL-native similarity |
| DeBERTa-v3 NLI | Cross-encoder | ~50ms | T2 — NLI classification |
| BART-large-MNLI | NLI model | ~40ms | T2 — zero-shot NLI |
| Cohere Rerank | API reranker | ~60ms | T2 — relevance scoring |
| Claude Haiku | LLM API | ~400ms | T3 — claim verification |
| GPT-4o-mini | LLM API | ~500ms | T3 — claim verification |
| Guardrails AI | Framework | varies | Orchestrate all tiers |
| RAGAS | Eval framework | offline | Measure faithfulness |
| TruLens | Eval+trace | offline | Groundedness monitoring |
| DeepEval | CI eval | offline | Hallucination CI gate |
How the 80 / 15 / 5 Cascading Exit Works
In production, you do NOT run all three tiers on every response. Instead, you cascade: the fast cheap check runs first, and only ambiguous results escalate to the next tier. This is why 80% of requests cost ~10ms and only 5% ever hit the expensive LLM judge.
Tier 1 Exit (80%) — Clear Match
Most RAG answers closely paraphrase the retrieved context. Embedding similarity catches these trivially.
# Example: clear grounding
Context: "Returns accepted within 30 days
of purchase with original receipt."
Answer: "You can return items within 30 days
if you have the original receipt."
cosine_similarity = 0.91 # > 0.75
# → PASS at Tier 1. No further checks.
# Latency: ~10ms. Cost: $0.00.
This covers: direct paraphrasing, factual restatement, simple summarization, exact quotes, and minor rewording. The embedding model captures semantic equivalence without needing deeper reasoning.
Tier 2 Escalation (15%) — Ambiguous Zone
When the answer uses different vocabulary or adds inference, embeddings give a middling score. NLI resolves the ambiguity.
# Example: inference from context
Context: "Premium members get free shipping
on orders over $50."
Answer: "As a premium member, your $75 order
qualifies for free shipping."
cosine_similarity = 0.62 # ambiguous zone
# → Escalate to Tier 2
NLI("Premium members get free shipping
on orders over $50",
"$75 order qualifies for free shipping")
# → entailment (0.94 confidence)
# → PASS at Tier 2. Latency: ~60ms.
This covers: logical inference, numerical reasoning ("$75 > $50"), conditional application, combining info from multiple chunks, and contextual deduction.
Tier 3 Escalation (5%) — Disputed Claims
When NLI returns "neutral" (neither entailed nor contradicted) or there are mixed verdicts across claims, the LLM judge arbitrates.
# Example: mixed/complex claim
Context: "The product is available in blue
and red. Ships within 3-5 days."
Answer: "The product comes in blue, red, and
green. Usually arrives in a week."
T1 cosine_similarity = 0.58 # ambiguous
T2 NLI:
"blue and red" → entailment ✓
"green" → neutral ⚠️ # not in ctx
"arrives in a week" → neutral ⚠️
# → Escalate disputed claims to Tier 3
LLM Judge:
"green": NOT SUPPORTED # hallucination!
"week": PARTIAL # 3-5 days ≈ week
# → REJECT "green", accept "week"
# → Strip hallucinated claim, regenerate
Why This Works — The Math
The cascade works because most RAG answers are well-grounded (the retrieval pipeline already found relevant context). Only edge cases need expensive verification.
| Metric | All T3 | Cascade | Savings |
|---|---|---|---|
| Avg latency | 500ms | 40ms | 12.5x faster |
| P50 latency | 500ms | 10ms | 50x faster |
| P95 latency | 800ms | 60ms | 13x faster |
| Cost / 1K queries | $0.50 | $0.03 | 16x cheaper |
| Hallucination catch | ~98% | ~96% | -2% (acceptable) |
Key insight: You trade ~2% hallucination detection rate for a 12x latency reduction and 16x cost reduction. For the remaining 2%, user feedback loops and offline evaluation catch regressions.
Tuning the Thresholds — Production Guidance
T1 Pass Threshold (default: 0.75)
Raise to 0.80–0.85 for high-stakes domains (medical, legal, financial). Lower to 0.65–0.70 for casual Q&A where speed matters more. Tune by measuring T2/T3 escalation rate — if <5% escalate, threshold is too low.
T1 Reject Threshold (default: 0.4)
Below this, the answer is clearly unrelated to context — skip T2/T3 and regenerate immediately. Raise to 0.5 for stricter domains. Monitor false-rejection rate via user feedback.
T2→T3 Escalation (default: any contradiction)
Only escalate if T2 finds "contradiction" (not just "neutral"). Neutral means the context doesn't address the claim — which might be acceptable for partial answers. Tune per use case.
# Threshold config per use case
GROUNDING_CONFIG = {
"default": {
"t1_pass": 0.75, "t1_reject": 0.40,
"t2_escalate_on": ["contradiction"],
},
"medical": {
"t1_pass": 0.85, "t1_reject": 0.50, # stricter
"t2_escalate_on": ["contradiction", "neutral"], # always verify
},
"casual_qa": {
"t1_pass": 0.65, "t1_reject": 0.35, # faster
"t2_escalate_on": ["contradiction"], # only clear issues
},
}
2. Intent Check — Response Matches User Intent
Verifies the response actually addresses what the user asked. Catches drift where the model answers a different question entirely.
User: "Track my order" → Answer: "Here are some shoes you may like" — intent mismatch!
# Intent alignment pipeline
class IntentAlignmentChecker:
def check(self, query, response):
# Classify both through intent model
query_intent = self.intent_model.predict(query)
response_intent = self.intent_model.predict(response)
# Or use embedding similarity
q_emb = self.encoder.encode(query)
r_emb = self.encoder.encode(response)
similarity = cosine_similarity(q_emb, r_emb)
if similarity < 0.8:
return IntentResult(
aligned=False,
query_intent=query_intent,
response_intent=response_intent
)
return IntentResult(aligned=True)
Common intent models: Rasa, SetFit, fine-tuned classifiers. For production voice agents, embedding-based intent similarity with threshold >0.8 is fastest.
3. Safety Check — Content Moderation
Prevents unsafe or policy-violating responses: illegal instructions, abusive content, financial advice risks, policy violations.
A. Moderation Models
# Dedicated safety classifiers
result = moderation_api.classify(response)
# Output: {"violence": false, "hate": false, "self_harm": false}
if any(result.values()):
return block_response()
B. Rule Engine
Hard rules for regulated domains: refund policies, medical/financial advice, guaranteed outcomes. Example: if answer contains "guaranteed profit" → reject.
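A hard-rule engine can be as simple as a list of compiled patterns. The two rules below are illustrative examples extending the "guaranteed profit" case in the text, not a recommended policy set.

```python
import re

# Illustrative hard rules for a regulated domain; patterns are examples only.
HARD_RULES = [
    ("guaranteed_outcome", re.compile(r"guaranteed (profit|return)", re.I)),
    ("medical_dosage",     re.compile(r"\btake \d+ ?mg\b", re.I)),
]

def rule_check(response: str) -> list[str]:
    """Return the names of any hard rules the response violates."""
    return [name for name, pattern in HARD_RULES if pattern.search(response)]
```

Because these checks are pure regex, they cost well under a millisecond and can run on every response before any model-based evaluator.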
C. Guardrail Frameworks
Production libraries: Guardrails AI, NeMo Guardrails. Enforce content policies, structured outputs, and safe responses declaratively.
4. Confidence Score — Final Decision Engine
Aggregates all evaluator scores into a weighted confidence signal. Detailed deep-dive below.
Confidence Score — How It's Calculated
The confidence engine is the final gate before a response reaches the user. It takes raw scores from every evaluator, normalizes them, applies domain-specific weights, and produces a single decision: pass, retry, or fallback.
Production ConfidenceEngine Implementation
class ConfidenceEngine:
def __init__(self, config: DomainConfig):
self.weights = config.weights
self.thresholds = config.thresholds
self.veto_rules = config.veto_rules
def calculate(self, scores: EvalScores) -> Decision:
# Step 1: Check hard veto rules first
for rule in self.veto_rules:
if rule.triggered(scores):
return Decision(
action="REJECT",
reason=rule.name,
confidence=0.0,
vetoed=True
)
# Step 2: Weighted aggregation
raw_score = sum(
self.weights[k] * getattr(scores, k)
for k in self.weights
)
# Step 3: Apply penalty for low-scoring
# individual signals (even if weighted
# average is high)
penalty = 0.0
for k, threshold in self.thresholds.min_per_signal.items():
val = getattr(scores, k)
if val < threshold:
gap = threshold - val
penalty += gap * 0.5 # 50% of gap
final = max(0.0, raw_score - penalty)
# Step 4: Map to decision
if final >= self.thresholds.pass_threshold:
return Decision("PASS", final)
elif final >= self.thresholds.retry_threshold:
return Decision("RETRY", final)
else:
return Decision("FALLBACK", final)
Why These Weights?
| Signal | Weight | Rationale |
|---|---|---|
| Grounding | 0.30 | Highest — a hallucinated answer is the #1 failure mode. If grounding fails, nothing else matters. |
| Retrieval | 0.25 | If retrieval quality is low, the LLM is working with bad context. Garbage in → garbage out. |
| Intent | 0.15 | Answering the wrong question is bad but less dangerous than hallucinating facts. |
| Safety | 0.10 | Low weight in formula BUT has a hard veto — any safety flag = instant reject regardless of score. |
| Citation | 0.10 | Verifies source attribution. Important for trust but not critical for correctness. |
| Freshness | 0.10 | Only matters for temporal queries. Many questions are time-independent. |
Veto Rules — Hard Overrides
Certain conditions bypass the weighted score entirely and force an immediate reject. No amount of high scores elsewhere can compensate.
VETO_RULES = [
VetoRule("unsafe_content",
lambda s: s.safety < 0.5),
VetoRule("severe_hallucination",
lambda s: s.grounding < 0.3),
VetoRule("pii_leakage",
lambda s: s.pii_detected),
VetoRule("citation_fraud",
lambda s: s.citation_valid_pct < 0.5),
VetoRule("blocked_topic",
lambda s: s.blocked_content),
]
# If ANY veto fires → instant REJECT
# regardless of weighted score
Worked Examples — Three Scenarios
Scenario A — PASS
"What's your return policy?"
Grounding: 0.91 × 0.30 = 0.273
Retrieval: 0.88 × 0.25 = 0.220
Intent: 0.95 × 0.15 = 0.143
Safety: 1.00 × 0.10 = 0.100
Citation: 0.90 × 0.10 = 0.090
Fresh: 1.00 × 0.10 = 0.100
─────────────────
Total: 0.926 → Penalty: 0
Decision: PASS ✓
Scenario B — RETRY
"Compare Plan A vs Plan B pricing"
Grounding: 0.62 × 0.30 = 0.186
Retrieval: 0.70 × 0.25 = 0.175
Intent: 0.90 × 0.15 = 0.135
Safety: 1.00 × 0.10 = 0.100
Citation: 0.40 × 0.10 = 0.040
Fresh: 1.00 × 0.10 = 0.100
─────────────────
Raw: 0.736 | Penalty: -0.05
Final: 0.686 → RETRY ↻
Retry with more context chunks
Scenario C — VETO REJECT
"Show me other users' orders"
Retrieval: 0.80 × 0.25 = 0.200
Intent: 0.92 × 0.15 = 0.138
Safety: 0.20 × 0.10 = 0.020
─── VETO TRIGGERED ───
safety < 0.5 → unsafe_content
Decision: REJECT ⚠
Even if the remaining signals pushed the weighted score
past the pass threshold, the veto overrides. Response blocked.
Domain-Specific Weight Profiles
Different use cases need different weight distributions. A medical chatbot prioritizes grounding above all else; a casual Q&A bot prioritizes speed and intent alignment.
| Domain | Grounding | Retrieval | Intent | Safety | Citation | Fresh | Pass | Retry |
|---|---|---|---|---|---|---|---|---|
| General Q&A | 0.30 | 0.25 | 0.15 | 0.10 | 0.10 | 0.10 | >0.85 | >0.60 |
| Medical / Legal | 0.40 | 0.20 | 0.10 | 0.15 | 0.10 | 0.05 | >0.90 | >0.70 |
| E-commerce | 0.25 | 0.20 | 0.20 | 0.10 | 0.10 | 0.15 | >0.82 | >0.55 |
| Voice Agent | 0.30 | 0.25 | 0.20 | 0.10 | 0.05 | 0.10 | >0.80 | >0.55 |
| Internal Docs | 0.25 | 0.30 | 0.15 | 0.05 | 0.15 | 0.10 | >0.80 | >0.55 |
| Financial | 0.35 | 0.20 | 0.10 | 0.15 | 0.10 | 0.10 | >0.92 | >0.75 |
# Config per domain
DOMAIN_CONFIGS = {
"medical": DomainConfig(
weights={"grounding": 0.40, "retrieval": 0.20, "intent": 0.10,
"safety": 0.15, "citation": 0.10, "freshness": 0.05},
thresholds=Thresholds(pass_threshold=0.90, retry_threshold=0.70,
min_per_signal={"grounding": 0.7, "safety": 0.8}), # strict mins
veto_rules=VETO_RULES + [
VetoRule("medical_disclaimer_missing",
lambda s: s.has_medical_claim and not s.has_disclaimer),
]
),
"ecommerce": DomainConfig(
weights={"grounding": 0.25, "retrieval": 0.20, "intent": 0.20,
"safety": 0.10, "citation": 0.10, "freshness": 0.15},
thresholds=Thresholds(pass_threshold=0.82, retry_threshold=0.55,
min_per_signal={"grounding": 0.5}),
veto_rules=VETO_RULES # standard vetos
),
}
Production Microservice Architecture
Many companies deploy the evaluation layer as separate microservices for scalability and independent deployment.
class ResponseEvaluationService:
"""Runs all checks in parallel. Target: 50-200ms."""
async def evaluate(self, query, response, context):
# Run all checks in parallel
grounding, intent, safety = await asyncio.gather(
self.grounding_svc.check(response, context),
self.intent_svc.check(query, response),
self.safety_svc.check(response),
)
# Compute weighted confidence
confidence = self.confidence_engine.score(
grounding=grounding.score,
retrieval=context.retrieval_score,
intent=intent.score,
safety=safety.score,
)
# Decision
if confidence.decision == Decision.PASS:
return EvalResult(approved=True, response=response)
elif confidence.decision == Decision.RETRY:
return await self.regenerate(query, context)
else:
return EvalResult(
approved=True,
response="I'm not completely sure. "
"Let me check that for you."
)
Latency Optimization
Voice systems and real-time apps run all checks in parallel to keep total evaluation under 200ms.
| Check | Method | Latency | Accuracy |
|---|---|---|---|
| Grounding | Embedding similarity | ~10ms | Good |
| Grounding | Cross-encoder | ~50ms | Better |
| Grounding | LLM-as-judge | ~500ms | Best |
| Intent | Embedding similarity | ~10ms | Good |
| Intent | Classifier model | ~20ms | Better |
| Safety | Moderation API | ~50ms | Good |
| Safety | Rule engine | ~1ms | Exact |
| Confidence | Score aggregation | ~1ms | — |
Additional Production Checks (Often Missed)
The four core checks (grounding, intent, safety, confidence) cover ~80% of failure modes. These additional checks close the remaining gaps that surface at scale.
5. Citation Verification
Validates that [Source N] references in the response actually match the claims they support. Catches "citation hallucination" where the model invents or misattributes sources.
class CitationVerifier:
def verify(self, response, sources):
citations = self.extract_citations(response)
for cite in citations:
# Does [Source N] exist?
if cite.index >= len(sources):
cite.valid = False
continue
# Does the claim match the source?
sim = cosine_sim(
cite.claim, sources[cite.index]
)
cite.valid = sim > 0.6
return citations
Tools: Regex extraction + embedding verification. Run in parallel with grounding check (~5ms overhead).
6. Completeness Check
Did the answer address ALL parts of a multi-part question? Users often ask compound questions and the LLM may only answer part of it.
# Example problem:
Query: "What's the return policy
AND do you offer exchanges?"
Answer: "Returns within 30 days."
# Missing: exchange info!
class CompletenessChecker:
def check(self, query, answer):
# Decompose query into sub-questions
sub_qs = self.decomposer.split(query)
addressed = []
for sq in sub_qs:
sim = cosine_sim(sq, answer)
addressed.append(sim > 0.5)
return CompletenessResult(
complete=all(addressed),
missing=[sq for sq, a
in zip(sub_qs, addressed)
if not a]
)
Tools: LLM query decomposer or spaCy clause splitting + embedding comparison.
7. PII Leakage Detection
The retrieved context may contain sensitive data (emails, SSNs, account numbers) that the LLM inadvertently surfaces in its response. Scan output before delivery.
from presidio_analyzer import AnalyzerEngine
from presidio_anonymizer import AnonymizerEngine
class PIIGuard:
def __init__(self):
self.analyzer = AnalyzerEngine()
self.anonymizer = AnonymizerEngine()
def scan(self, response):
results = self.analyzer.analyze(
text=response,
entities=["EMAIL_ADDRESS",
"PHONE_NUMBER",
"CREDIT_CARD",
"US_SSN"],
language="en"
)
if results:
return self.anonymizer.anonymize(
text=response, analyzer_results=results
)
return response # clean
Tools: Microsoft Presidio (open-source), AWS Comprehend PII, Google DLP API. Run on every response (~10ms).
8. Freshness / Staleness Check
Verify that the retrieved context is still current. An answer about "current pricing" from a 6-month-old document could be dangerously wrong.
class FreshnessChecker:
def check(self, chunks, query):
# Does query need fresh data?
needs_fresh = self.classify_temporal(query)
# "current", "latest", "now", "today"
if not needs_fresh:
return Fresh() # skip check
for chunk in chunks:
age = now() - chunk.indexed_at
if age > timedelta(days=30):
return Stale(
chunk=chunk,
age_days=age.days,
action="warn_user"
)
Store indexed_at and source_updated_at in chunk metadata. Define TTL per document type (pricing: 7d, policy: 30d, FAQ: 90d).
9. Retry & Regeneration Strategy
When evaluation fails, how do you regenerate differently? Simply re-running the same prompt gets the same bad answer. Production systems modify the generation strategy on retry.
class RetryStrategy:
def regenerate(self, fail_reason, attempt):
if fail_reason == "hallucination":
# Add explicit "ONLY use context"
return self.stricter_prompt(
temp=0.0 # zero creativity
)
elif fail_reason == "incomplete":
# Retrieve MORE chunks
return self.expand_context(
top_k=10 # was 5
)
elif fail_reason == "stale":
# Force fresh retrieval
return self.re_retrieve(
freshness="7d"
)
elif attempt >= 2:
return self.fallback_response()
Key: Max 2 retries. Each retry changes strategy (stricter prompt, more context, different model). After 2 fails → graceful fallback.
10. Human Feedback Loop
User feedback (thumbs up/down, corrections, follow-up queries) is the ultimate ground truth. Feed it back into evaluation thresholds and training data.
class FeedbackCollector:
def record(self, query_id, signal):
# Signals:
# thumbs_up, thumbs_down,
# correction(text), follow_up,
# escalate_to_human
self.store.save(query_id, signal)
# If thumbs_down → add to eval set
if signal == "thumbs_down":
self.eval_builder.add_negative(
query_id
)
# Weekly: retune thresholds from
# feedback distribution
# Monthly: retrain intent/NLI models
# Quarterly: full eval set refresh
Track feedback rate (aim for >5% of responses). Negative feedback → auto-add to adversarial eval set. Positive feedback → confidence calibration.
11. Evaluation Layer Monitoring — What to Dashboard
The evaluation layer itself needs monitoring. If your grounding check drifts, it will silently let hallucinations through.
Tier Exit Distribution
T1: 80% / T2: 15% / T3: 5% is baseline. Alert if T2 rises above 25% (embedding model drift) or T3 above 8% (retrieval quality degradation).
False Positive / Negative Rate
Sample 100 responses/week. Human-label as grounded or not. Compare against evaluator verdicts. Target: <3% false-positive (passes hallucination) and <8% false-negative (rejects good answer).
Retry & Fallback Rate
If retry rate exceeds 10% or fallback exceeds 3%, something upstream is broken — likely retrieval quality, prompt template, or LLM model regression. Investigate immediately.
Evaluator Latency P95
Track per-tier latency. If T1 P95 exceeds 30ms, the embedding model may need optimization or the batch size is too large. T2 P95 above 150ms → model serving issue.
PII Detection Rate
Track how often PII is found in responses. If rate spikes, investigate the retrieval pipeline — it may be pulling in documents with unredacted personal data.
User Feedback Correlation
Correlate confidence scores with user feedback. If high-confidence responses get thumbs-down, your evaluator is miscalibrated. Retune weights quarterly.
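The tier-exit alerting described above can be sketched as a pure check over request counters. The 25% and 8% cutoffs come straight from the guidance in this section; the function name and counter format are illustrative.

```python
def tier_drift_alerts(tier_counts: dict) -> list[str]:
    """Flag evaluator drift from the 80/15/5 tier-exit baseline."""
    total = sum(tier_counts.values()) or 1
    t2 = tier_counts.get("t2", 0) / total
    t3 = tier_counts.get("t3", 0) / total
    alerts = []
    if t2 > 0.25:
        alerts.append("T2 > 25%: possible embedding model drift")
    if t3 > 0.08:
        alerts.append("T3 > 8%: possible retrieval quality degradation")
    return alerts
```

Run this over a rolling window (e.g. the last hour of requests) and page on any non-empty result.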
✓ Grounding Check (cascading T1/T2/T3) ✓ Intent Alignment ✓ Safety / Content Moderation ✓ Confidence Score Engine ✓ Citation Verification ✓ Completeness Check ✓ PII Leakage Detection ✓ Freshness / Staleness Check ✓ Retry / Regeneration Strategy ✓ Human Feedback Loop ✓ Evaluator Monitoring & Alerting
Self-Correction & Reflection Loops
Modern advanced RAG adds self-checking loops that detect and recover when retrieval quality is poor—rather than blindly stuffing top-k passages into prompts
Core Self-Correction Techniques
Self-RAG
Model decides whether retrieval is needed per token. Generates reflection tokens (IsRel, IsSup, IsUse) to critique its own outputs. Targets factuality and citation accuracy—10–15% improvement in studies.
CRAG
Corrective RAG evaluates retrieved documents. Triggers corrective actions (alternative retrieval, filtering, web search fallback) when retrieval quality is poor.
Adaptive Retrieval
Retrieve fewer documents when confidence is high; more when needed. Avoids indiscriminate retrieval via confidence-gated document selection.
Adaptive Retrieval with Confidence Gating
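No code accompanies this heading in the draft, so here is a minimal sketch of confidence-gated document selection. `k_min`, `k_max`, and the 0.8 threshold are illustrative knobs, not tuned values.

```python
def adaptive_top_k(first_pass_scores: list[float],
                   k_min: int = 3, k_max: int = 10,
                   high_conf: float = 0.8) -> int:
    """Choose how many documents to keep based on first-pass
    retrieval confidence (sketch; thresholds are illustrative).
    High top score: the answer is likely concentrated, keep few.
    Low top score: cast a wider net."""
    if not first_pass_scores:
        return k_max
    top = max(first_pass_scores)
    if top >= high_conf:
        return k_min
    # Linearly widen the pool as confidence drops
    return min(k_max, k_min + int((high_conf - top) * 20))
```

Wiring this into the retriever means confident queries carry a small, cheap context while uncertain ones get more evidence, which is the "retrieve fewer when confidence is high" behavior described above.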
RAG Evaluation Framework
Three metric categories measure retrieval effectiveness, generation quality, and system performance
| Category | Metrics | Measurement Method | Tools |
|---|---|---|---|
| Retrieval Metrics | Precision, Recall, NDCG, MRR, MAP | Rank quality against gold standard passages | BEIR, Trec-eval |
| Generation Metrics | BLEU, ROUGE, METEOR, BERTScore, Context Relevance | Automated scoring, LLM judges, embedding similarity | TruLens, RAGAS |
| System Metrics | Latency, throughput, cost per query, user satisfaction | Production logs, user feedback, A/B tests | OpenTelemetry, Datadog, custom instrumentation |
Building Your Eval Dataset
Synthetic
LLM-generated QA from corpus. Fast, cheap. Risk of false positives.
Human-Curated
Gold standard. Expensive, slow. High quality baseline.
Production Logs
Real queries & answers. Most realistic. Requires filtering.
Adversarial
Edge cases, tricky queries. Surfaced via user feedback.
RAGAS: Automated RAG Evaluation
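RAGAS scores faithfulness by decomposing the answer into claims and verifying each against the retrieved context. A minimal, dependency-free sketch of that idea — real RAGAS uses an LLM judge for both claim extraction and verification, not the substring stand-in below:

```python
def faithfulness_score(answer_claims, context, supports):
    """Toy faithfulness: fraction of answer claims supported by context.

    answer_claims: claims extracted from the generated answer.
    supports(claim, context) -> bool stands in for the LLM judge
    that real RAGAS uses for claim verification.
    """
    if not answer_claims:
        return 1.0
    supported = sum(1 for c in answer_claims if supports(c, context))
    return supported / len(answer_claims)

# Naive stand-in judge: substring containment
judge = lambda claim, ctx: claim.lower() in ctx.lower()
ctx = "Qdrant stores vectors. It supports payload filtering."
claims = ["Qdrant stores vectors", "Qdrant is written in Java"]
print(faithfulness_score(claims, ctx, judge))  # → 0.5
```

Answer relevance and context relevance follow the same pattern with different judge prompts.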
RAG Benchmarking & Performance Testing
Systematic benchmarking with shared metrics ensures consistent quality and enables confident deployment decisions
Benchmarking Framework Flow
Retrieval Benchmarks
| Metric | Target |
|---|---|
| Recall@5 | > 85% |
| Recall@20 | > 95% |
| NDCG@10 | > 0.70 |
| MRR | > 0.75 |
| Hit Rate | > 95% |
| BEIR Zero-Shot | Baseline |
| MTEB Rank | Top 10% |
| Latency P95 | < 100ms |
Generation Benchmarks
| Metric | Target |
|---|---|
| Faithfulness | > 0.90 |
| Answer Relevance | > 0.85 |
| Context Relevance | > 0.80 |
| Hallucination Rate | < 5% |
| Citation Accuracy | > 90% |
| Refusal Accuracy (unanswerable queries) | > 80% |
| Completeness | > 85% |
| TTFT P95 | < 500ms |
End-to-End System
Test full pipeline: retrieval → generation → post-processing. Measure user-facing latency, cost per query, success rate.
A/B Testing & Online
Shadow deploy changes. Compare metrics vs. baseline with statistical significance. Catch production surprises before full rollout.
Adversarial & Stress
Test with typos, out-of-domain queries, adversarial prompts. Load test at 10× peak. Measure robustness.
RAG Benchmark Suite Code
class RAGBenchmarkSuite:
    def __init__(self):
        self.thresholds = {
            "recall@5": 0.85,
            "faithfulness": 0.90,
            "latency_p95_ms": 100,
        }

    def run_benchmark(self, model, dataset):
        # Accumulate per-query scores, then average — overwriting a
        # results dict each iteration would keep only the last query.
        scores = {"recall@5": [], "faithfulness": []}
        for query, golden_ctx, golden_answer in dataset:
            retrieved = self.retrieve(query)
            generated = model.generate(query, retrieved)
            scores["recall@5"].append(self.compute_recall(retrieved, golden_ctx))
            scores["faithfulness"].append(self.compute_faithfulness(generated, retrieved))
        results = {k: sum(v) / len(v) for k, v in scores.items()}
        return self.check_regressions(results)

    def check_regressions(self, results):
        # Flag any metric that falls below its gate threshold
        return {m: results[m] >= t for m, t in self.thresholds.items() if m in results}
Benchmark Workflow Pipeline
Run suite on current production system
Make code change, rerun benchmarks
Compare vs. baseline, flag regressions
Pass gates → deploy; fail → iterate
Monitor metrics post-deploy, alert on drift
Benchmarking Tools Comparison
| Tool | Metrics | LLM-Based | Reference | Best For |
|---|---|---|---|---|
| RAGAS | Faithfulness, Answer Rel., Context Rel. | ✓ | Paper | Gen. + Retrieval |
| BEIR | Recall@K, NDCG@K, MRR, MAP | ✗ | Yes | Retrieval (IR) |
| MTEB | Cross-lingual Retrieval, Ranking | ✗ | Yes | Multilingual |
| TruLens | LLM-based evals + feedback | ✓ | No | Custom logic |
| DeepEval | Hallucination, Answer Rel., RAGAS | ✓ | Optional | LLM Evals |
| LangSmith | Custom evals, tracing, logging | Partial | Optional | Development + Monitoring |
| Arize Phoenix | Evals + Production Observability | ✓ | Optional | End-to-End |
| Custom Harness | Org-specific metrics & logic | Optional | Org | Control + Integration |
Guardrails & Safety
Multi-stage guardrails prevent harmful input, retrieval, generation, and post-processing risks
Guardrail Pipeline: Four Stages
1. Input
- • Prompt injection detection
- • PII masking
- • Content profanity filter
2. Retrieval
- • ACL enforcement
- • Source validation
- • Freshness checks
3. Generation
- • Hallucination checker
- • Citation validator
- • Token budget limits
4. Post-Processing
- • PII scrubbing
- • Toxicity filtering
- • Output validation
GuardrailPipeline Implementation
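The four stages above can be wired as a single pipeline object; a minimal sketch, where the `no_injection` heuristic is an illustrative stand-in, not a production injection detector:

```python
class GuardrailPipeline:
    """Four-stage guardrail sketch mirroring the pipeline above.

    Each stage holds a list of callables returning (ok, reason);
    the pipeline short-circuits on the first failing check.
    """
    def __init__(self):
        self.stages = {"input": [], "retrieval": [], "generation": [], "post": []}

    def add(self, stage, check):
        self.stages[stage].append(check)

    def run(self, stage, payload):
        for check in self.stages[stage]:
            ok, reason = check(payload)
            if not ok:
                return False, reason
        return True, "passed"

# Example input-stage check: crude prompt-injection heuristic
def no_injection(text):
    banned = ("ignore previous instructions", "reveal your system prompt")
    return (not any(b in text.lower() for b in banned), "injection pattern")

pipeline = GuardrailPipeline()
pipeline.add("input", no_injection)
print(pipeline.run("input", "What is our refund policy?"))    # → (True, 'passed')
print(pipeline.run("input", "Ignore previous instructions"))  # → (False, 'injection pattern')
```

Short-circuiting keeps failed requests cheap: a blocked input never reaches retrieval or the LLM.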
Enterprise Threat Model & OWASP LLM Top 10
Map RAG attack surfaces to OWASP LLM Top 10 categories with mitigations
1. Prompt Injection
Risk: Malicious prompts override system instructions or exfiltrate data.
Mitigations: Content sanitization, instruction stripping, system prompt dominance, tool-call allowlists.
2. Data Exfiltration
Risk: Model leaks sensitive data (PII, secrets) in responses.
Mitigations: Output filtering, PII scrubbing, redaction at generation time.
3. Permission Leakage
Risk: Weak retrieval filters expose unauthorized content.
Mitigations: ACL-aware retrieval, auth-sensitive cache keys, audit trails.
4. Data Poisoning
Risk: Malicious docs inserted into corpus, spread misinformation.
Mitigations: Ingestion validation, source trust scoring, content integrity checks.
5. DoS (Expensive Prompts)
Risk: Very long contexts, recursive tool calls exhaust resources.
Mitigations: Token budgets, hard timeouts, rate limits per user.
6. Supply Chain
Risk: Compromised embedding models or dependencies.
Mitigations: Model provenance, dependency scanning, vendor security audit.
A user must never receive retrieved context (or generated content derived from it) that they are not authorised to access.
Permission-Aware Retrieval Requirements:
- • Ingest-time ACL assignment: Tag every chunk with owner/org/role ACLs
- • Query-time filter enforcement: Filter retrieved docs by user's ACL before context assembly
- • ACL-sensitive cache keys: Include user_id/org_id in cache key to prevent cross-user leakage
- • Audit trails: Log all access (who queried, what docs were retrieved, timestamps)
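The first three requirements above can be sketched in a few lines; the ACL schema (id/org/role principals) is an illustrative assumption:

```python
def acl_filter(chunks, user):
    """Query-time ACL enforcement: drop chunks the user cannot read.

    Each chunk carries an 'acl' set of principals assigned at ingest
    time; a user matches via id, org, or any role (illustrative schema).
    """
    principals = {user["id"], user["org"], *user["roles"]}
    return [c for c in chunks if c["acl"] & principals]

def cache_key(query, user):
    """ACL-sensitive cache key: same query, different user → different entry."""
    return f"{user['org']}:{user['id']}:{hash(query)}"

chunks = [
    {"text": "public FAQ", "acl": {"role:everyone"}},
    {"text": "exec comp plan", "acl": {"role:hr-admin"}},
]
user = {"id": "u1", "org": "acme", "roles": ["role:everyone"]}
print([c["text"] for c in acl_filter(chunks, user)])  # → ['public FAQ']
```

Filtering must run before context assembly, and ideally inside the vector store (metadata filters) so unauthorized chunks never leave the index.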
Observability & Monitoring
Three monitoring layers: system SLOs, retrieval quality, and answer groundedness
Monitoring Layers
System SLOs
- • Latency (p50, p95, p99)
- • Throughput (QPS)
- • Error rate
- • Availability
Retrieval Quality
- • NDCG, MRR (rank quality)
- • Precision@k
- • Docs retrieved per query
- • Reranker acceptance rate
Answer Quality
- • Faithfulness (grounded?)
- • Answer relevance
- • Citation accuracy
- • User feedback signal
OpenTelemetry Tracing Decorator
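A sketch of the decorator's shape. To keep it self-contained, spans are recorded into a plain list; in production the wrapper body would sit inside the real SDK's `tracer.start_as_current_span(span_name)` context manager instead:

```python
import functools
import time

SPANS = []  # stand-in exporter; in production, OpenTelemetry handles export

def traced(span_name):
    """Tracing decorator sketch recording name + duration per call."""
    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            try:
                return fn(*args, **kwargs)
            finally:
                SPANS.append({
                    "name": span_name,
                    "duration_ms": (time.perf_counter() - start) * 1000,
                })
        return wrapper
    return decorator

@traced("rag.retrieve")
def retrieve(query):
    return [f"doc for {query}"]

retrieve("pricing")
print(SPANS[0]["name"])  # → rag.retrieve
```

Decorating each pipeline stage (transform, embed, search, rerank, generate) yields the per-stage latency breakdown listed below.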
OpenTelemetry Collector Config (with PII Scrubbing)
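A sketch of a collector config that scrubs PII attributes before export; processor names and fields follow opentelemetry-collector-contrib conventions, and the attribute keys and endpoint are placeholders to verify against your collector version:

```yaml
receivers:
  otlp:
    protocols:
      grpc:
      http:

processors:
  batch:
  attributes/pii:
    actions:
      - key: user.email        # drop raw PII attributes before export
        action: delete
      - key: llm.prompt        # hash prompts rather than exporting them
        action: hash

exporters:
  otlphttp:
    endpoint: https://collector.example.com

service:
  pipelines:
    traces:
      receivers: [otlp]
      processors: [attributes/pii, batch]
      exporters: [otlphttp]
```

Scrubbing at the collector, not in application code, gives one enforcement point for every service emitting traces.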
LangSmith
LLM tracing, debugging
Arize Phoenix
ML observability
OpenTelemetry
Core instrumentation
Datadog
Metrics, dashboards
TruLens
RAG eval metrics
- • Latency Breakdown: Query transform, embedding, search, rerank, LLM, total
- • Retrieval Quality: NDCG, docs per query, reranker effectiveness
- • LLM Metrics: Token usage, cost, temperature, model routing decisions
- • User Metrics: Active users, unique queries, satisfaction (thumbs up/down)
- • System Health: Disk usage (vector DB), index freshness, cache hit rates, error budget
Scaling & Performance
Scale RAG systems from thousands to billions of documents with architecture patterns
| Scale Tier | Document Count | Query Load (QPS) | Typical Latency (p95) | Architecture Pattern |
|---|---|---|---|---|
| Small | 10^5–10^6 chunks | <10 QPS | <2s | Single in-memory FAISS index, Python app, SQLite metadata |
| Medium | 10^7–10^8 chunks | 10–300 QPS | 1–4s | Milvus/Weaviate cluster, Kubernetes, async queue processing, multi-region replication |
| Large | 10^8–10^9+ chunks | 300–5000+ QPS | 500ms–1s | Elasticsearch sharding, GPU-accelerated search (Triton), vLLM serving, distributed caching, traffic shaping |
Multi-Layer Caching Strategy
L1: Exact Query
Hash(query) → response. TTL: 24h. Hit rate: 15–25% for repeated queries.
L2: Semantic
Embedding similarity clustering. Cache similar queries together. Hit rate: 30–40%.
L3: Embedding
Cache embeddings for large docs to avoid re-embedding on every query.
L4: LLM Response
Cache LLM outputs by (query, context hash). Reduces expensive inference calls.
SemanticCache Implementation
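A minimal L2 semantic cache sketch; the bag-of-characters embedding and the 0.92 threshold are toy stand-ins for a real embedding model and an offline-tuned similarity gate:

```python
import math

class SemanticCache:
    """Serve a cached response when a new query's embedding is close
    enough to a previously answered one (illustrative sketch)."""

    def __init__(self, embed, threshold=0.92):
        self.embed = embed          # callable: text -> vector
        self.threshold = threshold  # cosine similarity gate (tune offline)
        self.entries = []           # list of (vector, response)

    @staticmethod
    def _cos(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(x * x for x in b))
        return dot / (na * nb) if na and nb else 0.0

    def get(self, query):
        v = self.embed(query)
        best = max(self.entries, key=lambda e: self._cos(v, e[0]), default=None)
        if best and self._cos(v, best[0]) >= self.threshold:
            return best[1]
        return None

    def put(self, query, response):
        self.entries.append((self.embed(query), response))

# Toy embedding: bag-of-characters (production: a real embedding model)
toy_embed = lambda t: [t.lower().count(c) for c in "abcdefghijklmnopqrstuvwxyz"]
cache = SemanticCache(toy_embed)
cache.put("reset my password", "Use the self-service portal.")
print(cache.get("reset my password!"))  # near-identical query → cache hit
```

At scale, the linear scan over entries would be replaced by an ANN lookup in the same vector store used for retrieval.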
Scaling Architecture Patterns
Retrieval Optimization
- • Horizontal scaling: Shard index by doc_id ranges
- • GPU acceleration: Triton Inference Server for embedding
- • Connection pooling: Reuse DB connections (PgBouncer)
- • Async processing: Batch embedding requests
Generation Optimization
- • vLLM: PagedAttention for high-throughput serving
- • Model parallelism: Shard LLM across GPUs
- • Quantization: INT8/FP8 for latency reduction
- • Speculative decoding: Predict + verify next tokens in parallel
KServe Kubernetes-Native Serving (vLLM)
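A sketch of a KServe `InferenceService` running vLLM as a custom predictor container; the image tag, model name, and resource figures are placeholders to check against the KServe and vLLM docs for your versions:

```yaml
apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: rag-llm
spec:
  predictor:
    minReplicas: 1
    maxReplicas: 4            # scale out under concurrent request load
    containers:
      - name: vllm
        image: vllm/vllm-openai:latest
        args:
          - --model=meta-llama/Llama-3.1-8B-Instruct
          - --max-model-len=8192
        resources:
          limits:
            nvidia.com/gpu: "1"
```

KServe handles request routing, autoscaling, and canary traffic splits, while vLLM's PagedAttention provides the per-replica throughput.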
- Query Transform: 50ms (normalization, spell-check)
- Embedding: 20ms (vectorize query)
- Search: 30ms (FAISS/Milvus lookup)
- Rerank: 50ms (cross-encoder)
- LLM Generation: 800ms (token generation)
- Total Expected: ~950ms P50
SLO Modes: Low-latency interactive (<2–4s p95) vs High-throughput batch (10–60s acceptable)
Advanced Techniques
Agentic RAG
LLM agent decides what, when, and how to retrieve. Decomposes complex queries, chooses data sources (vector DB, SQL, API, web), iteratively refines results.
# Illustrative sketch — the agent class and tool names are placeholders,
# not a specific framework's API
agent = ReActAgent(
    llm=claude,
    tools=[vector_search, sql_query, web_search],
    max_iterations=5,
)
result = agent.run("Find latest Q1 earnings and analyst sentiment")
Graph RAG
Knowledge graphs + vector search. Entity-relationship graphs, multi-hop reasoning, community detection for summarization.
// Illustrative Cypher — apoc.text.summarize is a placeholder procedure,
// not a stock APOC function; summarization typically runs in app code
MATCH (a:Company)-[r*1..3]->(b:Company)
WHERE a.name = "Acme Inc"
WITH collect(b) AS connected
CALL apoc.text.summarize(connected)
YIELD summary
RETURN summary
RAPTOR (Tree-based)
Recursively summarize clusters into a hierarchy. Query at multiple abstraction levels for top-down reasoning.
- Hierarchical abstraction layers
- Efficient multi-scale retrieval
- Reduced token cost vs flat indexing
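The recursive cluster-then-summarize loop can be sketched abstractly; the `cluster` and `summarize` callables stand in for embedding-based clustering and an LLM summarizer, and the toy implementations exist only to make the sketch runnable:

```python
def build_raptor_tree(chunks, cluster, summarize, fanout=4):
    """RAPTOR-style tree sketch: recursively cluster chunks and
    summarize each cluster until a single root summary remains.

    cluster(texts, k) -> list of lists; summarize(texts) -> str.
    """
    levels = [chunks]
    while len(levels[-1]) > 1:
        groups = cluster(levels[-1], max(1, len(levels[-1]) // fanout))
        levels.append([summarize(g) for g in groups])
    return levels  # levels[0] = leaf chunks, levels[-1] = root summary

# Toy stand-ins: strided grouping and string concatenation
toy_cluster = lambda texts, k: [texts[i::k] for i in range(k)]
toy_summarize = lambda texts: " + ".join(texts)
tree = build_raptor_tree(["c1", "c2", "c3", "c4"], toy_cluster, toy_summarize)
print(len(tree))     # number of abstraction levels
print(tree[-1][0])   # root-level summary
```

At query time, all levels are indexed together, so a query can match a leaf chunk for detail or a high-level summary for broad questions.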
Self-RAG (Adaptive)
LLM decides if retrieval is needed, generates with self-critique tokens, and iterates. Reported to reduce unnecessary retrieval by roughly 40%.
Outputs critique tokens, decides iterations
Multi-Modal RAG
Index images, tables, charts alongside text. Vision models for visual content, multi-modal embeddings.
- Cohere embed-v4 multi-modal
- CLIP for image-text alignment
- Unified vector space across modalities
Multi-Tenant RAG
Namespace isolation per tenant. Shared infrastructure, isolated data. Query-time ACL enforcement.
- Cost-efficient multi-tenancy
- Metadata-driven access control
- Compliance for regulated industries
Deployment & CI/CD
Docker Compose Architecture
services:
  rag-api:
    image: rag-api:latest
    deploy:
      replicas: 3
    depends_on: [qdrant, redis]
  embedding-service:
    image: embedding-service:latest
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]
  indexing-worker:
    image: indexing-worker:latest
    environment:
      - CELERY_BROKER=redis:6379
  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"
  redis:
    image: redis:7-alpine
CI/CD Pipeline Stages
Code quality, type checks, fast tests
End-to-end retrieval, indexing flows
Gate metrics: Faithfulness >0.85, Relevance >0.80
Monitor cost, latency, error rate
5% → 25% → 50% → 100% traffic
- Canonical doc schema + versioning
- ACL-aware retrieval architecture
- Evaluation harness in CI/CD
- Comprehensive observability & tracing
Cost Optimization
Cost Drivers (Ranked)
Cost Breakdown per 1K Queries
- Embedding Costs: Batch processing, caching, model selection, self-hosting (>1M queries/day)
- LLM Token Costs: Context pruning, model tiering, semantic caching (50–60% reduction)
- Infrastructure: Adaptive retrieval, hybrid tuning, dimension control, vLLM serving efficiency
Failure Modes & Mitigations
Irrelevant/Misleading Retrieval
Hybrid retrieval + reranking + CRAG-like evaluation gates
Over-retrieval / Context Stuffing
Cap top-k, MMR diversity, adaptive retrieval (Self-RAG)
Permission Leakage
ACL filters, ACL in cache keys, comprehensive audit trails
Prompt Injection via Retrieved Docs
Treat as untrusted, content sanitization, system prompt dominance
Embedding Drift (Model Upgrades)
Version embeddings/indexes, shadow-build, offline eval + canary
Silent Ingestion Regressions
Ingestion QA metrics, OCR confidence alerts, hit-rate monitoring
Cost Blow-ups
Token budgets, hard timeouts, reranker gating, rate limits
Use as a practical checklist for security & compliance governance across RAG components.
Phased Implementation Roadmap
Production Readiness Checklist
Data & Indexing
- Multi-format parsing (PDF, docx, HTML, images)
- Incremental indexing with changelog tracking
- Rich metadata (source, timestamp, ownership)
- Semantic chunking strategies
- Dead letter queue for malformed data
- Data freshness tracking & SLAs
- Canonical document contract
Retrieval & Generation
- Hybrid search (dense + sparse + knowledge graph)
- Multi-stage reranking pipeline
- Query transformation & expansion
- Streaming generation with token streaming
- Citation validation & provenance
- Confidence scoring on outputs
- Self-correction loops (Self-RAG)
Safety & Compliance
- Prompt injection detection & mitigation
- PII detection (Presidio integration)
- Hallucination detection frameworks
- RBAC / ACL enforcement
- Comprehensive audit logging
- GDPR / HIPAA / EU AI Act compliance
- NIST AI RMF & data retention policies
Operations & Scale
- Multi-layer caching (query, response, embedding)
- Distributed tracing (OpenTelemetry)
- Automated evaluation in CI/CD gates
- Canary & progressive rollout strategy
- Cost tracking & budget enforcement
- User feedback loop & telemetry
- vLLM / KServe deployment optimization
"Production RAG is 20% retrieval and 80% engineering"
Data quality, evaluation frameworks, guardrails, caching strategies, comprehensive observability, and production operations are what separate a working demo from a reliable, compliant, cost-efficient product.
Production-Grade RAG Pipeline Implementation Guide
Research-informed • 25 Sections • Architecture Diagrams • Code Examples