## The Staleness Problem
Day one, your RAG system is brilliant. Everything is freshly indexed, perfectly up to date, and users love it.
Day thirty, someone asks about the new pricing tier that launched last week. The RAG system confidently answers with the old pricing. Because nobody updated the index.
Day sixty, users stop trusting it. They go back to searching Confluence manually. Your beautiful RAG system becomes shelf-ware.
Freshness isn't a feature. It's survival. A stale RAG system doesn't slowly degrade. It hits a trust cliff, and usage falls off overnight. This connects directly to [re-chunking strategies](/blog/chunking-strategies-at-scale).
## Why "Re-index Everything Nightly" Doesn't Work
The naive approach: dump all documents, re-chunk, re-embed, re-index. Every night. Clean slate.
This works for small knowledge bases (under 10,000 documents). It falls apart at enterprise scale for three reasons.
**Cost.** Embedding API calls aren't free. If you have 500,000 chunks and your embedding model costs $0.0001 per chunk, a full re-index costs $50. Every night. $18,250/year just for embeddings. And that's before compute, storage, and the engineering time to run the pipeline.
**Latency.** A full re-index of a large corpus takes hours. During that window, your index is either stale (using the old one) or incomplete (building the new one). Neither is great.
**Wasted work.** Most documents don't change between runs, so re-embedding unchanged content is pure waste. The related post on [access control on updated documents](/blog/rag-access-control-permissions) goes further on this point.
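To make the cost argument concrete, here's the arithmetic from above as a sketch. The chunk count and per-chunk price come from the example; the 1% daily change rate is an assumption for illustration, not a measured figure.

```python
# Back-of-envelope embedding cost: nightly full re-index vs. incremental.
TOTAL_CHUNKS = 500_000
PRICE_PER_CHUNK = 0.0001   # dollars per chunk embedded
DAILY_CHANGE_RATE = 0.01   # assume ~1% of chunks change per day (illustrative)

full_reindex_yearly = TOTAL_CHUNKS * PRICE_PER_CHUNK * 365
incremental_yearly = TOTAL_CHUNKS * DAILY_CHANGE_RATE * PRICE_PER_CHUNK * 365

print(f"Full nightly re-index: ${full_reindex_yearly:,.0f}/year")  # $18,250/year
print(f"Incremental (~1%/day): ${incremental_yearly:,.2f}/year")   # $182.50/year
```

Even at a generous 1% daily churn, incremental indexing cuts the embedding bill by two orders of magnitude.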
## Incremental Indexing: Only Process What Changed
The principle is simple: detect what changed, process only the changes, update the index in place.
```python
class IncrementalIndexer:
    def __init__(self, source, chunker, embedder, index, state_store):
        self.source = source        # Document source (S3, Confluence, etc.)
        self.chunker = chunker
        self.embedder = embedder
        self.index = index
        self.state = state_store    # Tracks what we've indexed

    async def sync(self):
        # Get changes since last sync
        changes = await self.detect_changes()
        for change in changes:
            match change.type:
                case "created":
                    await self.index_document(change.document)
                case "modified":
                    await self.reindex_document(change.document)
                case "deleted":
                    await self.remove_document(change.document_id)
        await self.state.update_checkpoint(now())

    async def detect_changes(self):
        """Compare source state with indexed state."""
        last_sync = await self.state.get_checkpoint()
        source_docs = await self.source.list_documents(modified_since=last_sync)
        indexed_docs = await self.state.get_indexed_documents()

        changes = []
        for doc in source_docs:
            if doc.id not in indexed_docs:
                changes.append(Change("created", doc))
            elif doc.content_hash != indexed_docs[doc.id].content_hash:
                changes.append(Change("modified", doc))

        # Detect deletions. This must compare against the FULL set of source
        # ids, not just recently modified docs -- otherwise every unmodified
        # document would look deleted.
        source_ids = set(await self.source.list_all_document_ids())
        for doc_id in indexed_docs:
            if doc_id not in source_ids:
                changes.append(Change("deleted", document_id=doc_id))
        return changes
```
## Change Detection Strategies
Different document sources require different change detection approaches.
### Strategy 1: Timestamps
The simplest. Ask the source system for documents modified after a timestamp.
```python
# Works for APIs that support modified_since
async def detect_by_timestamp(source, last_sync):
    return await source.list_documents(modified_since=last_sync)
```
**Works well for:** Cloud storage (S3, GCS), CMS platforms, databases with updated_at columns.
**Fails when:** Timestamps aren't reliable. Clocks drift. Bulk operations update timestamps without changing content. Some systems update timestamps on metadata changes (permission changes) that don't affect content.
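One common mitigation for clock drift is to poll with an overlap window: re-scan slightly further back than the checkpoint so late-stamped documents are still picked up, and filter out docs whose content hash hasn't actually changed. A sketch, assuming the same illustrative source/state interfaces used throughout:

```python
from datetime import timedelta

# Re-scan a safety margin behind the checkpoint to tolerate clock skew.
CLOCK_SKEW_MARGIN = timedelta(minutes=5)

async def detect_by_timestamp_safe(source, state_store):
    last_sync = await state_store.get_checkpoint()
    window_start = last_sync - CLOCK_SKEW_MARGIN
    candidates = await source.list_documents(modified_since=window_start)

    # The overlap means we re-see some docs; skip the ones whose
    # content hash matches what we already indexed.
    changed = []
    for doc in candidates:
        stored_hash = await state_store.get_hash(doc.id)
        if stored_hash != doc.content_hash:
            changed.append(doc)
    return changed
```

The margin trades a little redundant listing for resilience against modest clock drift; it does not fix sources that rewrite timestamps on metadata-only changes.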
### Strategy 2: Content Hashing
Compute a hash of each document's content. Compare with the hash stored at last index time.
```python
import hashlib

def content_hash(document_content: bytes) -> str:
    return hashlib.sha256(document_content).hexdigest()

async def detect_by_hash(source, state_store):
    changes = []
    current_docs = await source.list_all_documents()
    for doc in current_docs:
        content = await source.get_content(doc.id)
        new_hash = content_hash(content)
        stored_hash = await state_store.get_hash(doc.id)
        if stored_hash is None:
            changes.append(Change("created", doc))
        elif new_hash != stored_hash:
            changes.append(Change("modified", doc))
    return changes
```
**Works well for:** Any source where you can read the full content. Reliable regardless of timestamp accuracy.
**Fails when:** You can't efficiently read all document content for hashing (too many documents, too large files, API rate limits).
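A cheaper middle ground, where available: many sources expose a checksum or version token in their listing (S3 ETags, revision ids), which lets you detect change without downloading content. Note that S3 ETags are not true content hashes for multipart uploads, so treat such tokens as opaque "did it change?" markers. The `version_token` field and `get_token` method below are illustrative assumptions:

```python
# Compare source-provided version tokens instead of hashing content.
# Listing only -- no content reads, so this scales to large corpora.
async def detect_by_source_token(source, state_store):
    changes = []
    for doc in await source.list_all_documents():
        stored_token = await state_store.get_token(doc.id)
        if stored_token is None:
            changes.append(("created", doc))
        elif doc.version_token != stored_token:
            changes.append(("modified", doc))
    return changes
```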
### Strategy 3: Webhooks / Event Streams
Don't poll. Let the source system tell you when something changes.
```python
# Webhook handler for document changes
@app.post("/webhooks/document-change")
async def handle_document_change(event: DocumentEvent):
    match event.type:
        case "document.created" | "document.updated":
            await indexing_queue.enqueue({
                "action": "index",
                "document_id": event.document_id,
                "source": event.source,
                "timestamp": event.timestamp,
            })
        case "document.deleted":
            await indexing_queue.enqueue({
                "action": "delete",
                "document_id": event.document_id,
            })
    return {"status": "accepted"}
```
**Works well for:** Systems with webhook support (Confluence, SharePoint, GitHub, most modern SaaS).
**Fails when:** Webhooks are missed (service downtime, network issues). Always pair with periodic full reconciliation.
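One more webhook hygiene point: verify payload signatures before enqueueing, so forged requests can't pollute your indexing queue. Most providers sign payloads with an HMAC (GitHub, for example, sends an HMAC-SHA256 hex digest in `X-Hub-Signature-256`). A minimal sketch; the `sha256=` prefix convention here follows GitHub's format and may differ for your provider:

```python
import hashlib
import hmac

def verify_signature(secret: bytes, payload: bytes, signature_header: str) -> bool:
    expected = "sha256=" + hmac.new(secret, payload, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking information via timing differences.
    return hmac.compare_digest(expected, signature_header)
```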
### Strategy 4: Change Data Capture (CDC)
For database-backed knowledge bases, use CDC to stream changes.
```python
# Using Debezium or similar CDC tool
async def process_cdc_events(cdc_stream):
    async for event in cdc_stream:
        if event.table == "documents":
            if event.operation in ("INSERT", "UPDATE"):
                await index_document(event.after)
            elif event.operation == "DELETE":
                await remove_document(event.before["id"])
```
**Works well for:** Knowledge bases that live in a database you control. Real-time, reliable, zero polling.
**Fails when:** The content lives in SaaS tools that don't expose their database, or your team can't operate the extra CDC infrastructure.
## The Reindex Problem: Handling Modified Documents
When a document changes, you can't just add new chunks. You need to remove the old chunks and add new ones. The document might have grown (more chunks), shrunk (fewer chunks), or been completely rewritten.
```python
async def reindex_document(self, document):
    """Atomic reindex: remove old chunks, add new ones."""
    # Step 1: Remove all existing chunks for this document
    await self.index.delete(
        filter={"document_id": document.id}
    )
    # Step 2: Process and index the new version
    # (index_document returns the chunks it wrote)
    new_chunks = await self.index_document(document)
    # Step 3: Update state
    await self.state.update_document(
        document_id=document.id,
        content_hash=content_hash(document.content),
        chunk_count=len(new_chunks),
        indexed_at=now(),
    )
```
The delete-then-insert must be atomic (or at least idempotent). If the pipeline crashes between delete and insert, you've lost the document from your index. Use transactions if your vector store supports them, or implement idempotent retries.
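If your vector store has no transactions, one workaround is to invert the order: insert the new chunks first under a version tag, then delete everything older. A crash in between leaves the old version (or briefly both versions) queryable, never neither, and both steps are safe to retry. A sketch; the metadata-filter syntax (`$lt`) is illustrative and varies by store:

```python
import time

async def reindex_document_versioned(self, document):
    version = int(time.time())
    new_chunks = self.chunker.chunk(document)
    embeddings = await self.embedder.embed_batch([c.text for c in new_chunks])

    # Step 1: insert the new chunks under the new version tag.
    await self.index.upsert(
        new_chunks, embeddings,
        metadata={"document_id": document.id, "version": version},
    )
    # Step 2: delete everything for this document below the new version.
    await self.index.delete(
        filter={"document_id": document.id, "version": {"$lt": version}}
    )
```

Retrieval then needs no changes as long as duplicate hits from the brief both-versions window are acceptable; if not, filter query results to the highest version per document.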
## Chunk-Level Diffing: The Efficient Path
For large documents where only a small section changed, re-embedding the entire document is wasteful. Chunk-level diffing only re-embeds changed chunks. For a deeper look, see [the hybrid search layer](/blog/hybrid-search-rag-production).
```python
import hashlib

def chunk_hash(text: str) -> str:
    return hashlib.sha256(text.encode()).hexdigest()

async def smart_reindex(self, document):
    """Only re-embed chunks that actually changed."""
    old_chunks = await self.state.get_chunks(document.id)
    new_chunks = self.chunker.chunk(document)

    old_map = {chunk_hash(c.text): c for c in old_chunks}
    new_map = {chunk_hash(c.text): c for c in new_chunks}

    # Unchanged chunks: keep existing embeddings
    unchanged = set(old_map.keys()) & set(new_map.keys())
    # New/modified chunks: need embedding
    to_embed = [new_map[h] for h in set(new_map.keys()) - unchanged]
    # Removed chunks: delete from index
    to_delete = [old_map[h] for h in set(old_map.keys()) - unchanged]

    # Execute changes
    if to_delete:
        await self.index.delete(ids=[c.id for c in to_delete])
    if to_embed:
        embeddings = await self.embedder.embed_batch(
            [c.text for c in to_embed]
        )
        await self.index.upsert(to_embed, embeddings)
```
This is more complex but dramatically reduces embedding costs for large documents with frequent small changes (think wikis, living documents, frequently updated policies).
## Pipeline Architecture
A production document sync pipeline needs more than just change detection and indexing.
```
Source Systems (Confluence, SharePoint, S3, ...)
│
▼
Change Detector (webhooks + periodic reconciliation)
│
▼
Processing Queue (Redis, SQS, Kafka)
│
▼
Worker Pool
├── Document Fetcher (rate-limited per source)
├── Content Extractor (PDF, DOCX, HTML → text)
├── Chunker (structure-aware splitting)
├── Embedder (batched, retries, fallback)
└── Index Writer (upsert with metadata)
│
▼
Vector Index (Pinecone, Qdrant, Weaviate, ...)
│
▼
State Store (what's indexed, hashes, timestamps)
```
The queue is essential. It decouples change detection from processing, allows retries on failure, enables rate limiting per source system, and lets you scale workers independently.
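The worker side of that queue can be sketched as follows, assuming a simple async `dequeue`/`enqueue` interface and one handler per action type (both illustrative, not a specific library's API). Transient failures are retried with exponential backoff; jobs that keep failing land in a dead-letter queue for inspection:

```python
import asyncio
import random

MAX_ATTEMPTS = 5

async def worker_loop(queue, handlers, dead_letter):
    while True:
        job = await queue.dequeue()
        if job is None:  # queue drained; production workers would block instead
            return
        try:
            await handlers[job["action"]](job)
        except Exception:
            attempts = job.get("attempts", 0) + 1
            if attempts >= MAX_ATTEMPTS:
                # Keep the job around for inspection instead of retrying forever.
                await dead_letter.enqueue(job)
            else:
                # Exponential backoff with jitter before re-queueing.
                await asyncio.sleep(min(2 ** attempts, 60) * random.random())
                await queue.enqueue({**job, "attempts": attempts})
```

The dead-letter queue matters in practice: a single malformed PDF should not be allowed to block the pipeline indefinitely.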
## Reconciliation: The Safety Net
No matter how good your change detection is, drift happens. Webhooks get missed. Hashes get corrupted. State stores lose entries.
Run a full reconciliation periodically (weekly or monthly) that compares every document in every source against what's in your index.
```python
async def full_reconciliation(self):
    """Weekly safety net: compare everything."""
    source_docs = await self.source.list_all_documents()
    indexed_docs = await self.state.get_all_indexed()

    source_map = {d.id: d for d in source_docs}
    indexed_map = {d.id: d for d in indexed_docs}

    # Find discrepancies
    missing = set(source_map.keys()) - set(indexed_map.keys())
    orphaned = set(indexed_map.keys()) - set(source_map.keys())
    stale = [
        doc_id for doc_id in source_map.keys() & indexed_map.keys()
        if source_map[doc_id].content_hash != indexed_map[doc_id].content_hash
    ]
    logger.info(f"Reconciliation: {len(missing)} missing, "
                f"{len(orphaned)} orphaned, {len(stale)} stale")

    # Fix discrepancies
    for doc_id in missing:
        await self.index_document(source_map[doc_id])
    for doc_id in orphaned:
        await self.remove_document(doc_id)
    for doc_id in stale:
        await self.reindex_document(source_map[doc_id])
```
## Monitoring: Know Before Users Complain
Track these metrics:
- **Sync lag:** Time between document modification and index update. Should be minutes, not hours.
- **Queue depth:** If the processing queue is growing, you're falling behind.
- **Failure rate:** What percentage of documents fail to index? Why?
- **Index coverage:** What percentage of source documents are actually indexed?
- **Freshness distribution:** How old are your indexed documents? Are any ancient?
A dashboard showing "98% of documents indexed within 15 minutes of modification" is more valuable than any architectural diagram.
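Several of those metrics fall out of the state store directly. A sketch of that computation, assuming each indexed-document record carries `modified_at` and `indexed_at` timestamps (field names are illustrative):

```python
from datetime import datetime, timedelta, timezone

def freshness_report(indexed_docs, total_source_docs, now=None):
    now = now or datetime.now(timezone.utc)
    # Sync lag per document: modification to index update.
    lags = [d["indexed_at"] - d["modified_at"] for d in indexed_docs]
    within_15m = sum(1 for lag in lags if lag <= timedelta(minutes=15))
    return {
        # Index coverage: fraction of source documents actually indexed.
        "coverage": len(indexed_docs) / total_source_docs,
        # The dashboard headline: "X% indexed within 15 minutes".
        "pct_fresh_within_15m": within_15m / len(lags) if lags else 0.0,
        # Freshness distribution tail: age of the oldest index entry.
        "oldest_index_age": max((now - d["indexed_at"] for d in indexed_docs),
                                default=timedelta(0)),
    }
```

Emit these on every sync run and alert on trends, not single readings; one slow document is noise, a rising queue depth plus falling coverage is an incident.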
## Start Simple
Don't build the full pipeline on day one. Start with:
1. Timestamp-based change detection with daily full sync
2. Simple delete-and-reinsert for modifications
3. A processing queue for reliability
4. Weekly full reconciliation
That covers 90% of freshness requirements. Add webhook-driven real-time sync, chunk-level diffing, and CDC when scale demands it.
The goal is fresh data. The path is incremental improvement. Just like the indexing itself.