Building a Document Processing Pipeline with AI Agents
By Diesel
Tags: tutorial, documents, pipeline, extraction
Every organisation I've worked with has the same problem. Documents pile up. Invoices need data extracted. Contracts need clauses identified. Reports need summaries. And somewhere, a human is doing all of this manually, eight hours a day, slowly losing the will to live.
AI agents are perfect for this. Not because the extraction is hard (it's not), but because the pipeline needs to make decisions. Is this an invoice or a receipt? Which fields are missing? Does this contract clause trigger a review? That's agent territory.
## The Architecture
```
Document → Classify → Extract → Validate → Route → Store
```
Six stages. Each one is a function the agent can call. The agent decides how to chain them based on what it finds.
## Setup
```bash
pip install anthropic pypdf2 pydantic
```
We're using Claude's native API with tool use. No framework dependency. Just the Anthropic SDK.
## Document Intake
First, get the document into a format Claude can read.
```python
import anthropic
import base64
from pathlib import Path
from PyPDF2 import PdfReader
client = anthropic.Anthropic()
def load_document(file_path: str) -> dict:
"""Load a document and return its content in a processable format."""
path = Path(file_path)
suffix = path.suffix.lower()
if suffix == ".pdf":
reader = PdfReader(file_path)
text = "\n\n".join(page.extract_text() or "" for page in reader.pages)
return {
"type": "pdf",
"text": text,
"pages": len(reader.pages),
"filename": path.name,
}
elif suffix in (".png", ".jpg", ".jpeg", ".webp"):
with open(file_path, "rb") as f:
image_data = base64.standard_b64encode(f.read()).decode("utf-8")
media_type = {
".png": "image/png",
".jpg": "image/jpeg",
".jpeg": "image/jpeg",
".webp": "image/webp",
}[suffix]
return {
"type": "image",
"data": image_data,
"media_type": media_type,
"filename": path.name,
}
elif suffix in (".txt", ".md", ".csv"):
return {
"type": "text",
"text": path.read_text(),
"filename": path.name,
}
else:
raise ValueError(f"Unsupported file type: {suffix}")
```
## Document Classification
The first decision point. What kind of document is this? The answer determines the entire downstream pipeline.
```python
from pydantic import BaseModel
from enum import Enum
class DocumentType(str, Enum):
INVOICE = "invoice"
CONTRACT = "contract"
REPORT = "report"
RECEIPT = "receipt"
LETTER = "letter"
FORM = "form"
UNKNOWN = "unknown"
class Classification(BaseModel):
document_type: DocumentType
confidence: float
language: str
summary: str
def classify_document(doc: dict) -> Classification:
"""Classify the document type using Claude."""
if doc["type"] == "image":
content = [
{"type": "image", "source": {
"type": "base64",
"media_type": doc["media_type"],
"data": doc["data"],
}},
{"type": "text", "text": "Classify this document."}
]
else:
        content = f"Classify this document:\n\n{doc['text'][:3000]}"
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=1024,
system="""You are a document classifier. Analyze the document and return
a JSON object with these fields:
- document_type: one of "invoice", "contract", "report", "receipt", "letter", "form", "unknown"
- confidence: 0.0 to 1.0
- language: ISO 639-1 code (e.g., "en", "ar", "fr")
- summary: one sentence describing the document
Return ONLY valid JSON. No markdown, no explanation.""",
messages=[{"role": "user", "content": content}],
)
import json
data = json.loads(response.content[0].text)
return Classification(**data)
```
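Even with "Return ONLY valid JSON" in the system prompt, models occasionally wrap the output in markdown fences anyway, and `json.loads` then raises. A small defensive parser is cheap insurance (`parse_json_response` is a hypothetical helper, not part of the SDK):

```python
import json
import re

def parse_json_response(raw: str) -> dict:
    """Parse a model response that should be JSON, tolerating stray markdown fences."""
    text = raw.strip()
    if text.startswith("```"):
        # Drop an opening ```json (or bare ```) line and a closing ``` line
        text = re.sub(r"^```[a-zA-Z]*\s*", "", text)
        text = re.sub(r"\s*```$", "", text)
    return json.loads(text)
```

Swap it in for the bare `json.loads` calls in `classify_document` and `extract_data` if you see intermittent parse failures.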
## Extraction Schemas
Different document types need different extraction schemas. Define them upfront.
```python
class InvoiceData(BaseModel):
invoice_number: str | None = None
date: str | None = None
due_date: str | None = None
vendor_name: str | None = None
vendor_address: str | None = None
total_amount: float | None = None
currency: str = "USD"
tax_amount: float | None = None
line_items: list[dict] = []
payment_terms: str | None = None
class ContractData(BaseModel):
parties: list[str] = []
effective_date: str | None = None
expiry_date: str | None = None
contract_type: str | None = None
key_terms: list[str] = []
obligations: list[dict] = []
termination_clause: str | None = None
governing_law: str | None = None
total_value: float | None = None
class ReportData(BaseModel):
title: str | None = None
author: str | None = None
date: str | None = None
executive_summary: str | None = None
key_findings: list[str] = []
recommendations: list[str] = []
data_points: list[dict] = []
```
## The Extraction Engine
One function handles all extraction. The schema changes based on document type.
```python
import json
EXTRACTION_PROMPTS = {
DocumentType.INVOICE: """Extract all invoice data. Return JSON matching this schema:
{
"invoice_number": "string or null",
"date": "YYYY-MM-DD or null",
"due_date": "YYYY-MM-DD or null",
"vendor_name": "string or null",
"vendor_address": "string or null",
"total_amount": number or null,
"currency": "USD/EUR/GBP/etc",
"tax_amount": number or null,
"line_items": [{"description": "string", "quantity": number, "unit_price": number, "total": number}],
"payment_terms": "string or null"
}""",
DocumentType.CONTRACT: """Extract all contract data. Return JSON matching this schema:
{
"parties": ["party names"],
"effective_date": "YYYY-MM-DD or null",
"expiry_date": "YYYY-MM-DD or null",
"contract_type": "string or null",
"key_terms": ["important terms"],
"obligations": [{"party": "name", "obligation": "description"}],
"termination_clause": "summary or null",
"governing_law": "jurisdiction or null",
"total_value": number or null
}""",
DocumentType.REPORT: """Extract all report data. Return JSON matching this schema:
{
"title": "string or null",
"author": "string or null",
"date": "YYYY-MM-DD or null",
"executive_summary": "string or null",
"key_findings": ["findings"],
"recommendations": ["recommendations"],
"data_points": [{"metric": "name", "value": "value", "context": "explanation"}]
}""",
}
def extract_data(doc: dict, doc_type: DocumentType) -> dict:
"""Extract structured data from the document."""
prompt = EXTRACTION_PROMPTS.get(doc_type)
if not prompt:
return {"error": f"No extraction schema for type: {doc_type}"}
text = doc.get("text", "")
if doc["type"] == "image":
messages_content = [
{"type": "image", "source": {
"type": "base64",
"media_type": doc["media_type"],
"data": doc["data"],
}},
{"type": "text", "text": prompt}
]
else:
messages_content = f"{prompt}\n\nDocument:\n{text}"
response = client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
system="You are a precise data extraction system. Return ONLY valid JSON. No markdown fences, no explanations.",
messages=[{"role": "user", "content": messages_content}],
    )
return json.loads(response.content[0].text)
```
## Validation
Extraction is never perfect. Validate and flag issues.
```python
from dataclasses import dataclass, field
@dataclass
class ValidationResult:
is_valid: bool
errors: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
confidence: float = 1.0
def validate_invoice(data: dict) -> ValidationResult:
"""Validate extracted invoice data."""
errors = []
warnings = []
# Required fields
required = ["invoice_number", "date", "vendor_name", "total_amount"]
for field_name in required:
if not data.get(field_name):
errors.append(f"Missing required field: {field_name}")
# Line item math
line_items = data.get("line_items", [])
if line_items and data.get("total_amount"):
calculated_total = sum(item.get("total", 0) for item in line_items)
tax = data.get("tax_amount", 0) or 0
expected = calculated_total + tax
if abs(expected - data["total_amount"]) > 0.01:
warnings.append(
f"Line items total ({calculated_total}) + tax ({tax}) = {expected}, "
f"but invoice total is {data['total_amount']}"
)
# Date sanity
if data.get("due_date") and data.get("date"):
if data["due_date"] < data["date"]:
errors.append("Due date is before invoice date")
confidence = 1.0 - (len(errors) * 0.2) - (len(warnings) * 0.1)
return ValidationResult(
is_valid=len(errors) == 0,
errors=errors,
warnings=warnings,
confidence=max(0.0, confidence),
)
def validate_extraction(data: dict, doc_type: DocumentType) -> ValidationResult:
"""Route validation to the appropriate validator."""
validators = {
DocumentType.INVOICE: validate_invoice,
# Add more validators as needed
}
validator = validators.get(doc_type)
if not validator:
return ValidationResult(is_valid=True, warnings=["No validator for this type"])
return validator(data)
```
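The validators dict above only covers invoices. A contract validator in the same shape might look like this — the thresholds are illustrative, and `ValidationResult` is repeated only so the sketch runs standalone (in the pipeline, reuse the dataclass defined above):

```python
from dataclasses import dataclass, field

@dataclass
class ValidationResult:  # repeated from above so this sketch runs standalone
    is_valid: bool
    errors: list[str] = field(default_factory=list)
    warnings: list[str] = field(default_factory=list)
    confidence: float = 1.0

def validate_contract(data: dict) -> ValidationResult:
    """Sketch of a contract validator; register it under DocumentType.CONTRACT."""
    errors, warnings = [], []
    parties = data.get("parties") or []
    if len(parties) < 2:
        errors.append("Contract should name at least two parties")
    if not data.get("effective_date"):
        warnings.append("Missing effective date")
    # ISO YYYY-MM-DD dates compare correctly as strings
    if data.get("effective_date") and data.get("expiry_date"):
        if data["expiry_date"] < data["effective_date"]:
            errors.append("Expiry date is before effective date")
    confidence = 1.0 - len(errors) * 0.2 - len(warnings) * 0.1
    return ValidationResult(
        is_valid=not errors,
        errors=errors,
        warnings=warnings,
        confidence=max(0.0, confidence),
    )
```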
## Routing
Based on classification and validation, route the document to the right place.
```python
@dataclass
class RoutingDecision:
destination: str
priority: str # low, medium, high, urgent
requires_review: bool
reason: str
def route_document(
doc_type: DocumentType,
data: dict,
validation: ValidationResult
) -> RoutingDecision:
"""Decide where the document goes next."""
# Validation failures need human review
if not validation.is_valid:
return RoutingDecision(
destination="human_review_queue",
priority="high",
requires_review=True,
reason=f"Validation errors: {validation.errors}"
)
# Route by type
if doc_type == DocumentType.INVOICE:
amount = data.get("total_amount", 0)
if amount and amount > 10000:
return RoutingDecision(
destination="finance_approval",
priority="high",
requires_review=True,
reason=f"Invoice exceeds $10,000 threshold (${amount})"
)
return RoutingDecision(
destination="accounts_payable",
priority="medium",
requires_review=False,
reason="Standard invoice processing"
)
elif doc_type == DocumentType.CONTRACT:
return RoutingDecision(
destination="legal_review",
priority="high",
requires_review=True,
reason="All contracts require legal review"
)
return RoutingDecision(
destination="general_inbox",
priority="low",
requires_review=False,
reason="Standard document filing"
)
```
## The Full Pipeline
Tie it all together.
```python
@dataclass
class ProcessingResult:
filename: str
classification: Classification
extracted_data: dict
validation: ValidationResult
routing: RoutingDecision
def process_document(file_path: str) -> ProcessingResult:
"""Run the full document processing pipeline."""
# 1. Load
print(f"Loading: {file_path}")
doc = load_document(file_path)
# 2. Classify
print("Classifying...")
classification = classify_document(doc)
print(f" Type: {classification.document_type} ({classification.confidence:.0%})")
# 3. Extract
print("Extracting data...")
data = extract_data(doc, classification.document_type)
# 4. Validate
print("Validating...")
validation = validate_extraction(data, classification.document_type)
if validation.errors:
print(f" Errors: {validation.errors}")
if validation.warnings:
print(f" Warnings: {validation.warnings}")
# 5. Route
print("Routing...")
routing = route_document(classification.document_type, data, validation)
print(f" Destination: {routing.destination} (priority: {routing.priority})")
return ProcessingResult(
filename=doc["filename"],
classification=classification,
extracted_data=data,
validation=validation,
routing=routing,
)
```
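The architecture diagram at the top ends with a Store stage that the pipeline above never reaches. A minimal sketch, appending each result as one JSON line — the `processed/results.jsonl` path is arbitrary, and in production you would swap the file for a database:

```python
import json
from pathlib import Path

def store_result(result: "ProcessingResult", out_path: str = "processed/results.jsonl") -> None:
    """Append one processed document as a JSON line."""
    record = {
        "filename": result.filename,
        "document_type": result.classification.document_type.value,
        "data": result.extracted_data,
        "is_valid": result.validation.is_valid,
        "destination": result.routing.destination,
        "requires_review": result.routing.requires_review,
    }
    path = Path(out_path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("a") as f:
        f.write(json.dumps(record) + "\n")
```

Call it as the last line of `process_document` if you want every run persisted.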
## Batch Processing
Real pipelines process hundreds of documents. Batch them.
```python
import asyncio
from pathlib import Path
async def process_batch(directory: str, max_concurrent: int = 5) -> list[ProcessingResult]:
"""Process all documents in a directory with concurrency control."""
semaphore = asyncio.Semaphore(max_concurrent)
    supported = {".pdf", ".png", ".jpg", ".jpeg", ".webp", ".txt", ".md", ".csv"}
files = [
f for f in Path(directory).iterdir()
if f.suffix.lower() in supported
    ]
async def process_one(file_path: str):
async with semaphore:
try:
                # process_document is synchronous; run it in a thread so the
                # event loop can keep other documents moving
                result = await asyncio.to_thread(process_document, file_path)
return result
except Exception as e:
print(f"Failed: {file_path}: {e}")
return None
tasks = [process_one(str(f)) for f in files]
completed = await asyncio.gather(*tasks)
results = [r for r in completed if r is not None]
# Summary
print(f"\nProcessed: {len(results)}/{len(files)}")
for doc_type in DocumentType:
count = sum(1 for r in results if r.classification.document_type == doc_type)
if count:
print(f" {doc_type.value}: {count}")
return results
```
Five concurrent API calls is a sensible default. Push much higher and you risk rate limits; go lower and you leave throughput on the table for work that is mostly waiting on I/O.
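When you do hit a rate limit, retry with exponential backoff rather than failing the document. The Anthropic SDK retries some errors on its own, but a generic wrapper around a whole pipeline stage is still useful — a sketch, with illustrative delay values:

```python
import random
import time

def with_backoff(fn, max_retries: int = 5, base_delay: float = 1.0):
    """Call fn(), retrying on exceptions with exponential backoff plus jitter."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception:
            if attempt == max_retries - 1:
                raise  # out of retries; surface the error
            time.sleep(base_delay * (2 ** attempt) + random.random() * base_delay)

# Usage: result = with_backoff(lambda: process_document("invoice.pdf"))
```

In a real deployment you would catch the SDK's rate-limit error specifically rather than bare `Exception`.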
## The Agent Layer
All of the above is a pipeline. To make it an agent, add decision-making. The agent uses the pipeline stages as tools and decides what to do based on what it finds.
```python
tools = [
{
"name": "classify",
"description": "Classify a document's type and language",
"input_schema": {
"type": "object",
"properties": {"file_path": {"type": "string"}},
"required": ["file_path"]
}
},
{
"name": "extract",
"description": "Extract structured data from a classified document",
"input_schema": {
"type": "object",
"properties": {
"file_path": {"type": "string"},
"document_type": {"type": "string"}
},
"required": ["file_path", "document_type"]
}
},
{
"name": "validate",
"description": "Validate extracted data for completeness and accuracy",
"input_schema": {
"type": "object",
"properties": {
"data": {"type": "object"},
"document_type": {"type": "string"}
},
"required": ["data", "document_type"]
}
},
{
"name": "route",
"description": "Decide where to send the processed document",
"input_schema": {
"type": "object",
"properties": {
"document_type": {"type": "string"},
"data": {"type": "object"},
"is_valid": {"type": "boolean"}
},
"required": ["document_type", "data", "is_valid"]
}
}
]
```
The agent can now reason about the process. "This invoice failed validation. Let me re-extract with more context." Or "This document is classified as unknown. Let me try extracting it as a report and see if the fields match." That adaptive behaviour is what a static pipeline can't do.
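To run that loop, each `tool_use` block coming back from Claude has to be mapped to one of the pipeline functions. A minimal dispatcher sketch — the `handlers` wiring in the comment is hypothetical, and failures are returned to the model as tool results rather than raised, so the agent can adapt:

```python
def dispatch_tool(name: str, tool_input: dict, handlers: dict) -> dict:
    """Execute one tool call and always return a dict the agent loop can serialise."""
    handler = handlers.get(name)
    if handler is None:
        return {"error": f"Unknown tool: {name}"}
    try:
        return handler(**tool_input)
    except Exception as e:
        # Feed failures back to the model so it can decide what to try next
        return {"error": str(e)}

# Hypothetical wiring to the pipeline functions defined earlier:
# handlers = {
#     "classify": lambda file_path: classify_document(load_document(file_path)).model_dump(),
#     "extract": lambda file_path, document_type: extract_data(
#         load_document(file_path), DocumentType(document_type)),
# }
```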
## What You've Built
A system that takes any document, figures out what it is, pulls out the structured data, validates it, and sends it where it needs to go. The pipeline does the heavy lifting. The agent handles the edge cases.
Most documents go straight through. The interesting ones, the ones that fail validation, carry low-confidence classifications, or contain unexpected content, get the agent's attention. That's the sweet spot: automate the 80%, and let the agent handle the 20% that needs reasoning.