
Data Pipelines for RAG & AI Agents

A practical guide to designing, building, and operating the data pipelines that feed retrieval-augmented generation systems and tool-using agents — from raw documents and APIs all the way to production retrieval, memory, and evaluation.

01 Overview

A RAG or agent system is only as good as the data it sees. The pipeline is the part of the system that decides what data exists, in what shape, when it updates, and how the model finds it.

Most production failures of RAG systems and AI agents trace back to the pipeline, not the model. "Bad answers" usually turn out to be pipeline problems: missing or stale data, lossy parsing, poor chunking, thin metadata, or access filters that were never applied.

Pipelines are products.
Treat your data pipeline like any other production service: versioned, tested, observable, with SLAs on freshness and quality. Most teams under-invest here and over-invest in prompts.

02 Why pipelines matter

Three axes drive the design: scale (millions of documents), freshness (seconds to days), and quality (clean, deduplicated, attributed).

Scale

Modern corpora are 10K–100M+ documents. You cannot embed and re-embed everything on every change — incremental, idempotent pipelines are mandatory.

Freshness

Stale data poisons retrieval. Customer-support agents need new tickets in minutes; legal-research agents tolerate hours; archival corpora tolerate days.

Quality

Deduplication, PII redaction, content typing, language detection, structural parsing — every quality control you skip shows up as a hallucination later.

03 RAG pipeline architecture

The canonical RAG data pipeline is a 7-stage flow from raw source to answerable retrieval. Each stage has its own failure modes and tools.

  1. Ingest: Pull or receive raw data from sources (S3, APIs, DB, webhooks)
  2. Parse & clean: Extract text/structure from PDFs, HTML, DOCX, code, transcripts
  3. Chunk: Split into retrieval-sized units that preserve semantic boundaries
  4. Enrich: Attach metadata: source, author, date, tags, ACLs, summaries
  5. Embed: Generate dense (and sparse) vector representations
  6. Index: Upsert into vector store + keyword index + metadata store
  7. Retrieve & rerank: Hybrid search → rerank → context assembly for the LLM

Sections 04–10 below walk through each stage in detail.

04 Stage 1 — Ingestion

Ingestion is where data enters the pipeline. The dominant question: push or pull, batch or stream.

Common source types

| Source | Pattern | Notes |
| --- | --- | --- |
| Object storage (S3, GCS, Azure Blob) | Pull / event-driven | S3 events → SQS/Lambda is the most common ingest trigger. |
| SaaS APIs (Notion, Confluence, Zendesk, Salesforce) | Pull (poll) + webhook | Use webhooks for freshness, periodic pulls as a backstop. |
| Databases (Postgres, MySQL, Mongo) | CDC (Debezium, Fivetran) | Change Data Capture beats periodic snapshots for large tables. |
| Message queues (Kafka, Kinesis, Pub/Sub) | Stream consumer | Natural fit for high-volume event streams (logs, transactions). |
| Web (sites, sitemaps, feeds) | Crawler (Scrapy, Firecrawl) | Respect robots.txt; budget concurrency; cache aggressively. |
| Email / chat (Slack, Teams, IMAP) | Push via webhook / app | Watch for PII; usually requires per-thread ACL propagation. |
| Code repos (GitHub, GitLab) | Git clone + webhook | Sparse-checkout, file-type filters, language detection. |
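
To make the event-driven row concrete, here is a minimal ingest worker for the S3 → SQS pattern: it long-polls the queue, unpacks the standard S3 event shape, and hands each new object to the rest of the pipeline. The queue URL and the process_object handler are placeholders, not part of any library.

# Minimal S3-event ingest worker (sketch): S3 -> SQS -> this consumer.
# QUEUE_URL and process_object() are placeholders for your own infrastructure.
import json
import boto3

sqs = boto3.client("sqs")
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/ingest-events"

def process_object(bucket: str, key: str) -> None:
    ...  # fetch raw bytes, persist an immutable copy, enqueue parse/chunk/embed

def run_forever() -> None:
    while True:
        resp = sqs.receive_message(
            QueueUrl=QUEUE_URL, MaxNumberOfMessages=10, WaitTimeSeconds=20
        )
        for msg in resp.get("Messages", []):
            body = json.loads(msg["Body"])
            for record in body.get("Records", []):   # standard S3 event shape
                process_object(
                    bucket=record["s3"]["bucket"]["name"],
                    key=record["s3"]["object"]["key"],
                )
            # delete only after successful processing, so failures are retried automatically
            sqs.delete_message(QueueUrl=QUEUE_URL, ReceiptHandle=msg["ReceiptHandle"])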

Design rules

05 Stage 2 — Parsing & cleaning

Raw bytes become structured text. The quality of this stage caps the quality of every downstream stage — a garbage parser produces garbage chunks.

By format

| Format | Recommended tools | What to extract |
| --- | --- | --- |
| PDF | Unstructured.io, PyMuPDF, pdfplumber, Marker, Reducto, AWS Textract | Text + tables + figures + reading order. Scanned PDFs need OCR (Tesseract, Textract). |
| HTML | Trafilatura, Readability, BeautifulSoup, jusText | Main content, headings, links, code blocks; strip nav/footer/ads. |
| DOCX / PPTX / XLSX | python-docx, python-pptx, openpyxl, Unstructured | Headings, lists, comments, speaker notes, sheet structure. |
| Markdown | markdown-it, mistune, frontmatter | Heading tree → use as natural chunk boundaries; preserve code blocks intact. |
| Code | Tree-sitter, AST parsers per language | Functions, classes, docstrings — chunk by symbol, not by line count. |
| Audio / video | Whisper, AssemblyAI, Deepgram | Transcript with timestamps; speaker diarization for meetings. |
| Images | OCR (Tesseract, GPT-4V, Claude vision), captioning models | OCR text + caption + alt-text + extracted entities. |
| Email | mailparser, BeautifulSoup, custom thread reconstruction | Strip quoted replies, signatures; preserve thread structure. |

Cleaning steps

Tables and figures are the silent killers.
A naive PDF parser flattens a table into a stream of numbers. Use a layout-aware parser (Unstructured "hi_res", Marker, Reducto) and consider preserving tables as Markdown or JSON in the chunk text.
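
As a sketch of the layout-aware approach, the snippet below runs Unstructured's hi_res strategy and keeps table elements as HTML rather than flattened text; the exact element and metadata attribute names can vary by library version, so treat this as a starting point rather than a reference.

# Layout-aware PDF parsing (sketch): Unstructured "hi_res" with table structure kept.
from unstructured.partition.auto import partition

elements = partition(
    filename="handbook.pdf",
    strategy="hi_res",
    infer_table_structure=True,
)

blocks = []
for el in elements:
    # Table elements usually expose an HTML rendering in their metadata;
    # keeping it (or converting it to Markdown) preserves rows and columns.
    if el.category == "Table" and getattr(el.metadata, "text_as_html", None):
        blocks.append(el.metadata.text_as_html)
    else:
        blocks.append(el.text)

document_text = "\n\n".join(b for b in blocks if b)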

06 Stage 3 — Chunking

Chunking splits documents into retrieval units. Too large and the LLM gets diluted context; too small and you lose meaning. Boundaries should respect semantic structure.

Strategies

| Strategy | How | When to use |
| --- | --- | --- |
| Fixed-size | N tokens with M overlap (e.g. 512 / 64) | Baseline; works on uniform prose. |
| Recursive | Split on paragraph → sentence → word until under target size | Default for mixed prose. LangChain's RecursiveCharacterTextSplitter is the workhorse. |
| Structural | Split on headings, sections, list items | Markdown, HTML, structured docs. Preserves hierarchy. |
| Semantic | Split where embedding similarity drops (greedy clustering) | When prose has implicit topic shifts (interviews, transcripts). |
| Sentence-window | Embed single sentence, retrieve neighbors | Precise retrieval with broader context. Good for FAQ / Q&A. |
| Hierarchical / parent-child | Embed small, retrieve large parent | Classic "small-to-big" retrieval. Best of both granularities. |
| Code-aware | Split by function / class via AST | Code corpora — never split mid-function. |
| Late chunking | Embed long context first, slice the embedding | Newer technique — preserves cross-chunk context. |
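
A minimal recursive-splitting sketch using LangChain's RecursiveCharacterTextSplitter (the workhorse mentioned above); the sizes and separators are illustrative defaults, not recommendations for every corpus.

# Recursive chunking (sketch): try coarse boundaries first, fall back to finer ones.
from langchain_text_splitters import RecursiveCharacterTextSplitter

splitter = RecursiveCharacterTextSplitter(
    chunk_size=1800,       # characters here; use a token-based length_function in practice
    chunk_overlap=200,
    separators=["\n\n", "\n", ". ", " ", ""],   # paragraph -> line -> sentence -> word
)

def chunk_document(document_text: str) -> list[str]:
    return splitter.split_text(document_text)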

Sizing rules of thumb

07 Stage 4 — Enrichment & metadata

Each chunk carries metadata used for filtering, ranking, attribution, and access control. Under-investing in metadata is one of the most common pipeline failures.

Required metadata

Provenance

  • source_id — stable per-document ID
  • source_url / path
  • version / revision
  • ingested_at, updated_at
  • parser_version, chunker_version

Access & lifecycle

  • tenant_id, org_id, acl
  • visibility (public / internal / restricted)
  • retention_until / delete_at
  • language, region

Content

  • doc_type (faq, runbook, code, transcript)
  • section_path (breadcrumb)
  • tags / topics
  • title, summary
  • token_count

Optional / derived

  • Auto-generated questions the chunk answers
  • Named entities (people, products, dates)
  • Sentiment / quality score
  • Difficulty / audience level

ACLs at chunk time, not query time.
Stamp every chunk with the document's access control at ingest. Filter on it at retrieval. Don't trust the LLM to respect ACLs after the fact.
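
One way to enforce the lists above is a typed metadata schema applied at enrichment time; the sketch below uses field names from this section, not the schema of any particular store.

# Chunk metadata schema (sketch): enforce it at enrichment time so nothing reaches
# the index without provenance, ACLs, and content typing.
from dataclasses import dataclass, field

@dataclass
class ChunkMeta:
    # provenance
    source_id: str
    source_url: str
    version: str
    ingested_at: str
    updated_at: str
    parser_version: str
    chunker_version: str
    # access & lifecycle
    tenant_id: str
    acl: list[str]
    visibility: str = "internal"
    retention_until: str | None = None
    language: str = "en"
    # content
    doc_type: str = "doc"
    section_path: str = ""
    tags: list[str] = field(default_factory=list)
    title: str = ""
    token_count: int = 0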

08 Stage 5 — Embedding

Convert each chunk to a vector. The embedding model is the largest single quality lever in retrieval — pick deliberately and version explicitly.

Model selection

| Provider / model | Type | Notes |
| --- | --- | --- |
| OpenAI text-embedding-3-large | API, dense | Strong general-purpose; 3072d (truncatable). |
| Voyage voyage-3, voyage-code-3 | API, dense | Often top-of-leaderboard; specialized variants for code, finance, legal. |
| Cohere embed-v3 | API, dense + reranker | Strong multilingual; pairs naturally with Cohere reranker. |
| Google text-embedding-004 | API, dense | Strong on Google Cloud workloads. |
| BGE (BAAI), E5, Nomic Embed | Open weights, dense | Self-host on GPU/CPU; cheap, good quality, no data leaves your VPC. |
| SPLADE, BM25 | Sparse | Use alongside dense for hybrid search — handles rare terms / IDs / acronyms. |
| ColBERT / ColPali | Multi-vector / late interaction | Higher quality at higher index cost. ColPali for visual documents. |

Operational concerns

# Batched embedding with versioning. Chunk objects are assumed to carry .text, .vector, .meta
from openai import OpenAI
client = OpenAI()

def embed_batch(chunks):
    resp = client.embeddings.create(
        model="text-embedding-3-large",
        input=[c.text for c in chunks],
    )
    # embeddings come back in input order, so zip() pairs each chunk with its vector
    for chunk, item in zip(chunks, resp.data):
        chunk.vector = item.embedding
        chunk.meta["embedder"] = "openai/text-embedding-3-large"
        chunk.meta["embedder_dim"] = 3072
    return chunks

09 Stage 6 — Indexing

Vectors land in a vector store; sparse representations land in a keyword index; metadata lands in a filterable store. In practice you usually want all three, often in the same product.

Vector stores compared

| Store | Type | Best for |
| --- | --- | --- |
| pgvector (Postgres) | Extension | Already-Postgres shops; small to medium scale; transactional + vector in one place. |
| Qdrant | Self-host / cloud | Strong filtering, hybrid, rich payloads. Open source. |
| Weaviate | Self-host / cloud | Hybrid + modules (rerankers, generative); class-based schema. |
| Milvus / Zilliz | Self-host / cloud | Scale-out to billions of vectors; multiple ANN backends. |
| Pinecone | Managed only | Operationally simplest; serverless tier. |
| Vespa | Self-host / cloud | Best-in-class for hybrid + ranking + low latency at scale. |
| Elasticsearch / OpenSearch | Self-host / cloud | Already-Elastic shops; native hybrid; good filtering. |
| Turbopuffer / LanceDB | Object-storage native | Cheap at large scale; serverless with cold-start tradeoffs. |
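
As an example of idempotent, filterable indexing, here is a hedged Qdrant upsert sketch: Qdrant point IDs must be integers or UUIDs, so the deterministic chunk ID is mapped to a stable UUID, and chunk metadata becomes the filterable payload. The collection is assumed to already exist with the right vector size and distance.

# Idempotent vector upsert (sketch): Qdrant, deterministic IDs, metadata as payload.
import uuid
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct

client = QdrantClient(url="http://localhost:6333")

def upsert_chunks(chunks, collection: str = "docs") -> None:
    points = [
        PointStruct(
            # Qdrant IDs must be ints or UUIDs -> derive a stable UUID from the chunk id
            id=str(uuid.uuid5(uuid.NAMESPACE_URL, c.id)),
            vector=c.vector,
            payload={"text": c.text, **c.meta},   # metadata becomes filterable payload
        )
        for c in chunks
    ]
    # re-running the same ingest overwrites the same points instead of duplicating them
    client.upsert(collection_name=collection, points=points)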

Index design

10 Stage 7 — Retrieval & rerank

At query time: rewrite the query, retrieve candidates with hybrid search, rerank, then assemble the LLM context.

The retrieval flow

  1. Query understanding. Rewrite, expand, decompose. Multi-query generates 3–5 paraphrases. HyDE generates a hypothetical answer and embeds that.
  2. Filter. Apply ACLs, tenant, freshness window — never skip this.
  3. Hybrid retrieve. Top-K dense + top-K sparse, then merge (Reciprocal Rank Fusion is the standard; see the sketch after this list).
  4. Rerank. Cross-encoder rerank the top 30–100 candidates down to top 5–10. Cohere Rerank, Voyage Rerank, BGE Reranker are the default options.
  5. Assemble context. Add breadcrumbs and citations; deduplicate near-identical chunks; budget tokens.
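
For step 3, Reciprocal Rank Fusion is simple enough to write out in plain Python; the inputs are assumed to be ranked lists of chunk IDs, and k = 60 is the conventional constant.

# Reciprocal Rank Fusion (sketch): merge dense and sparse result lists by rank.
def rrf_merge(ranked_lists: list[list[str]], k: int = 60, top_n: int = 50) -> list[str]:
    scores: dict[str, float] = {}
    for ranking in ranked_lists:
        for rank, chunk_id in enumerate(ranking):
            # each list contributes 1 / (k + rank); k dampens the head of each list
            scores[chunk_id] = scores.get(chunk_id, 0.0) + 1.0 / (k + rank + 1)
    return sorted(scores, key=scores.get, reverse=True)[:top_n]

# usage: merged = rrf_merge([dense_ids, sparse_ids])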

Retrieval techniques worth knowing

HyDE

Have the LLM hallucinate a hypothetical answer; embed that instead of the user query. Closes the gap between question-language and answer-language.

Multi-query

Generate N paraphrases of the query, retrieve for each, merge. Trades cost for recall.

Step-back prompting

Generate a more-general query first, retrieve background context, then answer the specific query.

Small-to-big retrieval

Embed small chunks for precision, return their parent doc/section for context.

Self-RAG / CRAG

The model evaluates retrieved docs and decides whether to use, refine, or skip them — guards against bad retrievals.

Graph RAG

Build a knowledge graph from the corpus, retrieve subgraphs. Strong for queries that hop across entities.

11 Agent data pipelines

Agents need more than a vector store. They consume four distinct data substrates — knowledge, tool data, memory, and traces — and each needs its own pipeline.

1. Knowledge (RAG)

Same as above — corpus → chunks → embeddings → retrieval. The agent calls retrieval as a tool.

2. Tool data

Live data from APIs, databases, services. Pipelines here are about schema discovery and response shaping — making tool outputs LLM-readable.

3. Memory

Per-user / per-session state: facts, preferences, past actions. Pipeline writes from conversations, reads back into context.

4. Traces & evals

Every prompt, tool call, retrieval, and outcome flows into an observability pipeline that feeds dashboards, evals, and fine-tuning datasets.

RAG-only system
  • One pipeline: docs → embeddings → retrieve
  • Stateless per query
  • Quality measured by retrieval@k + answer faithfulness

Agentic system
  • Four pipelines: knowledge + tools + memory + traces
  • Stateful across turns and sessions
  • Quality measured by task success, action correctness, cost

12 Agent context engineering

"Context engineering" is the practice of deciding what enters the model's context window on each step. It is the agent equivalent of feature engineering.

Context inputs (per agent step)

| Source | Lifetime | Selection strategy |
| --- | --- | --- |
| System prompt | Static | Versioned; A/B tested. |
| Tool catalog | Per-session | Filter to tools relevant to the task; over-stuffing the catalog wastes tokens and hurts selection accuracy. |
| RAG retrieval | Per-step | Hybrid + rerank; filter by tenancy and freshness. |
| Long-term memory | Per-user | Retrieve top-N relevant memories by embedding similarity + recency. |
| Working memory / scratchpad | Per-task | Carry intermediate results, plans, sub-task state. |
| Tool outputs | Per-call | Truncate / summarize large outputs before re-feeding to the model. |
| Conversation history | Per-session | Sliding window + summary of older turns. |

Context rot is real.
As context grows, models get worse at finding relevant pieces and start ignoring instructions ("lost in the middle"). Aggressively prune tool outputs, summarize old turns, and keep retrieved chunks focused.
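
One practical defense is an explicit per-step token budget for each context source in the table above; the numbers and the token counter below are illustrative assumptions, not recommendations.

# Per-step context budgeting (sketch): cap each context source before assembly.
BUDGET = {
    "system": 1_500,
    "tools": 2_000,
    "memories": 1_000,
    "retrieval": 6_000,
    "history": 3_000,
    "tool_outputs": 4_000,
}

def count_tokens(text: str) -> int:
    # crude heuristic (~4 chars per token); swap in a real tokenizer such as tiktoken
    return max(1, len(text) // 4)

def fit(text: str, limit: int) -> str:
    # naive truncation; in practice summarize old turns and large tool outputs instead
    while count_tokens(text) > limit:
        text = text[: int(len(text) * 0.9)]
    return text

def assemble(parts: dict[str, str]) -> str:
    return "\n\n".join(fit(parts[name], limit) for name, limit in BUDGET.items() if name in parts)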

13 Agent memory pipelines

Memory is data that persists across sessions. Treat it as a write-heavy pipeline with its own ingestion, scoring, and eviction.

Memory types

| Type | What | Storage |
| --- | --- | --- |
| Episodic | Events: "user did X on date Y" | Append-only event log + index |
| Semantic | Facts: "user prefers metric units" | Key-value or document store + vector index |
| Procedural | How-tos: learned tool sequences, playbooks | Versioned prompt / skill registry |
| Working | Current task scratchpad | In-memory or short-TTL store |

The memory write pipeline

  1. Extract. After each turn, an LLM call extracts candidate memories from the conversation.
  2. Score. Confidence + utility + novelty. Drop low-value candidates.
  3. Deduplicate. Embed and check against existing memories — update if a similar one exists, insert otherwise (a minimal sketch follows this list).
  4. Tag. Attach user_id, session_id, source, expiry.
  5. Persist. Write to memory store + vector index.
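
A minimal sketch of step 3, deduplicating by cosine similarity before writing; the 0.9 threshold, the store interface, and the embed helper are assumptions for illustration.

# Memory dedup + upsert (sketch): update a near-duplicate, otherwise insert.
import numpy as np

SIMILARITY_THRESHOLD = 0.9   # tune on your own data

def cosine(a: np.ndarray, b: np.ndarray) -> float:
    return float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))

def write_memory(candidate: str, user_id: str, store, embed) -> None:
    vec = embed(candidate)                                           # hypothetical embed() helper
    existing = store.search(user_id=user_id, vector=vec, top_k=1)    # hypothetical store API
    if existing and cosine(vec, existing[0].vector) >= SIMILARITY_THRESHOLD:
        store.update(existing[0].id, text=candidate, vector=vec)     # refresh the close match
    else:
        store.insert(user_id=user_id, text=candidate, vector=vec)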

The memory read pipeline

  1. Embed the current context.
  2. Hybrid retrieve from memory store, filter by user_id and validity.
  3. Rerank by relevance + recency + confidence.
  4. Inject top-N into the system or user prompt with clear provenance ("Recalled from your prior session: …").

Make memory updates auditable.
Every memory write should record what was added/changed/removed and which conversation triggered it — both for debugging and for compliance (right-to-be-forgotten requests).

14 Batch vs streaming

The biggest architectural decision after "what's in the corpus" is "how fresh does it need to be." Batch is cheaper and simpler; streaming is fresher and operationally heavier.

Batch (Airflow / Dagster / cron)
  • Run hourly / daily / weekly
  • Simple to reason about, easy to backfill
  • Cheap — no always-on infra
  • Freshness lag = batch interval
  • Good for: docs, knowledge bases, slowly-changing corpora

Streaming (Kafka + workers / Flink)
  • Process events as they arrive
  • Sub-minute freshness
  • Always-on infra, harder to test
  • Backfills require replay infrastructure
  • Good for: tickets, logs, transactional data, live chat

Hybrid: micro-batch

The pragmatic middle ground: a streaming consumer batches events into 10–60 second windows, then runs the batch pipeline on each window. You get near-real-time freshness with batch's testability.
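
In code, micro-batching can be as simple as the loop below: accumulate events for a fixed window, then run the batch pipeline on the whole window. poll_events and run_batch_pipeline are placeholders for your queue consumer and pipeline entry point.

# Micro-batching (sketch): stream in, batch out on a fixed time window.
import time

WINDOW_SECONDS = 30

def poll_events(timeout: float) -> list[dict]:
    # placeholder: read from Kafka / Kinesis / SQS with a timeout
    return []

def run_batch_pipeline(events: list[dict]) -> None:
    # placeholder: the same parse -> chunk -> embed -> index path as the batch pipeline
    ...

def run_forever() -> None:
    while True:
        window_end = time.monotonic() + WINDOW_SECONDS
        batch: list[dict] = []
        while time.monotonic() < window_end:
            batch.extend(poll_events(timeout=1.0))
        if batch:
            run_batch_pipeline(batch)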

15 Freshness & incremental updates

Production pipelines must be incremental — full reindexes are an anti-pattern past a few thousand docs. The hard problem is doing inserts, updates, and deletes safely.

Update patterns

| Operation | Trigger | Pattern |
| --- | --- | --- |
| Insert | New document arrives | Run full pipeline on the new doc; upsert chunks by deterministic ID. |
| Update | Source doc changes | Re-parse, re-chunk, diff against existing chunks; upsert changed, delete removed. |
| Delete | Source doc removed or expires | Hard-delete or tombstone all chunks with that source_id. |
| Re-embed | Embedder version change | Background backfill; route queries to old version until new version reaches parity. |
| Re-chunk | Chunker version change | Reprocess from raw bytes (kept in object storage) — chunker should be deterministic given config. |

Soft delete vs hard delete

Tombstone (soft delete) is usually safer in production — set deleted_at, exclude in retrieval, GC asynchronously. Hard delete is required for GDPR / right-to-be-forgotten requests; build the GC path to actually remove vectors and audit-log the action.

16 Data quality & lineage

Quality controls and lineage tracking turn a research-grade pipeline into a production-grade one.

Quality controls to add

Lineage

For every chunk in the index, you should be able to answer: which source document and version it came from, which parser, chunker, and embedder versions produced it, and when each step ran.

Lineage metadata enables incident response, compliance, and intelligent reprocessing. Tools: OpenLineage, DataHub, Marquez.

17 Observability & eval

A pipeline you can't measure is a pipeline you can't improve. Instrument every stage and every retrieval.

Pipeline metrics

Ingestion

  • Documents processed / sec
  • Parse failure rate per format
  • Chunks emitted per doc (distribution)
  • End-to-end latency (source → indexed)

Retrieval

  • Latency p50/p95/p99
  • Recall@k against a labeled set
  • nDCG@k after rerank
  • % queries with zero results
  • Cache hit rate

Generation

  • Faithfulness / groundedness score
  • Citation accuracy
  • Refusal / hallucination rate
  • Token cost per answer

Agent

  • Task success rate
  • Tool-call accuracy
  • Steps to completion (distribution)
  • Cost per task
  • Memory hit rate / utility
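
Several of these metrics only mean something against a labeled set. Below is a minimal recall@k computation over (query, relevant chunk IDs) pairs; the eval-set shape and the retrieve callable are assumptions for the sketch.

# Recall@k over a labeled query set (sketch).
def recall_at_k(eval_set: list[dict], retrieve, k: int = 10) -> float:
    """eval_set items look like {"query": str, "relevant_ids": set[str]}."""
    hits = 0
    for example in eval_set:
        retrieved = set(retrieve(example["query"], top_k=k))
        # credit the query if at least one labeled-relevant chunk was retrieved
        hits += bool(retrieved & example["relevant_ids"])
    return hits / len(eval_set)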

Eval frameworks

Ragas, TruLens, DeepEval, Phoenix (Arize), Langfuse, Braintrust, Patronus, OpenTelemetry GenAI semconv

What to evaluate

18 Orchestration tools

You can hand-roll pipelines, but past a certain scale you'll want orchestration. The choice shapes how easy retries, backfills, and observability are.

| Tool | Style | Best for |
| --- | --- | --- |
| Airflow | DAG, batch | Scheduled batch ETL; mature ecosystem; verbose for ML. |
| Dagster | Software-defined assets | Modern alternative to Airflow; first-class data lineage. |
| Prefect | Pythonic flows | Lightweight; great for prototypes that grow into production. |
| Temporal | Durable workflows | Long-running, retry-heavy pipelines (e.g. crawl → parse → embed at scale). |
| Kafka + Flink / Spark Streaming | Streaming | High-throughput event-driven ingestion. |
| Ray Data | Distributed Python | GPU-heavy stages (embedding, parsing with vision models). |
| Modal / Replicate / RunPod | Serverless GPU | Burst embedding / batch inference without managing GPUs. |
| LlamaIndex / LangChain | Framework, in-process | Glue layer for parsing/chunking/embedding/retrieval; not a substitute for an orchestrator at scale. |
| Unstructured.io | Managed pipeline | Specialized in the parsing + chunking stages with broad format support. |

19 Reference tech stack

A representative production stack as of 2026. Substitute components freely — every layer has multiple credible options.

| Layer | Choice | Why |
| --- | --- | --- |
| Source of truth | S3 + Postgres | Raw bytes addressable forever; metadata transactional. |
| Ingestion | S3 events → SQS → workers | Decoupled, retry-safe, scales linearly. |
| Orchestration | Dagster (batch) + Temporal (long workflows) | Asset lineage + durable execution. |
| Parsing | Unstructured + Marker for PDFs | Layout-aware; handles tables and figures. |
| Chunking | Recursive + structural by doc type | One size doesn't fit all formats. |
| Embedding | Voyage-3 (general) + Voyage-Code (code) | Specialized embeddings beat one-size-fits-all. |
| Vector store | Qdrant or pgvector (small) / Vespa (large) | Hybrid + filtering + scale. |
| Reranker | Cohere Rerank or BGE Reranker | +10–20 points on nDCG, worth the latency. |
| Memory store | Postgres + pgvector + Redis | Transactional facts, vector search, fast working memory. |
| Observability | Langfuse / Phoenix + OpenTelemetry | Trace every prompt / tool / retrieval. |
| Eval | Ragas + Braintrust | Offline metrics + production CI. |
| Agent framework | LangGraph / Anthropic Agent SDK / OpenAI Agents | Pick one and own the abstractions. |

20 End-to-end example

A minimal but production-shaped pipeline for a customer-support RAG system. Adapt the details — the structure generalizes.

# pipeline.py — simplified end-to-end
# Helpers not shown (ingest, persist_raw, parse, recursive_split, batched,
# embed_batch, vector_store, keyword_index, emit_lineage) are application-specific stubs.
from dataclasses import dataclass
import hashlib

@dataclass
class Chunk:
    id: str
    text: str
    source_id: str
    section_path: str
    meta: dict
    vector: list | None = None

def stable_id(source_id: str, section: str, idx: int) -> str:
    return hashlib.sha256(f"{source_id}:{section}:{idx}".encode()).hexdigest()[:16]

def run(source_uri: str):
    # 1. Ingest — fetch raw bytes, persist immutable copy
    raw = ingest(source_uri)                          # bytes
    raw_uri = persist_raw(raw, source_uri)            # s3://raw/...

    # 2. Parse — extract structured text
    doc = parse(raw, source_uri)                      # Document(sections=[...])

    # 3. Chunk — structural + recursive
    chunks = []
    for section in doc.sections:
        for i, piece in enumerate(recursive_split(section.text, target=600, overlap=80)):
            chunks.append(Chunk(
                id=stable_id(doc.source_id, section.path, i),
                text=f"{doc.title} > {section.path}\n\n{piece}",
                source_id=doc.source_id,
                section_path=section.path,
                meta={
                    "source_uri": source_uri,
                    "updated_at": doc.updated_at,
                    "acl": doc.acl,
                    "doc_type": doc.doc_type,
                    "parser_version": "v3",
                    "chunker_version": "v2",
                },
            ))

    # 4. Embed — batch
    for batch in batched(chunks, 100):
        embed_batch(batch)                            # sets .vector + meta["embedder"]

    # 5. Index — upsert keyed by stable id (idempotent)
    vector_store.upsert(chunks)
    keyword_index.upsert(chunks)

    # 6. Tombstone removed chunks
    existing_ids = vector_store.ids_for(source_id=doc.source_id)
    new_ids = {c.id for c in chunks}
    for dead in existing_ids - new_ids:
        vector_store.tombstone(dead)

    # 7. Emit lineage event
    emit_lineage(doc.source_id, len(chunks), embedder="voyage-3")

Note the design: deterministic IDs make every step idempotent; raw bytes are preserved so you can re-derive without re-fetching; tombstones handle deletes; lineage events feed observability.

21 Anti-patterns

Mistakes that look reasonable but cause grief in production. Recognize and avoid.

| Anti-pattern | Why it hurts | Do this instead |
| --- | --- | --- |
| Random UUIDs as chunk IDs | Re-running ingest creates duplicates instead of upserts. | Deterministic IDs from (source_id, section, position). |
| Embedding without versioning | Cannot tell which embedder produced a vector; cannot mix or migrate cleanly. | Tag every vector with embedder_id + version. |
| Throwing away raw bytes | Cannot reprocess after a parser bug fix without re-fetching everything. | Always persist raw to object storage; everything downstream is derivable. |
| Token-only chunking | Splits mid-sentence, mid-table, mid-function — every chunk is degraded. | Recursive + structural; respect natural boundaries. |
| Skipping the breadcrumb | Retrieved chunk has no idea what document or section it came from. | Prepend "Doc > Section\n\n…" to every chunk's stored text. |
| Dense-only retrieval | Misses queries containing rare terms, IDs, codes, or acronyms. | Hybrid (dense + sparse) with RRF. |
| Filtering after retrieval | You retrieve chunks the user can't see, then drop them — recall collapses. | Push filters (ACL, tenancy, freshness) into the vector store query. |
| Full reindex on every change | Doesn't scale past a few thousand docs; downtime; cost. | Idempotent incremental upserts. |
| No eval set | Cannot tell whether a change improved or regressed retrieval. | Maintain a labeled query set; CI it. |
| Dumping everything into one corpus | Multi-tenant data leaks; retrieval relevance drops. | Separate indexes or strong per-tenant filters. |
| Memory as a black box | Cannot debug why the agent "remembered" something wrong. | Audit-log every memory write; surface provenance in the UI. |

22 Production checklist

Before calling a pipeline production-ready, you should be able to check every item below.

Correctness

Quality

Operability

Safety

If you ship one thing first, ship the eval set.
A pipeline without an eval is a pipeline you cannot improve. Even 50 labeled queries beat zero — you can iterate after that.