Sections
01 Home 02 Overview 03 Architecture 04 Ingestion 05 Chunking 06 Embedding 07 Vector DB 08 Query Transform 09 Retrieval 10 Reranking 11 Generation 12 Response Eval 13 Self-Correction 14 Evaluation 15 Benchmarking 16 Guardrails 17 Threat Model 18 Observability 19 Scaling 20 Advanced 21 Deployment 22 Cost 23 Failure Modes 24 Roadmap 25 Checklist

Advanced Production-Grade
RAG Pipeline Implementation

Building enterprise-ready retrieval-augmented generation systems with semantic search, adaptive policies, and self-correction loops

LLM Orchestration Vector Search Semantic Retrieval
Embedding Models Self-Correction Loops MLOps

22 comprehensive sections covering architecture, implementation, and production deployment

01 / Cover

What is RAG?

RAG is not a single model but a complete AI application architecture that combines retrieval systems with language models to ground responses in external knowledge.

RAG = AI Application Architecture Knowledge Sources Docs, databases, APIs, vector stores Retrieval System Search, rank, rerank, filter LLM Generator Answer synthesis, grounding, citations Self-Correction / Reflection

Three Pillars

  • Multi-stage Retrieval — Hybrid, sparse, dense, and ranked retrieval
  • Adaptive Policies — Context-aware retrieval strategies
  • Self-Correction Loops — Reflection and iterative refinement

Enterprise Risks

Beyond hallucinations, consider:

  • ⚠ Permission leakage
  • ⚠ Prompt injection attacks
  • ⚠ Data poisoning
  • ⚠ Unbounded cost/latency
  • ⚠ Silent quality regressions

Use Cases

  • 📄 Employee Knowledge Work — Internal docs, wikis
  • 🤝 Customer Support — FAQs, tickets, logs
  • 📊 Structured+Unstructured — Reports, databases, forms
  • 🎨 Multimodal Knowledge — PDFs, images, videos

Indexing Plane

Off-line: Data ingestion, parsing, chunking, embedding, and vector storage. Built once, queried many times.

Serving Plane

On-line: Query processing, retrieval, reranking, LLM inference, and safety checks. Low-latency, high-throughput.

OWASP LLM Top 10: This presentation addresses major threats including prompt injection (LLM01), insecure output handling (LLM02), training data poisoning (LLM03), and model denial of service (LLM04).
02 / Overview

Full Architecture: Two Planes + Governance

Complete end-to-end RAG architecture with indexing, serving, and governance layers

INDEXING PLANE (Offline) Data Sources Connectors Parser/OCR (Unstructured) Chunking + Metadata Embedding Jobs Vector Store Object Store SERVING PLANE (Online) User Query API Gateway Auth + Query Proc Sparse Ret. Dense Ret. Fusion (RRF) Reranker Context Bldr LLM Orchestrator (Prompt) LLM Inference (Gen) Response + Citations + Safety Checks GOVERNANCE Tracing + Metrics Policies (PII/ACL) Eval (RAGAS)

IndexingPipeline

class IndexingPipeline: def ingest(self, source): docs = self.connector.fetch(source) chunks = self.chunker.split(docs) embeddings = self.model.embed(chunks) self.vector_db.upsert(embeddings) # Update metadata, manage versions

QueryPipeline

class QueryPipeline: def query(user_q): sparse = bm25(user_q) dense = vector_search(user_q) fused = rrf_fusion(sparse, dense) ranked = reranker.score(fused) return llm.generate(ranked)
03 / Architecture

Document Ingestion: Connectors + Contracts

Reliable data onboarding with standardized contracts, multi-format support, and three-speed indexing

Canonical Document Contract

Every document must conform to this schema for consistent retrieval and governance:

{ "id": "doc-uuid", # Unique identifier "tenant_id": "org-123", # Multi-tenancy support "acl": ["user-1", "group-2"], # Access control list "source": "salesforce", # Provenance "timestamp": "2025-03-17", # Ingestion time "version": 2, # Document version "content": "...", # Raw/parsed text "metadata": {...} # Custom fields }

Connector Types & Data Sources

Enterprise Document Parsing

Apache Tika, Unstructured.io — Parse PDFs, DOCX, images with layout preservation and OCR support

parsed = tika.extract( filename, ocr=True, extract_tables=True )

Structured Data

CRM, ERP, Databases — Direct queries or treat as "tool use" for on-demand retrieval; knowledge views

records = fetch_from_salesforce( "Contact", filters={"updated_at": last_sync} )

Streaming & CDC

Debezium → Kafka — Real-time event streams from databases; capture inserts, updates, deletes

stream = kafka.subscribe( "postgres.public.documents" )

Web Content

Crawlers with Compliance — Respect robots.txt, rate limits, GDPR; extract HTML/JSON with link tracking

docs = crawl( seed_urls, max_depth=3, respect_robots=True )

Multimodal Sources

OCR + Image Embeddings — Extract text from images, create vision embeddings; preserve layout

text, img_emb = extract_image( img_path, vision_model="CLIP" )

Custom Connectors

Plugin API — Implement standardized interface for proprietary systems, internal APIs, legacy apps

class MyConnector(Connector): def fetch(...): pass

Three-Speed Indexing Model

1

Batch Rebuilds

Full reindexing of large datasets weekly/monthly; highest throughput, controlled resources. Use for bulk imports, historical data.

2

Incremental Upserts

Append new chunks, update modified docs via change detection; moderate latency (seconds). Triggered by scheduled jobs or webhooks.

3

Real-Time Streams

Event-driven CDC or message queue ingestion; sub-second latency for hot data. Use for live chat logs, sensor feeds, user events.

Nightly/Weekly Minutes-Hours Seconds Batch Rebuilds Full re-embed • Nightly/Weekly Incremental Upserts Delta updates • Minutes-Hours Real-Time Streams CDC / Events • Seconds Frequency

Element-Aware PDF Parsing

Extract with positional metadata (page, bbox, reading order). Preserve table structure, preserve images. Enables citation anchoring.

Dead Letter Queue Pattern

Send unparseable docs to DLQ for manual inspection; enable retry with fallback parsers or human review. Never silently drop data.

ProductionIngester Example

class ProductionIngester: def __init__(self, config): self.connectors = config.connectors # Multi-source self.parser = TikaParser(ocr=True) self.chunker = SemanticChunker() self.vector_db = PineconeDB() self.dlq = DeadLetterQueue() self.metrics = PrometheusMetrics() def ingest_batch(self, source, docs): for doc in docs: try: parsed = self.parser.extract(doc) chunks = self.chunker.split(parsed) self.vector_db.upsert(chunks) self.metrics.inc("ingested_docs") except ParsingError as e: self.dlq.enqueue(doc, error=e) self.metrics.inc("dlq_docs")
04 / Ingestion
Preprocessing

Chunking Strategies

Chunk for retrieval (findability) and store separate representations for generation (readability)

Document Fixed Recursive Semantic Parent-Child
Strategy Description Best For Trade-offs
Fixed-Size Split at token/word boundary Predictable, simple baseline May split sentences; low semantic coherence
Recursive Split recursively by delimiters (newline, paragraph, sentence) Structured documents, code Still may cut semantically important boundaries
Semantic Embed sentences, split at embedding distance threshold Narrative text, research papers Expensive; latency + cost for embedding all chunks
Document-Structure Respect sections, headings, tables, code blocks Mixed-format documents (PDFs, Markdown) Requires parser awareness
Agentic/LLM Use LLM to decide breaks and chunk metadata Complex domain logic, multilingual High cost and latency; not real-time
Sliding Window Overlapping fixed-size chunks with stride Preserve local context, boundary queries Higher storage; redundant retrieval
Parent-Child (Sentence-Window) Store fine-grained chunks; expand with surrounding context at retrieval Precision + context balance Requires two-stage retrieval; complex indexing

SemanticChunker: Embedding Similarity Breakpoints

class SemanticChunker: def __init__(self, embedding_model, threshold=0.5): self.embed = embedding_model self.threshold = threshold def split(self, text): sentences = self.sent_tokenize(text) embeddings = self.embed.batch_embed(sentences) chunks, current = [], [] for i, (sent, emb) in enumerate(zip(sentences, embeddings)): if i > 0: # Cosine similarity to previous sim = cosine_similarity(emb, embeddings[i-1]) if sim < self.threshold and current: # Semantic break detected chunks.append(" ".join(current)) current = [] current.append(sent) if current: chunks.append(" ".join(current)) return chunks
Production Best Practices:
  • Chunk size: 256–512 tokens (optimal for retrieval + generation trade-off)
  • Overlap: 10–15% to preserve boundary context
  • Metadata inheritance: Propagate doc_id, section, source to every chunk
  • Context enrichment: Prepend section headers or document title to chunk
  • Element-aware parsing: Preserve tables, code blocks, images as intact units in PDFs

Recommended: Hybrid Multi-Layer Chunking for Production

No single chunking strategy works for all document types. Production systems use a document-type router that selects the best chunking strategy per document, combined with a parent-child indexing pattern that stores small chunks for precise retrieval but returns larger context windows for generation.

RECOMMENDED: HYBRID MULTI-LAYER CHUNKING PIPELINE Document Router Classify doc type PDF → Structure-aware Markdown → Heading split Code → AST-aware Element Parser Extract typed elements Title, NarrativeText Table, CodeBlock Image, ListItem Semantic Splitter Embedding breakpoints 256–512 token target cosine distance splits keep tables/code intact Parent-Child Index Dual storage Child → embed + search Parent → full section Return parent on match Enrich + Store Metadata + embed prepend heading add source metadata embed → vector DB QUERY TIME: Parent-Child Retrieval Pattern Query embed query Search Child Chunks precise semantic match (top-K) Expand to Parent retrieve full section context Dedup + Rerank remove overlaps, score LLM Context coherent, complete Why this works: small chunks give precise retrieval (high recall@K), parent expansion gives coherent context (high faithfulness) Child chunk = 128–256 tokens (search unit) → Parent window = 512–1024 tokens (context unit) → Sent to LLM with full surrounding context Result: 15–25% higher faithfulness vs flat chunking at same recall, per RAGAS benchmarks

Production Chunking Pipeline

class ProductionChunkingPipeline: """Route-aware, parent-child, metadata-enriched.""" def __init__(self): self.router = DocTypeRouter() self.parsers = { "pdf": UnstructuredParser(strategy="hi_res"), "markdown": MarkdownHeaderSplitter(), "html": HTMLSectionSplitter(), "code": ASTChunker(), # tree-sitter "plaintext": SemanticChunker(), } self.semantic = SemanticChunker( model="all-MiniLM-L6-v2", max_tokens=384, # child chunk size threshold=0.5, ) def process(self, doc: Document) -> list[Chunk]: # Step 1: Route to parser doc_type = self.router.classify(doc) elements = self.parsers[doc_type].parse(doc) # Step 2: Create parent sections parents = self.group_into_sections(elements) # Step 3: Split parents into child chunks all_chunks = [] for parent in parents: children = self.semantic.split(parent.text) for i, child_text in enumerate(children): chunk = Chunk( text=child_text, parent_id=parent.id, parent_text=parent.text, # stored separately position=i, metadata=self.enrich(doc, parent, child_text), ) all_chunks.append(chunk) return all_chunks def enrich(self, doc, parent, text): """Prepend context + propagate metadata.""" return { "doc_id": doc.id, "source": doc.source, "section": parent.heading, "page": parent.page_num, "doc_type": doc.content_type, "indexed_at": datetime.utcnow(), # Prepended for better retrieval: "enriched_text": ( f"{doc.title} > {parent.heading}\n" f"{text}" ), }

Parent-Child Retrieval at Query Time

class ParentChildRetriever: def search(self, query, top_k=5): # 1. Search CHILD chunks (precise) children = self.vector_db.search( query, top_k=top_k * 3 # over-fetch ) # 2. Expand to PARENT sections parent_ids = set(c.parent_id for c in children) parents = self.doc_store.get_parents(parent_ids) # 3. Deduplicate + rank parents by # best child match score scored = {} for child in children: pid = child.parent_id if pid not in scored or child.score > scored[pid]: scored[pid] = child.score ranked = sorted( parents, key=lambda p: scored[p.id], reverse=True )[:top_k] return ranked # full parent context

Chunk Size Guide by Document Type

Doc TypeChild (Search)Parent (Context)Strategy
Product docs128–256 tok512–1024 tokHeading-based + semantic
Legal / Policy256–384 tok1024–2048 tokSection-based, keep clauses intact
Research papers256–512 tok1024–2048 tokSemantic breakpoints
FAQ / KBWhole Q&A pairSame (no parent)Question-Answer as unit
CodeFunction/classFile or moduleAST-aware (tree-sitter)
Chat logsSingle turnFull conversationTurn-based splitting
Tables / CSVRow groupFull table + headerKeep header with every chunk

Why Parent-Child Wins

Problem: Small chunks retrieve precisely but lose context. Large chunks give context but pollute retrieval with irrelevant text.

Solution: Index small (128–256 tok) for search precision. At retrieval time, expand to the parent section (512–1024 tok) for coherent LLM context. Best of both worlds.

Context Enrichment (Prepending)

Prepend the document title and section heading to each chunk before embedding. This dramatically improves retrieval for ambiguous queries.

# Without enrichment: "Returns are accepted within 30 days." # With enrichment: "Product Policy > Returns & Refunds\n" "Returns are accepted within 30 days." # Now retrieves for "return policy" queries

Tools for Production

Parsing: unstructured.io (hi_res), LlamaParse, Docling
Splitting: LangChain RecursiveCharacterTextSplitter, LlamaIndex SentenceWindowNodeParser
Semantic: Sentence Transformers + custom breakpoint
Code: tree-sitter (AST), CodeSplitter
Parent-Child: LlamaIndex ParentDocumentRetriever, custom doc_store + vector_db combo

Production Recommendation: Start with recursive character splitting (LangChain default) as your baseline — it's simple and works surprisingly well. Add parent-child retrieval once you have evaluation metrics showing context gaps. Only add semantic chunking when recursive splitting provably fails on narrative/research content. Measure every change against your eval set — chunking changes can regress retrieval quality in unexpected ways.
05 / Chunking Strategies
Core ML

Embedding Models & Strategies

Dimension 2 Dimension 1 Query Query Relevant docs Irrelevant docs
Model Family Type Notable Capabilities Operational Considerations Cost/Latency
OpenAI text-embedding-3 API small/large variants; dimension shortening; multilingual Quota limits; regional latency $0.02/M tokens (small); higher for large
Cohere Embed v3/v4 API Multilingual + multimodal (text+image); fine-tuning available Document and query encoding modes $0.10/1M tokens
BGE-M3 Open-source (HuggingFace) Multi-lingual multi-function (dense+sparse+multi-vector) 8192 token context; self-hosted overhead Free; requires GPU infrastructure
Multilingual E5 Open-source Strong multilingual; published training/eval methodology Community-maintained; good reproducibility Free; 2–5ms per chunk on A100
GTE-Qwen2 (7B) Open-source State-of-the-art; 131K context window Larger model; requires more VRAM Free; ~20ms/chunk on A100
voyage-3-large API Long-context (128K) + code understanding Premium pricing; excellent for code RAG $0.15/1M tokens
nomic-embed-text-v1.5 Open-source Matryoshka embeddings; dimension flexibility Efficient storage; truncation-stable Free; 3–4ms latency on CPU

EmbeddingService: Caching, Rate-Limiting, Batch Processing

class EmbeddingService: def __init__(self, model, redis_cache, rate_limiter): self.model = model self.cache = redis_cache # Cache embeddings by text hash self.limiter = rate_limiter # Token/sec quota def embed_batch(self, texts: List[str]) -> List[ndarray]: results = [] missing = [t for t in texts if not self.cache.get(hash(t))] if missing: self.limiter.wait(len(missing)) # Rate limit embeds = self.model.encode(missing, batch_size=32) for text, emb in zip(missing, embeds): self.cache.set(hash(text), emb, ttl=7*24*3600) return [self.cache.get(hash(t)) for t in texts]

Matryoshka Embeddings

Truncate high-dimensional embeddings to lower dimensions without retraining. Trade-off: smaller vectors (10–20% storage savings) vs. slight accuracy loss.

Fine-Tuning with Contrastive Learning

Train embeddings on domain-specific relevance pairs using triplet loss. Improves domain-specific retrieval by 15–30% with 5K–50K labeled pairs.

Instruction-Tuned Embeddings

Prepend task instructions ("Retrieve document for query: ") to asymmetrically encode queries vs. documents. Boosts retrieval by leveraging prompt tuning.

Compliance Note: Where data is processed matters. API-based embeddings (OpenAI, Cohere) send data to third-party servers. Self-hosted models (BGE, E5, GTE-Qwen) keep data in-house. Evaluate data residency, privacy, and contractual requirements before choosing.
06 / Embedding Models
Storage Layer

Vector Database Selection

FAISS is a similarity search library, not a networked vector database. Production RAG requires distributed, replicable systems.

Layer 0 (Base) Layer 1 Layer 2 (Entry) Query
Database Architecture Key Features Scaling Model Ops Burden
FAISS In-memory library Highest performance; no persistence Single-node only High (build/rebuild cycles)
Milvus Distributed (K8s native) Multi-replica, auto-sharding, metadata filtering Horizontal (scale nodes) High (K8s expertise required)
Pinecone Managed SaaS Serverless, metadata filtering, pod-type scaling Serverless (auto) Low (fully managed)
Weaviate Hybrid (vector+BM25) Combined dense/sparse search, replication controls Cluster-based Medium
Chroma Lightweight SQLite/in-memory Simple API; good for prototypes Single-node Low (dev only)
Elasticsearch Existing infra (if already deployed) Dense vectors + BM25 + analytics Cluster-based Medium
pgvector PostgreSQL extension SQL + vectors; ACID transactions Postgres replication Medium

Qdrant/Milvus Production Config: HNSW + Quantization + Replication

# Qdrant production config (YAML) collection: name: rag_documents vectors: size: 1536 # OpenAI embedding dim distance: "cosine" hnsw_config: m: 16 # graph connectivity ef_construct: 500 # trade-off precision/speed ef: 100 quantization_config: scalar: type: "int8" # 8-bit quantization quantile: 0.99 replication_factor: 3 # high availability sharding: "auto" ttl: 2592000 # 30-day retention

Key Design Decisions

  • HNSW vs IVF: HNSW faster recall, IVF better for billion-scale; prefer HNSW for sub-100M datasets
  • Quantization: 8-bit scalar quantization saves 4x memory with <2% recall loss; essential for cost control
  • Namespaces/Partitions: Isolate indices by tenant, project, or time period for multi-tenancy and retention
  • Replication: RF=3 minimum for production SLA; prevents single-point failures
  • TTL & Garbage Collection: Auto-expire old chunks; configure cleanup policies for cost
  • Backup & Point-in-Time Recovery: Daily snapshots; test restore procedures quarterly
07 / Vector Database
Query Pipeline

Query Transformation — From One Query to Many

Users ask vague, ambiguous, or narrowly-worded questions. A single embedding of that raw query often misses relevant chunks. Query transformation rewrites, decomposes, and expands the user's query into multiple targeted search queries — dramatically improving chunk filtering and retrieval quality.

QUERY TRANSFORMATION PIPELINE — RECOMMENDED PRODUCTION STRATEGY User Query "how do I fix auth?" ① Classify Intent Simple? → skip transform Complex? → full pipeline ② Transform Strategies (Parallel) Original: "how do I fix auth?" Rewrite: "troubleshoot authentication" Technical: "401 403 OAuth token error" Step-back: "auth system architecture" HyDE: [hypothetical answer doc] Decompose: Q1 + Q2 + Q3 sub-queries ③ Parallel Retrieve 6 queries × top-K each asyncio.gather() → RRF merge → deduplicate → unique chunks pool ④ Rerank Against ORIGINAL query (not the variants) → Top-5 to LLM Key insight: variants improve RECALL (find more relevant chunks), reranking against original restores PRECISION (filter to the best) Typical improvement: Recall@5 goes from 62% (single query) → 85% (multi-query) → 94% (multi-query + rerank)

Six Query Transformation Strategies

1. Query Rewriting

LLM rewrites the query to be clearer and more search-friendly. Fixes typos, expands abbreviations, makes implicit context explicit.

# Input: "how 2 fix auth" # Output: "How to troubleshoot and fix # authentication errors" prompt = f"""Rewrite this query to be clearer for a search engine. Fix typos, expand abbreviations. Query: {query}"""

When: Always. First step in every pipeline. Cheap and fast (~50ms with Haiku).

2. Multi-Query Expansion

Generate 3–5 diverse reformulations targeting different vocabulary, specificity levels, and perspectives.

# Input: "fix auth errors" # Output: # - "authentication failure troubleshoot" # - "401 403 OAuth token expired" # - "login session invalid API key" # - "how to debug access denied"

When: Ambiguous or broad queries. Biggest recall improvement (15–30%). See deep-dive in Retrieval section.

3. Step-Back Prompting

Generate a higher-level abstract query to retrieve foundational context, then the specific query for details.

# Input: "why does JWT expire in 15min" # Step-back: "JWT token lifecycle and # security best practices" # Then search BOTH queries: # → foundational + specific chunks

When: "Why" questions, conceptual queries. Provides background context the LLM needs to reason.

4. HyDE (Hypothetical Document)

Ask the LLM to generate a hypothetical answer, embed THAT, and search for similar real documents. Bridges the query-document embedding gap.

# Input: "fix auth errors" # LLM generates hypothetical doc: hypo = "To fix authentication errors, first check if your OAuth token has expired. Refresh using the /auth/refresh endpoint..." # Embed hypo → search → find real docs # that are SIMILAR to this answer

When: Technical queries where query language differs from document language. Adds ~300ms latency.

5. Query Decomposition

Break multi-part or complex questions into atomic sub-queries, retrieve for each independently, then merge.

# Input: "compare pricing of Plan A # vs Plan B and which has # better support" # Decompose into: # Q1: "Plan A pricing details" # Q2: "Plan B pricing details" # Q3: "Plan A support features" # Q4: "Plan B support features"

When: Compound questions, comparisons, multi-entity queries. Critical for completeness.

6. Metadata Filter Extraction

Extract structured filters (date, category, product, region) from the query to narrow the search pool BEFORE vector search.

# Input: "2024 return policy for EU" # Extract: # - filter: year=2024 # - filter: region=EU # - query: "return policy" # → pre-filter chunks THEN embed search

When: Queries with temporal, geographic, or categorical constraints. Dramatically reduces search pool.

Recommended Production Strategy: Adaptive Query Transform

Don't apply all strategies to every query — that's wasteful and slow. Instead, classify the query complexity and apply the minimum transformation needed. Simple factual queries need only rewriting; complex multi-part queries need decomposition + expansion.

AdaptiveQueryTransformer — Production Implementation

class AdaptiveQueryTransformer: """Classify query → apply minimum transform. Simple queries: just rewrite (50ms). Complex queries: full pipeline (200-400ms).""" def __init__(self, llm, fast_llm): self.llm = llm # strong model self.fast = fast_llm # Haiku / mini self.classifier = QueryClassifier() self.cache = TransformCache(ttl=3600) async def transform(self, query: str) -> TransformResult: # Check cache first cached = self.cache.get(query) if cached: return cached # Step 1: Classify query complexity qtype = self.classifier.classify(query) # Step 2: Route to appropriate strategy if qtype == "simple_factual": # "What's the return policy?" → just rewrite queries = [await self.rewrite(query)] filters = self.extract_filters(query) elif qtype == "ambiguous": # "fix auth" → rewrite + expand rewritten = await self.rewrite(query) expanded = await self.expand(query, n=3) queries = [rewritten] + expanded filters = self.extract_filters(query) elif qtype == "compound": # "compare A vs B pricing + support" sub_queries = await self.decompose(query) queries = sub_queries filters = self.extract_filters(query) elif qtype == "conceptual": # "why does X happen?" → step-back + specific abstract = await self.step_back(query) queries = [query, abstract] filters = {} elif qtype == "technical": # Technical jargon → HyDE + expand hyde_doc = await self.generate_hyde(query) expanded = await self.expand(query, n=2) queries = [query] + expanded hyde_queries = [hyde_doc] # separate embed filters = self.extract_filters(query) else: # fallback: rewrite + 2 expansions queries = [query] + await self.expand(query, 2) filters = {} result = TransformResult( original=query, queries=queries, filters=filters, strategy=qtype, ) self.cache.set(query, result) return result

Query Classification — Route to Strategy

Query TypeExampleStrategyLatency
Simple factual"What's the return policy?"Rewrite only~50ms
Ambiguous"fix auth errors"Rewrite + Expand(3)~200ms
Compound"compare A vs B pricing + support"Decompose into sub-Qs~250ms
Conceptual"why does JWT expire?"Step-back + specific~150ms
Technical"CORS preflight 403 nginx"HyDE + Expand(2)~400ms
Lookup"order #12345 status"Extract ID → direct DB~5ms

Query Classifier Implementation

class QueryClassifier: """Fast classifier: embedding + rules. ~5ms. No LLM call needed.""" def classify(self, query: str) -> str: # Rule-based fast path if re.match(r"(order|tracking|#)\s*\d+", query): return "lookup" if "vs" in query or "compare" in query: return "compound" if query.startswith(("why", "how does", "explain")): return "conceptual" if len(query.split()) <= 6: return "ambiguous" # Embedding-based classifier for rest emb = self.encoder.encode(query) pred = self.classifier_model.predict(emb) return pred # SetFit / fine-tuned
Production Tip: Use a simple rule-based classifier for 70% of queries (lookups, simple factual, "why" questions). Only call the embedding classifier for the remaining 30%. This keeps classification under 5ms for most queries.

Metadata Filter Extraction — Pre-Filter Before Vector Search

Extract structured constraints from the query to narrow the chunk pool BEFORE embedding search. This dramatically improves precision for queries with temporal, categorical, or entity-specific constraints.

class FilterExtractor: """Extract structured filters from query. Runs in parallel with query expansion.""" def extract(self, query: str) -> dict: filters = {} # Temporal: "2024", "last month", "recent" date = self.parse_date(query) if date: filters["date_after"] = date # Category: "pricing", "support", "API" category = self.classify_topic(query) if category: filters["doc_type"] = category # Entity: product names, plan names entities = self.ner.extract(query) if entities: filters["entities"] = entities # Region: "EU", "US", "APAC" region = self.detect_region(query) if region: filters["region"] = region return filters # Applied to vector search: # db.search(query_emb, filters=filters) # → searches ONLY chunks matching filters

Why this matters:

Without filters, "2024 EU return policy" searches ALL chunks and relies on the embedding to distinguish 2024 EU docs from 2023 US docs. Embeddings are bad at temporal and geographic precision. Pre-filtering narrows the pool from 10M chunks to maybe 50K — making vector search both faster and more accurate.

Filter TypeExampleExtraction Method
Temporal"2024", "this week", "latest"Regex + dateparser
Category"pricing", "API docs", "FAQ"Topic classifier
EntityProduct names, plan namesNER (spaCy / custom)
Region"EU", "US", "Germany"Regex + geo lookup
LanguageQuery language detectionlangdetect / fasttext
Access levelUser's role / permissionsSession context (ACL)
Benchmark: Query Transform Impact on Retrieval Quality
Raw single query: Recall@5 = 62%  |  + Rewrite: 68% (+6%)  |  + Multi-Query Expand: 82% (+20%)  |  + Metadata Filters: 87% (+5%)  |  + Cross-Encoder Rerank: 94% (+7%)  |  Total lift: +32 percentage points
Production Recommendation: Start with rewrite + 3-query expansion as your default pipeline — it gives the best cost/quality tradeoff (~200ms, ~$0.001/query). Add HyDE only for technical domains where query-document vocabulary gap is large. Add decomposition only if your eval shows multi-part questions are a common failure mode. Always cache transformations (1h TTL) — the same query pattern produces the same expansions.

Latency Strategy — Generating 5 Queries in <50ms

The naive approach — call an LLM to generate 5 queries — takes 200–400ms. That's unacceptable for real-time voice agents or low-latency search. Here are four production strategies to get multi-query expansion down to <50ms.

LATENCY COMPARISON — 5 QUERY GENERATION STRATEGIES STRATEGY LATENCY BAR QUALITY Naive: Single LLM call 300–500ms ❌ Too slow Best quality Cached LLM expansion 0ms / 300ms cache miss = slow Best quality Template + Synonym expansion 5–15ms Good quality Fine-tuned small model (local) 10–30ms Very good ★ Hybrid: Template + Cache + Async LLM 5–15ms P95 Best tradeoff ✓ Recommended: serve template-generated queries instantly, then async-upgrade with LLM-expanded queries if cache misses

Strategy 1: Template-Based Expansion (5ms)

No LLM call at all. Use rule-based templates that generate query variants from the original query using synonym dictionaries, regex patterns, and structural transformations.

class TemplateExpander: """Zero-LLM query expansion. ~5ms. Generates 5 variants using rules.""" def __init__(self): self.synonyms = SynonymDict.load("domain_synonyms.json") self.stopwords = set(["the", "a", "is", "how", "do", "I"]) def expand(self, query: str) -> list[str]: tokens = query.lower().split() keywords = [t for t in tokens if t not in self.stopwords] variants = [query] # always include original # V1: Synonym swap (most impactful) for kw in keywords: if kw in self.synonyms: syn = self.synonyms[kw][0] variants.append(query.replace(kw, syn)) break # one swap per variant # V2: Keyword-only (drop question words) variants.append(" ".join(keywords)) # V3: Reversed keyword order variants.append(" ".join(reversed(keywords))) # V4: Add domain context prefix variants.append(f"documentation: {query}") return variants[:5] # Example: # Input: "how do I fix auth errors" # Output: [ # "how do I fix auth errors", # original # "how do I fix authentication errors", # synonym # "fix auth errors", # keywords-only # "errors auth fix", # reversed # "documentation: how do I fix auth errors" # prefixed # ]

Pros: Zero latency, zero cost, deterministic. Cons: Limited diversity, no semantic understanding. Best for: First-pass expansion while LLM results are pending.

Strategy 2: Fine-Tuned Small Model (10–30ms)

Distill a large LLM's query expansion capability into a small local model (T5-small, FLAN-T5-base, or a 60M-param custom model). Runs on CPU in 10–30ms.

from transformers import T5ForConditionalGeneration class LocalQueryExpander: """Fine-tuned T5-small for query expansion. ~15ms on CPU. No API call.""" def __init__(self): self.model = T5ForConditionalGeneration.from_pretrained( "./models/query-expander-t5-small" ) self.tokenizer = AutoTokenizer.from_pretrained( "./models/query-expander-t5-small" ) def expand(self, query: str, n=5) -> list[str]: prompt = f"expand query: {query}" inputs = self.tokenizer(prompt, return_tensors="pt") outputs = self.model.generate( **inputs, num_return_sequences=n, num_beams=n, max_new_tokens=64, do_sample=False, ) return [ self.tokenizer.decode(o, skip_special_tokens=True) for o in outputs ] # Training data: 50K (query, expansion) pairs # generated by GPT-4/Claude from prod logs. # Fine-tune T5-small for 3 epochs. ~2hrs on 1 GPU.

Pros: Fast, free at inference, semantic-aware. Cons: Requires training, model maintenance. Best for: High-QPS production systems.

Strategy 3: Pre-Computed Cache (0ms hit / 300ms miss)

Cache LLM-generated expansions by normalized query. First request is slow; all subsequent identical or near-identical queries are instant. Use semantic similarity for fuzzy cache matching.

class SemanticExpansionCache: """Cache LLM expansions. 0ms on hit. Semantic fuzzy matching for near-dupes.""" def __init__(self, redis, encoder, llm): self.redis = redis # exact cache self.encoder = encoder # for fuzzy match self.index = FAISSIndex() # query embedding index self.llm = llm # fallback generator async def get_expansions(self, query: str) -> list[str]: # L1: Exact match (Redis, ~0.1ms) key = hashlib.md5(query.lower().encode()).hexdigest() cached = self.redis.get(key) if cached: return json.loads(cached) # L2: Semantic fuzzy match (~2ms) q_emb = self.encoder.encode(query) hits = self.index.search(q_emb, top_k=1) if hits and hits[0].score > 0.95: # "fix auth errors" ≈ "fix authentication errors" return self.redis.get(hits[0].id) # L3: Cache miss → generate (async, don't block) expansions = await self.llm.expand(query) self.redis.setex(key, 3600, json.dumps(expansions)) self.index.add(q_emb, key) return expansions

Hit rate: 40–70% for most production systems (users ask similar questions). Semantic matching pushes this to 60–85%.

★ Strategy 4: Hybrid — The Recommended Approach

Combine all three: serve template-generated queries instantly (5ms), check cache for LLM-quality expansions (0ms if hit), and fire-and-forget an async LLM call to upgrade the cache for next time.

HYBRID QUERY EXPANSION — REQUEST FLOW User Query SYNCHRONOUS — RUNS IN PARALLEL (5ms total) Phase 1: Template Expand synonym + keyword + prefix ~2ms → 5 variants Phase 2: Cache Lookup Redis exact + FAISS fuzzy ~2ms → hit or miss Phase 2b: Filter Extract date, category, entity, ACL ~5ms → metadata filters Cache Hit? YES Merge template + cached → 5 best variants (0ms) MISS Immediate: return templates Async: fire LLM backfill → (cache for next request) Background: LLM expand (300ms) → writes to cache, user doesn't wait Return 5 Queries → Parallel Retrieve Total: 5ms (hit) or 7ms (miss) 1st request: template quality (5ms) → 2nd identical request: LLM quality from cache (0ms) → quality improves over time, latency never degrades
class HybridQueryExpander: """5ms P95 response. Best quality over time. Template → Cache → Async LLM backfill.""" def __init__(self): self.template = TemplateExpander() # 5ms self.cache = SemanticExpansionCache() # 0ms hit self.llm = LLMExpander() # 300ms async def expand(self, query: str) -> list[str]: # Phase 1: Instant (5ms) — always available template_variants = self.template.expand(query) # Phase 2: Cache check (0–2ms) cached = await self.cache.get(query) if cached: # Merge template + cached LLM variants return self.dedupe(cached + template_variants)[:5] # Phase 3: Return templates NOW, # fire async LLM to backfill cache asyncio.create_task( self._async_backfill(query) ) return template_variants # 5ms total async def _async_backfill(self, query): """Runs in background. Next identical query will get LLM-quality expansions.""" try: expansions = await self.llm.expand(query) await self.cache.set(query, expansions) except Exception: pass # template fallback is fine

Result: First request gets template variants in 5ms. Second request gets LLM-quality variants from cache in 0ms. No user ever waits for the LLM.

Complete Latency Breakdown — Query Transform Pipeline

StepOperationLatencyRunsCan Parallelize?
ClassifyRule-based + embedding classifier~3msAlways
Template expandSynonym swap, keyword extract, prefix~2msAlways
Cache lookupRedis exact + FAISS semantic~2msAlways✓ parallel with templates
Filter extractRegex + NER for metadata~5msAlways✓ parallel with above
LLM expandHaiku/mini generate 5 variants~300msCache miss onlyAsync (fire-and-forget)
HyDE generateHypothetical doc generation~400msTechnical queries onlyAsync (fire-and-forget)
Total user-facing latency (hybrid)5–15ms P95LLM runs async, result cached for next request

Warm-Up Strategy

Pre-populate the expansion cache by running your top 10,000 queries from production logs through the LLM expander offline. This gives instant cache hits for the most common queries from day one.

# Offline warm-up script for query in top_10k_queries: expansions = await llm.expand(query) cache.set(query, expansions) # Run nightly. ~$3 for 10K queries.

Batch LLM Calls

If you must call an LLM synchronously, batch multiple queries into a single request. Generate all 5 variants in one prompt (not 5 separate calls). This cuts 5×300ms to 1×350ms.

# One call, 5 variants: prompt = f"""Generate 5 diverse search queries for: "{query}" Return as JSON array.""" # → 1 API call ≈ 300ms # NOT: 5 calls × 300ms = 1.5s ❌

Streaming + Speculative

Start retrieval with template queries immediately. If LLM expansions arrive (from cache or async), merge them into the result set before reranking. The LLM expansions enrich, never block.

# Speculative parallel execution template_results = retrieve(template_qs) # If LLM expansions arrive in time: llm_results = retrieve(llm_qs) # bonus merged = rrf_merge(template_results, llm_results) # If not: template results alone are fine
The 5ms Query Transform — Summary Recipe:
① Classify query type (3ms, rule-based)
② Generate template variants (2ms, synonym + keyword)
③ Check semantic cache for LLM variants (2ms, Redis + FAISS) — in parallel with ②
④ Extract metadata filters (5ms, regex + NER) — in parallel with ②③
⑤ If cache miss: fire-and-forget async LLM call to backfill cache for next time
⑥ Return template+cached variants immediately → start retrieval
Total: 5–15ms P95. User never waits for LLM. Quality improves over time as cache fills.
08 / Query Transformation
Query Time

Advanced Retrieval Strategies

Sparse, dense, and hybrid retrieval each encode different failure modes; hybrid retrieval fuses signals.

Query Transform (rewrite/expand) BM25 Vector Fusion (RRF/Weighted) Reranker (cross-encoder) Context Builder LLM

Retrieval Strategy Patterns

Hybrid Search (Dense + Sparse)

Run BM25 and vector search in parallel; fuse results via Reciprocal Rank Fusion (RRF) or weighted sum.

score = (bm25_score * w1) + (vector_score * w2) rank_bm25 = 1 / (k + position_bm25) rank_vector = 1 / (k + position_vector) final = rank_bm25 + rank_vector

Multi-Query Expansion

LLM generates 3–5 diverse rephrased queries targeting different aspects of the user's question. Retrieve for each, then merge and deduplicate. Detailed deep-dive below.

HyDE (Hypothetical Document Embeddings)

LLM generates hypothetical document for query; embed it; search nearest neighbors. Bridges intent-execution gap.

Query Routing

Classify query intent; route to specialized indices (e.g., FAQ vs. technical docs). Faster and more precise.

Parent Document Retrieval

Retrieve fine-grained child chunks; expand with parent (full section). Balance precision + context.

Step-Back Prompting

Ask "What high-level concept does this question ask?"; retrieve abstract info first; then detailed.

Metadata Filtering

Pre-filter chunks by date, source, or category before vector search. Reduce retrieval pool; improve relevance.

Contextual Compression

Retrieve top-K; use LLM to extract relevant sentences. Reduce context window; increase token efficiency.

Learned Sparse (SPLADE)

SPLADE-family models: learned sparse vectors; interpretable term weights; combines dense + sparse strengths.

HybridRetriever Example

class HybridRetriever: def retrieve(self, query: str, top_k=5): # Sparse: BM25 bm25_hits = self.bm25.retrieve(query, top_k=top_k*2) # Dense: Vector search query_emb = self.embed.encode(query) vector_hits = self.vector_db.search(query_emb, top_k=top_k*2) # Reciprocal Rank Fusion (RRF) fused = self.reciprocal_rank_fusion( bm25_hits, vector_hits, weights=(0.4, 0.6) ) return fused[:top_k]

Multi-Query Expansion — Deep Dive

A single user query often captures only one perspective of what they need. Multi-Query Expansion uses an LLM to generate diverse reformulations that target different angles, vocabulary, and levels of specificity — then retrieves for each and merges results. This typically improves Recall@K by 15–30%.

MULTI-QUERY EXPANSION FLOW Original Query "How do I fix auth errors?" LLM Expander Generate 4 diverse reformulations Q0: "How do I fix auth errors?" Q1: "authentication failure troubleshoot" Q2: "401 403 unauthorized error resolve" Q3: "OAuth token expired refresh flow" Q4: "login session invalid API key fix" Parallel Retrieve 5 queries × top-K each → merge + deduplicate → RRF rank fusion Merged Results Higher recall Broader coverage Each query variant captures different vocabulary (auth/401/OAuth/token/login), specificity levels, and error types. Documents matching ANY variant are surfaced — dramatically improving recall for ambiguous or technical queries.

Production MultiQueryExpander

class MultiQueryExpander: PROMPT = """Generate {n} diverse search queries for the user question below. Each query should target a DIFFERENT aspect: - One using technical terms / error codes - One using simple plain language - One asking the "why" behind the issue - One focused on the solution / fix User question: {query} Return as JSON array of strings.""" def __init__(self, llm, n_queries=4): self.llm = llm self.n = n_queries self.cache = QueryExpansionCache(ttl=3600) async def expand(self, query: str) -> list[str]: # Check cache first (same query = same expansions) cached = self.cache.get(query) if cached: return cached result = await self.llm.generate( self.PROMPT.format(n=self.n, query=query), model="claude-haiku-4-5-20251001", # fast + cheap temperature=0.7, # some diversity ) variants = json.loads(result) # Always include original query all_queries = [query] + variants[:self.n] self.cache.set(query, all_queries) return all_queries

Multi-Query Retriever with RRF Fusion

class MultiQueryRetriever: def __init__(self, expander, retriever, reranker): self.expander = expander self.retriever = retriever self.reranker = reranker async def search(self, query, top_k=5): # Step 1: Expand query queries = await self.expander.expand(query) # Step 2: Parallel retrieval for all variants all_results = await asyncio.gather(*[ self.retriever.search(q, top_k=top_k * 2) for q in queries ]) # Step 3: Reciprocal Rank Fusion fused = self.rrf_merge(all_results, k=60) # Step 4: Rerank against ORIGINAL query # (not the variants!) reranked = self.reranker.rerank( query=query, # original intent candidates=fused[:top_k * 3], top_k=top_k ) return reranked def rrf_merge(self, result_lists, k=60): """Reciprocal Rank Fusion across all query variants.""" scores = {} for results in result_lists: for rank, doc in enumerate(results): if doc.id not in scores: scores[doc.id] = 0 scores[doc.id] += 1.0 / (k + rank) return sorted(scores.items(), key=lambda x: x[1], reverse=True)

Expansion Strategies

Synonym expansion: Replace key terms with alternatives ("auth" → "authentication", "login")
Specificity ladder: Abstract ("security issue") + specific ("OAuth 2.0 token expired 401")
Perspective shift: Problem ("auth fails") + solution ("fix authentication") + cause ("why does token expire")
Domain injection: Add domain context ("in Kubernetes" or "for REST API")

When NOT to Use

Exact-match queries: Order ID lookups, SKU searches, specific error codes — expansion adds noise
Low-latency paths: Adds ~200–400ms for LLM expansion. Use only when retrieval quality matters more than speed
Small corpus (<1K docs): Expansion just returns the same docs repeatedly. Not worth the cost

Production Optimizations

Cache expansions: Same query → same variants. 1-hour TTL covers repeated queries
Use cheapest LLM: Haiku/GPT-4o-mini for expansion (~$0.001 per query)
Parallel retrieval: Run all variant searches simultaneously with asyncio.gather
Rerank against original: Always rerank using the ORIGINAL query, not variants — variants help recall, reranking restores precision

Benchmark Impact: Multi-Query Expansion typically improves Recall@5 by 15–30% and Recall@20 by 10–20% compared to single-query retrieval. Combined with cross-encoder reranking, the full pipeline achieves ~94% Recall@5 vs ~62% for vector-search-only. The cost is ~200ms added latency (cacheable) and ~$0.001/query for the expansion LLM call.

Low-Latency Retrieval — Hitting <30ms P95

For real-time chat and voice agents, the entire retrieval pipeline (query → embed → search → filter → return chunks) must complete in under 30ms P95. Here's how production systems achieve this.

RETRIEVAL LATENCY BUDGET — TARGET: <30ms P95 OPERATION TIME Embed Query ~3ms HNSW Vector Search (top-50) ~5ms BM25 Sparse Search (top-50) parallel ↕ Metadata Filter ~2ms RRF Merge + Dedup ~1ms FlashRank (top-20→10) ~5ms Return Top-5 Total: ~16ms P50 | ~28ms P95 | ~45ms P99 ✓ <30ms

Embedding Latency — <5ms

The query embedding step is on the critical path. Every millisecond counts.

# Strategy: Pre-warm + GPU + small model class FastEmbedder: def __init__(self): # Use small model: 384d, ~3ms on GPU self.model = SentenceTransformer( "all-MiniLM-L6-v2", device="cuda" ) # Pre-warm: run dummy inference self.model.encode("warmup") # ONNX quantized for CPU-only deploys: # self.model = ORTModel("model.onnx") # → ~5ms on CPU vs ~15ms PyTorch def encode(self, text: str): return self.model.encode( text, normalize_embeddings=True )

Options: all-MiniLM-L6 (3ms GPU), ONNX quantized (5ms CPU), Matryoshka 256d (2ms, -1% quality), API (10-20ms + network).

Vector Search — <10ms at Scale

HNSW indexes deliver sub-10ms search even at 10M+ vectors. Key: tune ef_search, keep quantized index in RAM.

# Qdrant: tuned for low latency collection_config = { "vectors": { "size": 384, # small = faster "distance": "Cosine", }, "hnsw_config": { "m": 16, # graph density "ef_construct": 200, # build quality }, "quantization_config": { "scalar": { "type": "int8", # 4x smaller "always_ram": True, # no disk IO } }, "on_disk_payload": True, # metadata on disk } # Search params: search_params = { "hnsw_ef": 64, # lower = faster (vs 128) "exact": False, # ANN, not brute force } # Result: ~5ms for 10M vectors, int8 quantized

Parallel Hybrid Search

Run BM25 and vector search simultaneously. Both return in ~5ms. RRF merge takes ~1ms. Total hybrid: ~6ms vs 10ms serial.

# Parallel hybrid: 6ms total dense, sparse = await asyncio.gather( vector_db.search(q_emb, top_k=50), bm25_index.search(q_text, top_k=50), ) fused = rrf_merge(dense, sparse) # NOT: dense = await ...; sparse = await ... # That's serial: 5+5 = 10ms ❌

Retrieval Result Cache

Cache the final retrieved chunks by normalized query hash. 30–50% hit rate for production systems. 0ms on hit.

# Redis retrieval cache key = md5(normalize(query) + user_acl) cached = redis.get(key) if cached: return json.loads(cached) # 0ms # ACL in key prevents cross-user leakage # TTL: 15min (balance freshness vs speed)

Connection Pooling

Cold connections to vector DB add 20–50ms. Pool connections and keep them warm. Use gRPC over HTTP for lower overhead.

# Qdrant gRPC connection pool client = QdrantClient( url="qdrant:6334", prefer_grpc=True, # not REST grpc_options={ "grpc.keepalive_time_ms": 10000, }, ) # Pre-warm: send dummy search on startup
Low-Latency Retrieval Checklist:
Small embedding model (384d, GPU or ONNX quantized) — saves 10ms vs large model
Int8 quantized HNSW index, always in RAM — saves 5–20ms vs disk
Parallel BM25 + vector search with asyncio.gather — saves 5ms vs serial
gRPC connection pooling to vector DB — saves 20–50ms cold start
Retrieval result cache with ACL-aware keys (15min TTL) — 0ms on 30–50% of queries
FlashRank fast reranker instead of cross-encoder for first pass — 5ms vs 50ms
Metadata pre-filtering to reduce search pool before vector search
Lower ef_search (64 vs 128) for HNSW — ~2ms savings, <1% recall drop
09 / Retrieval
Precision

Reranking & Relevance Scoring

100 Candidates FlashRank ~5ms 30 Candidates Cross-Encoder ~50ms 10 Candidates MMR Diversity ~2ms 5 Final Results Ranked & Diverse Ready to generate Stage 1 Stage 2 Stage 3 Output

Quality Improvement Pipeline

Vector Only 62%
+Hybrid Search 74%
+Cross-Encoder Reranker 89%
+Multi-Query + Rerank 94%
Research Note (BEIR Benchmark): Cross-encoder reranking is powerful but expensive (milliseconds per query). Use selectively when retrieval confidence is low. Avoid reranking 1000s of results; pre-filter to top-50.
Reranker Type Latency Accuracy Cost / Deployment
Cohere Rerank v3.5 API cross-encoder 50–150ms SOTA $0.001 / 1000 queries
Jina Reranker v2 API 100–200ms Excellent $0.0005 / query
cross-encoder/ms-marco Open-source HF 5–20ms (A100) Good (BERT-base) Free; self-hosted
BGE Reranker v2.5 Open-source HF 10–30ms (A100) Very Good Free; self-hosted
RankGPT (LLM-based) LLM proxy 200ms–1s SOTA (model-dependent) API cost; slow
FlashRank (tiny) Open-source distilled 2–5ms (CPU) Acceptable (70–80%) Free; ultra-fast

MultiStageReranker: Cascade Strategy

class MultiStageReranker: __init__(self): self.fast = FlashRank() self.strong = CohereRerank() self.diversity = MMRDiversifier() def rerank(self, query, candidates, top_k=5): # Stage 1: Fast filter (FlashRank) stage1 = self.fast.rerank(query, candidates, top_k=20) # Stage 2: Cross-encoder (Cohere) stage2 = self.strong.rerank(query, stage1, top_k=10) # Stage 3: Diversity (MMR) final = self.diversity.diversify(stage2, top_k=top_k) return final
09 / Reranking
LLM Layer

Prompt Engineering & Generation

"Prompting is a contract between retrieval and generation" — context discipline, citations, and answer modes matter.

Production RAG Prompt Template

"""You are a helpful assistant. Answer ONLY from the provided context. Context: {context} Question: {question} Rules: 1. Base your answer ONLY on the provided context. 2. Cite sources using [Source N] for each fact. 3. If the context does not contain the answer, say: "I don't have information on this." 4. Do NOT guess, speculate, or add outside knowledge. 5. Be concise; use bullet points for clarity. Answer: """

RAGGenerator: Streaming, Fallback, Confidence Gating

class RAGGenerator: def __init__(self, primary_model, fallback_model): self.primary = primary_model # GPT-4 self.fallback = fallback_model # GPT-3.5 (cheaper) def generate(self, context, query, stream=True): retrieval_confidence = self.assess_confidence(context) if retrieval_confidence < 0.5: return "Insufficient context. Please refine your query." model = self.primary if retrieval_confidence > 0.8 else self.fallback for chunk in model.stream(prompt=self.prompt.format(context, query)): yield chunk def assess_confidence(self, context): # Score based on retrieval rank, citation density, recency return (0.7 * avg_rank) + (0.2 * citation_count) + (0.1 * recency)

Streaming (SSE/WebSocket)

Return tokens as they arrive, not end-to-end. Target <500ms TTFT (time-to-first-token). Improves perceived latency and UX.

Citation Extraction

Parse [Source N] references; validate against retrieved chunks. Enable user verification; prevent hallucinated citations.

Fallback Strategy

Route to cheaper/faster model if retrieval confidence is low. Use strong model only when context is rich. Optimize cost/quality.

Research: Self-RAG
Self-RAG decides whether retrieval is needed per token; critiques its own outputs. Studies show 10–15% improvements in factuality by selective retrieval. Implement using token-level confidence scores from the LLM.

LLM Orchestration Policies

  • Model Routing: Classify query complexity; route simple queries to fast model (GPT-3.5), complex to strong model (GPT-4). Save 50%+ on inference cost.
  • Caching Layers: Cache responses by normalized query + context hash. ACL-sensitive keys (per-user); 24-72h TTL. Reduces latency and cost for repeated queries.
  • Hallucination Mitigation Toolbox: Use retrieval-augmented verification (CTRL), confidence thresholds, structured output format (JSON schema), and post-generation fact-checking against context.
10 / Generation
Validators

Response Evaluation Layer

In production, the LLM alone is NOT trusted. Every response passes through a parallel evaluation layer — grounding verification, intent alignment, safety moderation, and confidence scoring — all within 50–200ms.

LLM Response Parallel Evaluation Layer (50–200ms) Grounding Check Answer supported by context? Embedding Similarity Cross-Encoder Verify LLM-Based Grounding Intent Check Response matches user intent? Intent Classification Embedding Similarity threshold > 0.8 Safety Check Content policy compliance? Moderation Models Rule Engine NeMo / Guardrails AI Confidence Score Weighted aggregate 0.4×ground + 0.3×retrieval + 0.2×intent + 0.1×safety threshold: 0.85 Decision Engine ✓ PASS ↻ RETRY ⚠ FALLBACK

1. Grounding Check — Deep Dive

The grounding check is the single most important validator in a production RAG system. It verifies that every claim in the LLM's response is actually supported by the retrieved context — catching hallucinations before they reach the user.

GROUNDING VERIFICATION PIPELINE — CASCADING TIERS TIER 1: Embedding ~5–15ms | All responses cosine_sim(answer, ctx) >0.75 → PASS (skip T2/T3) 0.5–0.75 → escalate to T2 TIER 2: Cross-Encoder ~30–80ms | Ambiguous only NLI(premise=ctx, hyp=claim) entailment → PASS neutral/contradiction → T3 TIER 3: LLM-as-Judge ~300–800ms | Disputed only claim_by_claim verify supported → PASS unsupported → REJECT Grounded → Deliver Hallucinated → Retry

Tier 1: Embedding Similarity

Fastest check (~5–15ms). Runs on every response. Converts answer + context chunks to embeddings, measures cosine similarity.

from sentence_transformers import SentenceTransformer import numpy as np class EmbeddingGrounder: def __init__(self): self.model = SentenceTransformer( "all-MiniLM-L6-v2" # 384d, fast ) def check(self, answer, chunks): a_emb = self.model.encode(answer) c_embs = self.model.encode( [c.text for c in chunks] ) # Max similarity across chunks scores = np.dot(c_embs, a_emb) / ( np.linalg.norm(c_embs, axis=1) * np.linalg.norm(a_emb) ) score = float(scores.max()) if score > 0.75: return Grounded(score) elif score > 0.5: return Ambiguous(score) # → T2 else: return Hallucinated(score)

Tools: FAISS, pgvector, Sentence Transformers, HuggingFace Embeddings, OpenAI text-embedding-3-small

Tier 2: Cross-Encoder / NLI

More accurate (~30–80ms). Only runs on ambiguous T1 results. Uses Natural Language Inference to classify each claim as entailed, neutral, or contradicted by context.

from transformers import pipeline class NLIGrounder: def __init__(self): self.nli = pipeline( "text-classification", model="cross-encoder/" "nli-deberta-v3-large" ) def check(self, answer, context): # Split answer into claims claims = self.extract_claims(answer) results = [] for claim in claims: pred = self.nli( f"{context} [SEP] {claim}" ) label = pred[0]["label"] # entailment/neutral/contradiction results.append((claim, label)) contradictions = [ c for c, l in results if l == "contradiction" ] return NLIResult( grounded=len(contradictions)==0, flagged_claims=contradictions )

Models: DeBERTa-v3-large-NLI, cross-encoder/nli-MiniLM, BART-large-MNLI, Cohere Rerank v3.5

Tier 3: LLM-as-Judge

Most flexible (~300–800ms). Only runs on disputed claims from T2. Performs claim-by-claim verification with explicit reasoning.

class LLMGrounder: PROMPT = """Verify each claim against the context. For each claim, respond: SUPPORTED / NOT SUPPORTED / PARTIAL Context: {context} Claims to verify: {claims} Respond as JSON: [{{"claim": "...", "verdict": "...", "evidence": "...", "confidence": 0.0}}] """ async def check(self, claims, ctx): result = await self.llm.generate( self.PROMPT.format( context=ctx, claims="\n".join(claims) ), model="claude-haiku-4-5-20251001", # Use cheap fast model temperature=0, ) verdicts = json.loads(result) unsupported = [ v for v in verdicts if v["verdict"] != "SUPPORTED" ] return LLMVerdict( grounded=len(unsupported)==0, unsupported_claims=unsupported )

Models: Claude Haiku (cheapest), GPT-4o-mini, Gemini Flash, Llama 3.1 8B (self-hosted)

Production Grounding Service (Cascading)

class ProductionGroundingService: """Cascading grounding: fast → accurate → LLM P95 latency: ~20ms (80% exit at T1)""" def __init__(self): self.t1 = EmbeddingGrounder() # ~10ms self.t2 = NLIGrounder() # ~50ms self.t3 = LLMGrounder() # ~500ms self.metrics = GroundingMetrics() async def verify(self, answer, chunks, query): # Tier 1: Embedding (always runs) t1 = self.t1.check(answer, chunks) self.metrics.record("t1", t1.score) if t1.score > 0.75: return GroundingResult( grounded=True, tier=1, score=t1.score ) if t1.score < 0.4: return GroundingResult( grounded=False, tier=1, score=t1.score, action="regenerate" ) # Tier 2: NLI (ambiguous zone 0.4–0.75) claims = self.extract_claims(answer) t2 = self.t2.check(answer, chunks) self.metrics.record("t2", t2) if t2.grounded: return GroundingResult( grounded=True, tier=2 ) if len(t2.flagged_claims) == 0: return GroundingResult( grounded=True, tier=2 ) # Tier 3: LLM judge (disputed claims only) t3 = await self.t3.check( t2.flagged_claims, "\n".join(c.text for c in chunks) ) self.metrics.record("t3", t3) return GroundingResult( grounded=t3.grounded, tier=3, unsupported=t3.unsupported_claims, action="regenerate" if not t3.grounded else None )

Tools & Libraries Comparison

ToolTypeLatencyBest For
Sentence TransformersEmbedding~5msT1 — fast similarity
FAISSVector index~1msBatch embedding lookup
pgvectorPostgres ext~5msSQL-native similarity
DeBERTa-v3 NLICross-encoder~50msT2 — NLI classification
BART-large-MNLINLI model~40msT2 — zero-shot NLI
Cohere RerankAPI reranker~60msT2 — relevance scoring
Claude HaikuLLM API~400msT3 — claim verification
GPT-4o-miniLLM API~500msT3 — claim verification
Guardrails AIFrameworkvariesOrchestrate all tiers
RAGASEval frameworkofflineMeasure faithfulness
TruLensEval+traceofflineGroundedness monitoring
DeepEvalCI evalofflineHallucination CI gate

How the 80 / 15 / 5 Cascading Exit Works

In production, you do NOT run all three tiers on every response. Instead, you cascade: the fast cheap check runs first, and only ambiguous results escalate to the next tier. This is why 80% of requests cost ~10ms and only 5% ever hit the expensive LLM judge.

CASCADING EXIT — DECISION FLOW WITH TRAFFIC DISTRIBUTION 100% of LLM Responses TIER 1: Embedding Similarity (~10ms) cosine_sim(answer_emb, context_emb) score > 0.75 80% EXIT ✓ score < 0.4 ~5% REJECT ✗ ~15% ambiguous (0.4–0.75) TIER 2: Cross-Encoder NLI (~50ms) NLI(premise=context, hypothesis=claim) entailment ~10% EXIT ✓ ~5% disputed claims TIER 3: LLM-as-Judge (~500ms) claim-by-claim verdict: SUPPORTED / NOT ~3% EXIT ✓ ~2% REJECT → Retry Weighted avg latency: (0.80 × 10ms) + (0.15 × 50ms) + (0.05 × 500ms) = 40.5ms P50 — vs 500ms if every response hit LLM judge

Tier 1 Exit (80%) — Clear Match

Most RAG answers closely paraphrase the retrieved context. Embedding similarity catches these trivially.

# Example: clear grounding Context: "Returns accepted within 30 days of purchase with original receipt." Answer: "You can return items within 30 days if you have the original receipt." cosine_similarity = 0.91 # > 0.75 # → PASS at Tier 1. No further checks. # Latency: ~10ms. Cost: $0.00.

This covers: direct paraphrasing, factual restatement, simple summarization, exact quotes, and minor rewording. The embedding model captures semantic equivalence without needing deeper reasoning.

Tier 2 Escalation (15%) — Ambiguous Zone

When the answer uses different vocabulary or adds inference, embeddings give a middling score. NLI resolves the ambiguity.

# Example: inference from context Context: "Premium members get free shipping on orders over $50." Answer: "As a premium member, your $75 order qualifies for free shipping." cosine_similarity = 0.62 # ambiguous zone # → Escalate to Tier 2 NLI("Premium members get free shipping on orders over $50", "$75 order qualifies for free shipping") # → entailment (0.94 confidence) # → PASS at Tier 2. Latency: ~60ms.

This covers: logical inference, numerical reasoning ("$75 > $50"), conditional application, combining info from multiple chunks, and contextual deduction.

Tier 3 Escalation (5%) — Disputed Claims

When NLI returns "neutral" (neither entailed nor contradicted) or there are mixed verdicts across claims, the LLM judge arbitrates.

# Example: mixed/complex claim Context: "The product is available in blue and red. Ships within 3-5 days." Answer: "The product comes in blue, red, and green. Usually arrives in a week." T1 cosine_similarity = 0.58 # ambiguous T2 NLI: "blue and red" → entailment ✓ "green"neutral ⚠️ # not in ctx "arrives in a week"neutral ⚠️ # → Escalate disputed claims to Tier 3 LLM Judge: "green": NOT SUPPORTED # hallucination! "week": PARTIAL # 3-5 days ≈ week # → REJECT "green", accept "week" # → Strip hallucinated claim, regenerate

Why This Works — The Math

The cascade works because most RAG answers are well-grounded (the retrieval pipeline already found relevant context). Only edge cases need expensive verification.

MetricAll T3CascadeSavings
Avg latency500ms40ms12.5x faster
P50 latency500ms10ms50x faster
P95 latency800ms60ms13x faster
Cost / 1K queries$0.50$0.0316x cheaper
Hallucination catch~98%~96%-2% (acceptable)

Key insight: You trade ~2% hallucination detection rate for a 12x latency reduction and 16x cost reduction. For the remaining 2%, user feedback loops and offline evaluation catch regressions.

Tuning the Thresholds — Production Guidance

T1 Pass Threshold (default: 0.75)

Raise to 0.80–0.85 for high-stakes domains (medical, legal, financial). Lower to 0.65–0.70 for casual Q&A where speed matters more. Tune by measuring T2/T3 escalation rate — if <5% escalate, threshold is too low.

T1 Reject Threshold (default: 0.4)

Below this, the answer is clearly unrelated to context — skip T2/T3 and regenerate immediately. Raise to 0.5 for stricter domains. Monitor false-rejection rate via user feedback.

T2→T3 Escalation (default: any contradiction)

Only escalate if T2 finds "contradiction" (not just "neutral"). Neutral means the context doesn't address the claim — which might be acceptable for partial answers. Tune per use case.

# Threshold config per use case GROUNDING_CONFIG = { "default": { "t1_pass": 0.75, "t1_reject": 0.40, "t2_escalate_on": ["contradiction"], }, "medical": { "t1_pass": 0.85, "t1_reject": 0.50, # stricter "t2_escalate_on": ["contradiction", "neutral"], # always verify }, "casual_qa": { "t1_pass": 0.65, "t1_reject": 0.35, # faster "t2_escalate_on": ["contradiction"], # only clear issues }, }
What to Monitor: Track the tier exit distribution over time. If T2 escalation rises above 25%, your embedding model may be drifting (retrain or upgrade). If T3 rejects rise above 5%, your retrieval pipeline quality may be degrading. Dashboard these in Grafana/Datadog alongside your standard RAG metrics.

2. Intent Check — Response Matches User Intent

Verifies the response actually addresses what the user asked. Catches drift where the model answers a different question entirely.

Example Problem:
User: "Track my order" → Answer: "Here are some shoes you may like" — intent mismatch!
# Intent alignment pipeline class IntentAlignmentChecker: def check(self, query, response): # Classify both through intent model query_intent = self.intent_model.predict(query) response_intent = self.intent_model.predict(response) # Or use embedding similarity q_emb = self.encoder.encode(query) r_emb = self.encoder.encode(response) similarity = cosine_similarity(q_emb, r_emb) if similarity < 0.8: return IntentResult( aligned=False, query_intent=query_intent, response_intent=response_intent )

Common intent models: Rasa, SetFit, fine-tuned classifiers. For production voice agents, embedding-based intent similarity with threshold >0.8 is fastest.

3. Safety Check — Content Moderation

Prevents unsafe or policy-violating responses: illegal instructions, abusive content, financial advice risks, policy violations.

A. Moderation Models

# Dedicated safety classifiers result = moderation_api.classify(response) # Output: {"violence": false, "hate": false, "self_harm": false} if any(result.values()): return block_response()

B. Rule Engine

Hard rules for regulated domains: refund policies, medical/financial advice, guaranteed outcomes. Example: if answer contains "guaranteed profit" → reject.

C. Guardrail Frameworks

Production libraries: Guardrails AI, NeMo Guardrails. Enforce content policies, structured outputs, and safe responses declaratively.

4. Confidence Score — Final Decision Engine

Aggregates all evaluator scores into a weighted confidence signal. Detailed deep-dive below.

Confidence Score — How It's Calculated

The confidence engine is the final gate before a response reaches the user. It takes raw scores from every evaluator, normalizes them, applies domain-specific weights, and produces a single decision: pass, retry, or fallback.

CONFIDENCE SCORE CALCULATION — WEIGHTED AGGREGATION + VETO LOGIC RAW SIGNALS grounding_score: 0.91 retrieval_score: 0.85 intent_score: 0.95 safety_score: 1.00 citation_score: 0.88 freshness_score: 1.00 WEIGHTED AGGREGATION 0.30 × grounding = 0.273 0.25 × retrieval = 0.213 0.15 × intent = 0.143 0.10 × safety = 0.100 0.10 × citation = 0.088 0.10 × freshness = 0.100 weighted_sum = 0.917 + VETO CHECK (any hard fail?) VETO OVERRIDES (any one triggers immediate reject) × safety_score < 0.5 × grounding_score < 0.3 × pii_detected == True × citation_fraud == True × blocked_content == True Veto = instant REJECT regardless > 0.85 PASS ✓ 0.60 – 0.85 RETRY ↻ < 0.60 or VETO FALLBACK ⚠ Weights configurable per domain / use case

Production ConfidenceEngine Implementation

class ConfidenceEngine: def __init__(self, config: DomainConfig): self.weights = config.weights self.thresholds = config.thresholds self.veto_rules = config.veto_rules def calculate(self, scores: EvalScores) -> Decision: # Step 1: Check hard veto rules first for rule in self.veto_rules: if rule.triggered(scores): return Decision( action="REJECT", reason=rule.name, confidence=0.0, vetoed=True ) # Step 2: Weighted aggregation raw_score = sum( self.weights[k] * getattr(scores, k) for k in self.weights ) # Step 3: Apply penalty for low-scoring # individual signals (even if weighted # average is high) penalty = 0.0 for k, threshold in self.thresholds.min_per_signal.items(): val = getattr(scores, k) if val < threshold: gap = threshold - val penalty += gap * 0.5 # 50% of gap final = max(0.0, raw_score - penalty) # Step 4: Map to decision if final >= self.thresholds.pass_threshold: return Decision("PASS", final) elif final >= self.thresholds.retry_threshold: return Decision("RETRY", final) else: return Decision("FALLBACK", final)

Why These Weights?

SignalWeightRationale
Grounding0.30Highest — a hallucinated answer is the #1 failure mode. If grounding fails, nothing else matters.
Retrieval0.25If retrieval quality is low, the LLM is working with bad context. Garbage in → garbage out.
Intent0.15Answering the wrong question is bad but less dangerous than hallucinating facts.
Safety0.10Low weight in formula BUT has a hard veto — any safety flag = instant reject regardless of score.
Citation0.10Verifies source attribution. Important for trust but not critical for correctness.
Freshness0.10Only matters for temporal queries. Many questions are time-independent.

Veto Rules — Hard Overrides

Certain conditions bypass the weighted score entirely and force an immediate reject. No amount of high scores elsewhere can compensate.

VETO_RULES = [ VetoRule("unsafe_content", lambda s: s.safety < 0.5), VetoRule("severe_hallucination", lambda s: s.grounding < 0.3), VetoRule("pii_leakage", lambda s: s.pii_detected), VetoRule("citation_fraud", lambda s: s.citation_valid_pct < 0.5), VetoRule("blocked_topic", lambda s: s.blocked_content), ] # If ANY veto fires → instant REJECT # regardless of weighted score

Worked Examples — Three Scenarios

Scenario A — PASS

"What's your return policy?"

Grounding: 0.91 × 0.30 = 0.273
Retrieval: 0.88 × 0.25 = 0.220
Intent: 0.95 × 0.15 = 0.143
Safety: 1.00 × 0.10 = 0.100
Citation: 0.90 × 0.10 = 0.090
Fresh: 1.00 × 0.10 = 0.100
─────────────────
Total: 0.926 → Penalty: 0
Decision: PASS ✓

Scenario B — RETRY

"Compare Plan A vs Plan B pricing"

Grounding: 0.62 × 0.30 = 0.186
Retrieval: 0.70 × 0.25 = 0.175
Intent: 0.90 × 0.15 = 0.135
Safety: 1.00 × 0.10 = 0.100
Citation: 0.40 × 0.10 = 0.040
Fresh: 1.00 × 0.10 = 0.100
─────────────────
Raw: 0.736 | Penalty: -0.05
Final: 0.686 → RETRY ↻
Retry with more context chunks

Scenario C — VETO REJECT

"Show me other users' orders"

Grounding: 0.85 × 0.30 = 0.255
Retrieval: 0.80 × 0.25 = 0.200
Intent: 0.92 × 0.15 = 0.138
Safety: 0.20 × 0.10 = 0.020
─── VETO TRIGGERED ───
safety < 0.5 → unsafe_content
Decision: REJECT ⚠
Even though weighted=0.85 would pass,
veto overrides. Response blocked.

Domain-Specific Weight Profiles

Different use cases need different weight distributions. A medical chatbot prioritizes grounding above all else; a casual Q&A bot prioritizes speed and intent alignment.

DomainGroundingRetrievalIntentSafetyCitationFreshPassRetry
General Q&A0.300.250.150.100.100.10>0.85>0.60
Medical / Legal0.400.200.100.150.100.05>0.90>0.70
E-commerce0.250.200.200.100.100.15>0.82>0.55
Voice Agent0.300.250.200.100.050.10>0.80>0.55
Internal Docs0.250.300.150.050.150.10>0.80>0.55
Financial0.350.200.100.150.100.10>0.92>0.75
# Config per domain DOMAIN_CONFIGS = { "medical": DomainConfig( weights={"grounding": 0.40, "retrieval": 0.20, "intent": 0.10, "safety": 0.15, "citation": 0.10, "freshness": 0.05}, thresholds=Thresholds(pass_threshold=0.90, retry_threshold=0.70), min_per_signal={"grounding": 0.7, "safety": 0.8}, # strict mins veto_rules=VETO_RULES + [ VetoRule("medical_disclaimer_missing", lambda s: s.has_medical_claim and not s.has_disclaimer), ] ), "ecommerce": DomainConfig( weights={"grounding": 0.25, "retrieval": 0.20, "intent": 0.20, "safety": 0.10, "citation": 0.10, "freshness": 0.15}, thresholds=Thresholds(pass_threshold=0.82, retry_threshold=0.55), min_per_signal={"grounding": 0.5}, veto_rules=VETO_RULES # standard vetos ), }
Calibration Tip: Don't guess at weights — measure them. Run your eval dataset through each evaluator independently, then use logistic regression on user feedback (thumbs up/down) to learn optimal weights for your domain. Recalibrate quarterly as your corpus and model evolve.

Production Microservice Architecture

Many companies deploy the evaluation layer as separate microservices for scalability and independent deployment.

class ResponseEvaluationService: """Runs all checks in parallel. Target: 50-200ms.""" async def evaluate(self, query, response, context): # Run all checks in parallel grounding, intent, safety = await asyncio.gather( self.grounding_svc.check(response, context), self.intent_svc.check(query, response), self.safety_svc.check(response), ) # Compute weighted confidence confidence = self.confidence_engine.score( grounding=grounding.score, retrieval=context.retrieval_score, intent=intent.score, safety=safety.score, ) # Decision if confidence.decision == Decision.PASS: return EvalResult(approved=True, response=response) elif confidence.decision == Decision.RETRY: return await self.regenerate(query, context) else: return EvalResult( approved=True, response="I'm not completely sure. " "Let me check that for you." )

Latency Optimization

Voice systems and real-time apps run all checks in parallel to keep total evaluation under 200ms.

CheckMethodLatencyAccuracy
GroundingEmbedding similarity~10msGood
GroundingCross-encoder~50msBetter
GroundingLLM-as-judge~500msBest
IntentEmbedding similarity~10msGood
IntentClassifier model~20msBetter
SafetyModeration API~50msGood
SafetyRule engine~1msExact
ConfidenceScore aggregation~1ms
Key Insight: Use the fastest tier (embedding similarity) as default. Escalate to cross-encoder or LLM-judge only when the fast check is ambiguous (score 0.5–0.75). This cascading approach keeps P95 under 100ms while catching edge cases.
Tools: FAISS, pgvector, Sentence Transformers, BERT cross-encoders, Rasa, SetFit, Guardrails AI, NeMo Guardrails, OpenAI Moderation, LangChain validators.

Additional Production Checks (Often Missed)

The four core checks (grounding, intent, safety, confidence) cover ~80% of failure modes. These additional checks close the remaining gaps that surface at scale.

5. Citation Verification

Validates that [Source N] references in the response actually match the claims they support. Catches "citation hallucination" where the model invents or misattributes sources.

class CitationVerifier: def verify(self, response, sources): citations = self.extract_citations(response) for cite in citations: # Does [Source N] exist? if cite.index >= len(sources): cite.valid = False continue # Does the claim match the source? sim = cosine_sim( cite.claim, sources[cite.index] ) cite.valid = sim > 0.6 return citations

Tools: Regex extraction + embedding verification. Run in parallel with grounding check (~5ms overhead).

6. Completeness Check

Did the answer address ALL parts of a multi-part question? Users often ask compound questions and the LLM may only answer part of it.

# Example problem: Query: "What's the return policy AND do you offer exchanges?" Answer: "Returns within 30 days." # Missing: exchange info! class CompletenessChecker: def check(self, query, answer): # Decompose query into sub-questions sub_qs = self.decomposer.split(query) addressed = [] for sq in sub_qs: sim = cosine_sim(sq, answer) addressed.append(sim > 0.5) return CompletenessResult( complete=all(addressed), missing=[sq for sq, a in zip(sub_qs, addressed) if not a] )

Tools: LLM query decomposer or spaCy clause splitting + embedding comparison.

7. PII Leakage Detection

The retrieved context may contain sensitive data (emails, SSNs, account numbers) that the LLM inadvertently surfaces in its response. Scan output before delivery.

from presidio_analyzer import AnalyzerEngine from presidio_anonymizer import AnonymizerEngine class PIIGuard: def __init__(self): self.analyzer = AnalyzerEngine() self.anonymizer = AnonymizerEngine() def scan(self, response): results = self.analyzer.analyze( text=response, entities=["EMAIL_ADDRESS", "PHONE_NUMBER", "CREDIT_CARD", "US_SSN"], language="en" ) if results: return self.anonymizer.anonymize( text=response, analyzer_results=results ) return response # clean

Tools: Microsoft Presidio (open-source), AWS Comprehend PII, Google DLP API. Run on every response (~10ms).

8. Freshness / Staleness Check

Verify that the retrieved context is still current. An answer about "current pricing" from a 6-month-old document could be dangerously wrong.

class FreshnessChecker: def check(self, chunks, query): # Does query need fresh data? needs_fresh = self.classify_temporal(query) # "current", "latest", "now", "today" if not needs_fresh: return Fresh() # skip check for chunk in chunks: age = now() - chunk.indexed_at if age > timedelta(days=30): return Stale( chunk=chunk, age_days=age.days, action="warn_user" )

Store indexed_at and source_updated_at in chunk metadata. Define TTL per document type (pricing: 7d, policy: 30d, FAQ: 90d).

9. Retry & Regeneration Strategy

When evaluation fails, how do you regenerate differently? Simply re-running the same prompt gets the same bad answer. Production systems modify the generation strategy on retry.

class RetryStrategy: def regenerate(self, fail_reason, attempt): if fail_reason == "hallucination": # Add explicit "ONLY use context" return self.stricter_prompt( temp=0.0 # zero creativity ) elif fail_reason == "incomplete": # Retrieve MORE chunks return self.expand_context( top_k=10 # was 5 ) elif fail_reason == "stale": # Force fresh retrieval return self.re_retrieve( freshness="7d" ) elif attempt >= 2: return self.fallback_response()

Key: Max 2 retries. Each retry changes strategy (stricter prompt, more context, different model). After 2 fails → graceful fallback.

10. Human Feedback Loop

User feedback (thumbs up/down, corrections, follow-up queries) is the ultimate ground truth. Feed it back into evaluation thresholds and training data.

class FeedbackCollector: def record(self, query_id, signal): # Signals: # thumbs_up, thumbs_down, # correction(text), follow_up, # escalate_to_human self.store.save(query_id, signal) # If thumbs_down → add to eval set if signal == "thumbs_down": self.eval_builder.add_negative( query_id ) # Weekly: retune thresholds from # feedback distribution # Monthly: retrain intent/NLI models # Quarterly: full eval set refresh

Track feedback rate (aim for >5% of responses). Negative feedback → auto-add to adversarial eval set. Positive feedback → confidence calibration.

11. Evaluation Layer Monitoring — What to Dashboard

The evaluation layer itself needs monitoring. If your grounding check drifts, it will silently let hallucinations through.

Tier Exit Distribution

T1: 80% / T2: 15% / T3: 5% is baseline. Alert if T2 rises above 25% (embedding model drift) or T3 above 8% (retrieval quality degradation).

False Positive / Negative Rate

Sample 100 responses/week. Human-label as grounded or not. Compare against evaluator verdicts. Target: <3% false-positive (passes hallucination) and <8% false-negative (rejects good answer).

Retry & Fallback Rate

If retry rate exceeds 10% or fallback exceeds 3%, something upstream is broken — likely retrieval quality, prompt template, or LLM model regression. Investigate immediately.

Evaluator Latency P95

Track per-tier latency. If T1 P95 exceeds 30ms, the embedding model may need optimization or the batch size is too large. T2 P95 above 150ms → model serving issue.

PII Detection Rate

Track how often PII is found in responses. If rate spikes, investigate the retrieval pipeline — it may be pulling in documents with unredacted personal data.

User Feedback Correlation

Correlate confidence scores with user feedback. If high-confidence responses get thumbs-down, your evaluator is miscalibrated. Retune weights quarterly.

Complete Response Evaluation Checklist (Production):
Grounding Check (cascading T1/T2/T3)   Intent Alignment   Safety / Content Moderation   Confidence Score Engine   Citation Verification   Completeness Check   PII Leakage Detection   Freshness / Staleness Check   Retry / Regeneration Strategy   Human Feedback Loop   Evaluator Monitoring & Alerting
11 / Response Evaluation Layer
Adaptive

Self-Correction & Reflection Loops

Modern advanced RAG adds self-checking loops that detect and recover when retrieval quality is poor—rather than blindly stuffing top-k passages into prompts

Query Retrieval Needed? Yes Retrieve Quality Good? Yes Generate No: Correct & Retry No: Direct Generate Self-Check Output

Core Self-Correction Techniques

Self-RAG

Model decides whether retrieval is needed per token. Generates reflection tokens (IsRel, IsSup, IsUse) to critique its own outputs. Targets factuality and citation accuracy—10–15% improvement in studies.

CRAG

Corrective RAG evaluates retrieved documents. Triggers corrective actions (alternative retrieval, filtering, web search fallback) when retrieval quality is poor.

Adaptive Retrieval

Retrieve fewer documents when confidence is high; more when needed. Avoids indiscriminate retrieval via confidence-gated document selection.

Adaptive Retrieval with Confidence Gating

class AdaptiveRetriever: def retrieve(self, query, min_confidence=0.7): # Query embedding + confidence score embedding, confidence = self.embed_and_score(query) # Adaptive k: higher confidence → fewer docs k = 3 if confidence > 0.8 else (5 if confidence > 0.6 else 10) docs = self.index.search(embedding, top_k=k) # Evaluate doc relevance and content quality scored_docs = [ {'doc': d, 'relevance': self.relevance_score(d, query)} for d in docs ] # Filter by min_confidence threshold filtered = [d for d in scored_docs if d['relevance'] > min_confidence] return filtered if filtered else docs # Fallback to top docs if none pass
Key Insight: Self-correction loops transform RAG from a one-shot pipeline into a closed-loop system. The model detects when retrieval is noisy or insufficient and can trigger alternative strategies (re-retrieve, summarize context differently, web search) without external intervention.
11 / Self-Correction & Reflection Loops
Quality

RAG Evaluation Framework

Three metric categories measure retrieval effectiveness, generation quality, and system performance

Answer Relevance Context Relevance Groundedness/ Faithfulness
Category Metrics Measurement Method Tools
Retrieval Metrics Precision, Recall, NDCG, MRR, MAP Rank quality against gold standard passages BEIR, Trec-eval
Generation Metrics BLEU, ROUGE, METEOR, BERTScore, Context Relevance Automated scoring, LLM judges, embedding similarity TruLens, RAGAS
System Metrics Latency, throughput, cost per query, user satisfaction Production logs, user feedback, A/B tests OpenTelemetry, Datadog, custom instrumentation

Building Your Eval Dataset

Synthetic

LLM-generated QA from corpus. Fast, cheap. Risk of false positives.

Human-Curated

Gold standard. Expensive, slow. High quality baseline.

Production Logs

Real queries & answers. Most realistic. Requires filtering.

Adversarial

Edge cases, tricky queries. Surfaced via user feedback.

RAGAS: Automated RAG Evaluation

from ragas import evaluate from ragas.metrics import ( context_relevance, answer_relevance, faithfulness, answer_correctness ) # Eval dataset: [{"question": q, "answer": a, "context": c}] result = evaluate( eval_dataset, metrics=[ context_relevance, # Is context relevant to Q? answer_relevance, # Does answer address Q? faithfulness, # Is answer grounded in context? ] ) # Aggregate scores print(f"Context Relevance: {result['context_relevance'].mean():.2f}") print(f"Faithfulness: {result['faithfulness'].mean():.2f}")
BEIR Research Insight: BM25 remains a strong baseline for retrieval. Reranking can outperform but is costly. On evaluation: 30% of eval set should be hard edge cases to catch real-world failure modes. Synthetic eval is 3–5× cheaper than human labeling but systematically biased.
12 / RAG Evaluation Framework
Benchmarking

RAG Benchmarking & Performance Testing

Systematic benchmarking with shared metrics ensures consistent quality and enables confident deployment decisions

Benchmarking Framework Flow

Data Sources Eval Datasets Golden QA Prod Logs Adversarial Set Hard Cases Benchmark Suite Retrieval Accuracy Generation Quality Latency & Cost Context Relevance Faithfulness Hallucination Citation Accuracy IR Metrics Recall@K, NDCG@K MRR, Hit Rate BEIR, MTEB Rank Latency P95 Generation Metrics Faithfulness Answer Relevance Hallucination Rate Citation Accuracy Quality Gates Regression Detection Baseline Comparison Reports Decision Deploy Block

Retrieval Benchmarks

Metric Target
Recall@5 > 85%
Recall@20 > 95%
NDCG@10 > 0.70
MRR > 0.75
Hit Rate > 95%
BEIR Zero-Shot Baseline
MTEB Rank Top 10%
Latency P95 < 100ms

Generation Benchmarks

Metric Target
Faithfulness > 0.90
Answer Relevance > 0.85
Context Relevance > 0.80
Hallucination Rate < 5%
Citation Accuracy > 90%
Refusal Rate > 80%
Completeness > 85%
TTFT P95 < 500ms

End-to-End System

Test full pipeline: retrieval → generation → post-processing. Measure user-facing latency, cost per query, success rate.

A/B Testing & Online

Shadow deploy changes. Compare metrics vs. baseline with statistical significance. Catch production surprises before full rollout.

Adversarial & Stress

Test with typos, out-of-domain queries, adversarial prompts. Load test at 10× peak. Measure robustness.

RAG Benchmark Suite Code

class RAGBenchmarkSuite:
    def __init__(self):
        self.thresholds = {
            "recall@5": 0.85,
            "faithfulness": 0.90,
            "latency_p95_ms": 100,
        }

    def run_benchmark(self, model, dataset):
        results = {}
        for q, ctx, golden_answer in dataset:
            retrieved = self.retrieve(q)
            generated = model.generate(q, ctx)

            results["recall"] = self.compute_recall(retrieved, ctx)
            results["faithfulness"] = self.compute_faithfulness(generated, ctx)

        return self.check_regressions(results, self.thresholds)

Benchmark Workflow Pipeline

1
Establish Baseline
Run suite on current production system
2
Change & Measure
Make code change, rerun benchmarks
3
Regression Detection
Compare vs. baseline, flag regressions
4
Gate Deployment
Pass gates → deploy; fail → iterate
5
Continuous Monitoring
Monitor metrics post-deploy, alert on drift

Benchmarking Tools Comparison

Tool Metrics LLM-Based Reference Best For
RAGAS Faithfulness, Answer Rel., Context Rel. Paper Gen. + Retrieval
BEIR Recall@K, NDCG@K, MRR, RMSE Yes Retrieval (IR)
MTEB Cross-lingual Retrieval, Ranking Yes Multilingual
TruLens LLM-based evals + feedback No Custom logic
DeepEval Hallucination, Answer Rel., RAGAS Optional LLM Evals
LangSmith Custom evals, tracing, logging Partial Optional Development + Monitoring
Arize Phoenix Evals + Production Observability Optional End-to-End
Custom Harness Org-specific metrics & logic Optional Org Control + Integration
Benchmarking Best Practice: Shared metrics across team prevent siloed evaluation. Deploy benchmarks as part of CI/CD to catch regressions early. Use production logs as continuous test set. Golden answers should be updated quarterly as product evolves.
13 / Benchmarking
Safety

Guardrails & Safety

Multi-stage guardrails prevent harmful input, retrieval, generation, and post-processing risks

Guardrail Pipeline: Four Stages

1. Input

  • • Prompt injection detection
  • • PII masking
  • • Content profanity filter

2. Retrieval

  • • ACL enforcement
  • • Source validation
  • • Freshness checks

3. Generation

  • • Hallucination checker
  • • Citation validator
  • • Token budget limits

4. Post-Processing

  • • PII scrubbing
  • • Toxicity filtering
  • • Output validation

GuardrailPipeline Implementation

from microsoft_presidio import AnalyzerEngine, AnonymizerEngine class GuardrailPipeline: def __init__(self): self.prompt_injector = PromptInjectionDetector() self.pii_analyzer = AnalyzerEngine() # Presidio self.topic_clf = TopicClassifier() self.rate_limiter = RateLimiter(max_qps=100) self.hallucination_checker = HallucinationChecker() self.toxicity_filter = ToxicityFilter() self.citation_validator = CitationValidator() async def process(self, user_input, context): # 1. Input guardrails if self.prompt_injector.is_injection(user_input): raise PromptInjectionError() pii_results = self.pii_analyzer.analyze(user_input) cleaned = anonymize(user_input, pii_results) # Redact/hash # 2. Rate limiting await self.rate_limiter.check(user_id=context['user_id']) # 3. Generation & post-processing answer = await self.llm.generate(cleaned, context) if not self.hallucination_checker.check(answer, context): answer = "I cannot answer based on available context." if not self.citation_validator.validate(answer): answer = "Citations missing or invalid." return answer
PII Detection with Microsoft Presidio: Identifies PII entities (email, phone, SSN, credit card). Actions: redact (remove), replace (substitute with category), hash (irreversible), encrypt (reversible with key). Use different strategies per context: remove sensitive PII from logs, hash for deduplication, redact for display.
13 / Guardrails & Safety
Security

Enterprise Threat Model & OWASP LLM Top 10

Map RAG attack surfaces to OWASP LLM Top 10 categories with mitigations

RAG System Prompt Injection Critical Data Exfiltration Critical Permission Leakage High Data Poisoning High DoS / Cost Attack Medium Supply Chain Medium

1. Prompt Injection

Risk: Malicious prompts override system instructions or exfiltrate data.

Mitigations: Content sanitization, instruction stripping, system prompt dominance, tool-call allowlists.

2. Data Exfiltration

Risk: Model leaks sensitive data (PII, secrets) in responses.

Mitigations: Output filtering, PII scrubbing, redaction at generation time.

3. Permission Leakage

Risk: Weak retrieval filters expose unauthorized content.

Mitigations: ACL-aware retrieval, auth-sensitive cache keys, audit trails.

4. Data Poisoning

Risk: Malicious docs inserted into corpus, spread misinformation.

Mitigations: Ingestion validation, source trust scoring, content integrity checks.

5. DoS (Expensive Prompts)

Risk: Very long contexts, recursive tool calls exhaust resources.

Mitigations: Token budgets, hard timeouts, rate limits per user.

6. Supply Chain

Risk: Compromised embedding models or dependencies.

Mitigations: Model provenance, dependency scanning, vendor security audit.

CRITICAL SECURITY INVARIANT:

A user must never receive retrieved context (or generated content derived from it) that they are not authorised to access.

Permission-Aware Retrieval Requirements:
  • Ingest-time ACL assignment: Tag every chunk with owner/org/role ACLs
  • Query-time filter enforcement: Filter retrieved docs by user's ACL before context assembly
  • ACL-sensitive cache keys: Include user_id/org_id in cache key to prevent cross-user leakage
  • Audit trails: Log all access (who queried, what docs were retrieved, timestamps)
14 / Enterprise Threat Model & OWASP LLM Top 10
Operations

Observability & Monitoring

Three monitoring layers: system SLOs, retrieval quality, and answer groundedness

Traces Metrics Logs OpenTelemetry Collector Dashboards & Alerts

Monitoring Layers

System SLOs

  • • Latency (p50, p95, p99)
  • • Throughput (QPS)
  • • Error rate
  • • Availability

Retrieval Quality

  • • NDCG, MRR (rank quality)
  • • Precision@k
  • • Docs retrieved per query
  • • Reranker acceptance rate

Answer Quality

  • • Faithfulness (grounded?)
  • • Answer relevance
  • • Citation accuracy
  • • User feedback signal

OpenTelemetry Tracing Decorator

from opentelemetry import trace from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter from opentelemetry.sdk.trace import TracerProvider tracer = TracerProvider().get_tracer("rag-pipeline") def trace_retrieval(func): def wrapper(*args, **kwargs): with tracer.start_as_current_span("retrieval") as span: span.set_attribute("query", kwargs.get('query')) result = func(*args, **kwargs) span.set_attribute("docs_retrieved", len(result)) span.set_attribute("latency_ms", elapsed) return result return wrapper @trace_retrieval async def retrieve(query): return await index.search(query)

OpenTelemetry Collector Config (with PII Scrubbing)

receivers: otlp: protocols: grpc: endpoint: 0.0.0.0:4317 processors: batch: timeout: 10s send_batch_size: 1024 attributes: actions: - key: user.email action: delete # Remove PII - key: http.request.header.authorization action: delete # Remove tokens exporters: otlp: endpoint: collector:4317 service: pipelines: traces: receivers: [otlp] processors: [batch, attributes] exporters: [otlp]
LangSmith

LLM tracing, debugging

Arize Phoenix

ML observability

OpenTelemetry

Core instrumentation

Datadog

Metrics, dashboards

TruLens

RAG eval metrics

Essential Dashboards:
  • Latency Breakdown: Query transform, embedding, search, rerank, LLM, total
  • Retrieval Quality: NDCG, docs per query, reranker effectiveness
  • LLM Metrics: Token usage, cost, temperature, model routing decisions
  • User Metrics: Active users, unique queries, satisfaction (thumbs up/down)
  • System Health: Disk usage (vector DB), index freshness, cache hit rates, error budget
15 / Observability & Monitoring
Infrastructure

Scaling & Performance

Scale RAG systems from thousands to billions of documents with architecture patterns

Scale Tier Document Count Query Load (QPS) Typical Latency (p95) Architecture Pattern
Small 10^5–10^6 chunks <10 QPS <2s Single in-memory FAISS index, Python app, SQLite metadata
Medium 10^7–10^8 chunks 10–300 QPS 1–4s Milvus/Weaviate cluster, Kubernetes, async queue processing, multi-region replication
Large 10^8–10^9+ chunks 300–5000+ QPS 500ms–1s Elasticsearch sharding, GPU-accelerated search (Triton), vLLM serving, distributed caching, traffic shaping
Query In L1: Exact Query Cache — Redis, ~0ms, 30-40% hit L2: Semantic Cache — Vector similarity, ~5ms, 50-60% hit L3: Embedding Cache — Content hash, avoid re-embed Cache Hit → Return Cache Hit → Return Cache Hit → Return Cache Miss L4: LLM Response Cache — Prompt hash (temp=0)

Multi-Layer Caching Strategy

L1: Exact Query

Hash(query) → response. TTL: 24h. Hit rate: 15–25% for repeated queries.

L2: Semantic

Embedding similarity clustering. Cache similar queries together. Hit rate: 30–40%.

L3: Embedding

Cache embeddings for large docs to avoid re-embedding on every query.

L4: LLM Response

Cache LLM outputs by (query, context hash). Reduces expensive inference calls.

SemanticCache Implementation

class SemanticCache: def __init__(self, embedding_model, similarity_threshold=0.95): self.embed = embedding_model self.threshold = similarity_threshold self.cache = {} # query_embedding → (answer, metadata) self.redis = Redis() # Persistent L2 cache async def get(self, query): query_emb = await self.embed.encode(query) # Find semantically similar cached queries for cached_emb, answer in self.cache.items(): sim = cosine_similarity(query_emb, cached_emb) if sim > self.threshold: return answer # Cache hit! return None def set(self, query, answer): query_emb = await self.embed.encode(query) self.cache[query_emb] = answer self.redis.setex( key=f"cache:{query_emb}", time=86400, # 24h TTL value=answer )

Scaling Architecture Patterns

Retrieval Optimization

  • Horizontal scaling: Shard index by doc_id ranges
  • GPU acceleration: Triton Inference Server for embedding
  • Connection pooling: Reuse DB connections (PgBouncer)
  • Async processing: Batch embedding requests

Generation Optimization

  • vLLM: PagedAttention for high-throughput serving
  • Model parallelism: Shard LLM across GPUs
  • Quantization: INT8/FP8 for latency reduction
  • Speculative decoding: Predict + verify next tokens in parallel

KServe Kubernetes-Native Serving (vLLM)

apiVersion: serving.kserve.io/v1beta1 kind: InferenceService metadata: name: llm-service spec: predictor: minReplicas: 2 maxReplicas: 10 containerSpec: image: vllm/vllm-openai:latest env: - name: MODEL_NAME value: meta-llama/Llama-2-13b-hf ports: - containerPort: 8000 resources: limits: nvidia.com/gpu: "1" scaleTarget: 70 # Auto-scale at 70% GPU util
Latency Budget Breakdown (P50):

  • Query Transform: 50ms (normalization, spell-check)
  • Embedding: 20ms (vectorize query)
  • Search: 30ms (FAISS/Milvus lookup)
  • Rerank: 50ms (cross-encoder)
  • LLM Generation: 800ms (token generation)
  • Total Expected: ~950ms P50

SLO Modes: Low-latency interactive (<2–4s p95) vs High-throughput batch (10–60s acceptable)
16 / Scaling & Performance
Advanced

Advanced Techniques

Agentic RAG

LLM agent decides what, when, and how to retrieve. Decomposes complex queries, chooses data sources (vector DB, SQL, API, web), iteratively refines results.

from langchain_agents import ReActAgent

agent = ReActAgent(
    llm=claude,
    tools=[vector_search, sql_query, web_search],
    max_iterations=5
)
result = agent.run("Find latest Q1 earnings and analyst sentiment")

Graph RAG

Knowledge graphs + vector search. Entity-relationship graphs, multi-hop reasoning, community detection for summarization.

MATCH (a:Company)-[r*1..3]->(b:Company)
WHERE a.name = "Acme Inc"
WITH collect(b) as connected
CALL apoc.text.summarize(connected)
  YIELD summary
RETURN summary

RAPTOR (Tree-based)

Recursively summarize clusters into a hierarchy. Query at multiple abstraction levels for top-down reasoning.

  • Hierarchical abstraction layers
  • Efficient multi-scale retrieval
  • Reduced token cost vs flat indexing

Self-RAG (Adaptive)

LLM decides if retrieval needed, generates with self-critique tokens, iterates. Reduces unnecessary retrieval ~40%.

[Retrieval?] [Utility] [Relevance] [Correctness]
Outputs critique tokens, decides iterations

Multi-Modal RAG

Index images, tables, charts alongside text. Vision models for visual content, multi-modal embeddings.

  • Cohere embed-v4 multi-modal
  • CLIP for image-text alignment
  • Unified vector space across modalities

Multi-Tenant RAG

Namespace isolation per tenant. Shared infrastructure, isolated data. Query-time ACL enforcement.

  • Cost-efficient multi-tenancy
  • Metadata-driven access control
  • Compliance for regulated industries
17 / Advanced Techniques
DevOps

Deployment & CI/CD

Lint & Test Integration Tests RAG Eval Suite GATE faith>0.85, rel>0.80 Canary 5% Progressive Rollout 5%→25%→50%→100% Rollback

Docker Compose Architecture

services:
  rag-api:
    image: rag-api:latest
    deploy:
      replicas: 3
    depends_on: [qdrant, redis]

  embedding-service:
    image: embedding-service:latest
    deploy:
      resources:
        reservations:
          devices:
            - driver: nvidia
              count: 1
              capabilities: [gpu]

  indexing-worker:
    image: indexing-worker:latest
    environment:
      - CELERY_BROKER=redis:6379

  qdrant:
    image: qdrant/qdrant:latest
    ports:
      - "6333:6333"

  redis:
    image: redis:7-alpine

CI/CD Pipeline Stages

1
Lint & Unit Tests

Code quality, type checks, fast tests

2
Integration Tests

End-to-end retrieval, indexing flows

3
RAG Evaluation Suite

Gate metrics: Faithfulness >0.85, Relevance >0.80

4
Canary 5%

Monitor cost, latency, error rate

5
Progressive Rollout

5% → 25% → 50% → 100% traffic

Migration Path (Prototype → Production):
  • Canonical doc schema + versioning
  • ACL-aware retrieval architecture
  • Evaluation harness in CI/CD
  • Comprehensive observability & tracing
18 / Deployment & CI/CD
Economics

Cost Optimization

Cost Drivers (Ranked)

#1 LLM Inference Tokens 45%
#2 Reranker Compute 20%
#3 Embedding Compute 15%
#4 Vector DB Costs 12%
#5 Observability 8%

Cost Breakdown per 1K Queries

LLM Generation
$4.50
Embedding Compute
$0.30
Reranker Compute
$0.20
Vector DB Storage
$1.00
Optimization Levers:
  • Embedding Costs: Batch processing, caching, model selection, self-hosting (>1M queries/day)
  • LLM Token Costs: Context pruning, model tiering, semantic caching (50–60% reduction)
  • Infrastructure: Adaptive retrieval, hybrid tuning, dimension control, vLLM serving efficiency
19 / Cost Optimization
Risk

Failure Modes & Mitigations

Irrelevant/Misleading Retrieval

Hybrid retrieval + reranking + CRAG-like evaluation gates

Over-retrieval / Context Stuffing

Cap top-k, MMR diversity, adaptive retrieval (Self-RAG)

Permission Leakage

ACL filters, ACL in cache keys, comprehensive audit trails

Prompt Injection via Retrieved Docs

Treat as untrusted, content sanitization, system prompt dominance

Embedding Drift (Model Upgrades)

Version embeddings/indexes, shadow-build, offline eval + canary

Silent Ingestion Regressions

Ingestion QA metrics, OCR confidence alerts, hit-rate monitoring

Cost Blow-ups

Token budgets, hard timeouts, reranker gating, rate limits

OWASP LLM Risk Taxonomy:

Use as a practical checklist for security & compliance governance across RAG components.

20 / Failure Modes & Mitigations
Planning

Phased Implementation Roadmap

5-Phase RAG Implementation Timeline Phase 1 Foundations (weeks 1-7) Phase 2 Pilot (weeks 8-12) Phase 3 Hardening (weeks 13-18) Phase 4 Scale (weeks 19-25) Phase 5 Governance (weeks 26-32) Data Contracts Ingestion MVP Baseline Retrieval Hybrid Fusion Citations UI Eval Harness ACL-Aware Retrieval PII Redaction Observability CDC Streaming vLLM/KServe Cost Controls Model Versioning Red-Team Compliance Deliverables by Phase Phase 1: Standardized schemas, working index pipeline Phase 2: Multi-stage retrieval, user-facing citations Phase 3: Security & compliance controls active Phase 4: High-throughput, cost-efficient ops Phase 5: Production-ready for regulated industries
21 / Implementation Roadmap
Summary

Production Readiness Checklist

Data & Indexing

  • Multi-format parsing (PDF, docx, HTML, images)
  • Incremental indexing with changelog tracking
  • Rich metadata (source, timestamp, ownership)
  • Semantic chunking strategies
  • Dead letter queue for malformed data
  • Data freshness tracking & SLAs
  • Canonical document contract

Retrieval & Generation

  • Hybrid search (dense + sparse + knowledge graph)
  • Multi-stage reranking pipeline
  • Query transformation & expansion
  • Streaming generation with token streaming
  • Citation validation & provenance
  • Confidence scoring on outputs
  • Self-correction loops (Self-RAG)

Safety & Compliance

  • Prompt injection detection & mitigation
  • PII detection (Presidio integration)
  • Hallucination detection frameworks
  • RBAC / ACL enforcement
  • Comprehensive audit logging
  • GDPR / HIPAA / EU AI Act compliance
  • NIST AI RMF & data retention policies

Operations & Scale

  • Multi-layer caching (query, response, embedding)
  • Distributed tracing (OpenTelemetry)
  • Automated evaluation in CI/CD gates
  • Canary & progressive rollout strategy
  • Cost tracking & budget enforcement
  • User feedback loop & telemetry
  • vLLM / KServe deployment optimization

"Production RAG is 20% retrieval and 80% engineering"

Data quality, evaluation frameworks, guardrails, caching strategies, comprehensive observability, and production operations are what separate a working demo from a reliable, compliant, cost-efficient product.

22 / Production Readiness Checklist