Building production multi-agent systems from scratch. Covers ReAct, plan-and-execute, supervisor, and pipeline patterns with full Python implementations. Includes inter-agent communication, human-in-the-loop, memory systems, failure modes, and a real EdTech production architecture.
Tyler McDaniel
AI Engineer & IBM Business Partner
The word "agentic" has been stretched so far it means almost nothing. A chatbot that calls a function is not an agent. A RAG pipeline that retrieves documents is not an agent. An agent is a system that receives a goal, autonomously decides what steps to take, executes those steps using tools, observes the results, and adjusts its plan based on what it learned. The loop — plan, act, observe, revise — is the defining characteristic. Everything else is autocomplete with extra steps.
I've built multi-agent systems for three production use cases: an EdTech content pipeline that researches, writes, and fact-checks curriculum material; a codebase analysis system that maps dependencies, identifies risks, and proposes refactors; and a customer support triage system that classifies, routes, and drafts responses. The patterns that work in production look very different from the demo-day showcases, and this guide covers what actually survives contact with real data, real latency budgets, and real failure modes.
An agentic system has four properties: it receives a goal rather than a fixed input, it chooses its own next steps, it acts on the world through tools, and it observes results and revises its plan accordingly.
A pipeline processes inputs through a fixed sequence of transformations. An agent navigates an open-ended problem space dynamically. Both are useful. Most production systems are pipelines with one or two agentic loops at critical decision points.
[ReAct](https://arxiv.org/abs/2210.03629) is the simplest agentic pattern. The LLM alternates between reasoning (thinking about what to do) and acting (calling a tool). Each cycle, the model reasons about the current state, emits a tool call, and feeds the observation back into the next round of reasoning:
import json
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="your-key")
TOOLS = [
{
"type": "function",
"function": {
"name": "search_web",
"description": "Search the web for current information",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "Search query"}
},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read the contents of a local file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"}
},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "write_file",
"description": "Write content to a local file",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string"},
"content": {"type": "string"},
},
"required": ["path", "content"],
},
},
},
]
def execute_tool(name: str, args: dict) -> str:
    try:
        if name == "search_web":
            # Real implementation would call a search API
            return f"Search results for: {args['query']}\n[simulated results]"
        elif name == "read_file":
            with open(args["path"], "r") as f:
                return f.read()
        elif name == "write_file":
            with open(args["path"], "w") as f:
                f.write(args["content"])
            return f"File written: {args['path']}"
        return f"Unknown tool: {name}"
    except (OSError, KeyError) as e:
        # Return errors as observations so one bad call doesn't kill the agent loop
        return f"TOOL ERROR ({name}): {e}"
def react_agent(goal: str, max_steps: int = 10) -> str:
messages = [
{
"role": "system",
"content": (
"You are an autonomous agent. You have tools available. "
"Think step by step. Use tools to gather information and take actions. "
"When you have enough information to fully answer the goal, "
"respond with your final answer without calling any more tools."
),
},
{"role": "user", "content": goal},
]
for step in range(max_steps):
response = client.chat.completions.create(
model="llama3.1:8b",
messages=messages,
tools=TOOLS,
tool_choice="auto",
temperature=0.2, # Low temperature for consistent reasoning
)
choice = response.choices[0]
# If no tool calls, the agent is done
if not choice.message.tool_calls:
return choice.message.content
# Append the assistant's response (with tool calls)
messages.append(choice.message)
# Execute each tool call and add results
for tool_call in choice.message.tool_calls:
args = json.loads(tool_call.function.arguments)
result = execute_tool(tool_call.function.name, args)
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": result,
})
print(f"Step {step + 1}: {tool_call.function.name}({args}) → {result[:100]}...")
return "Agent reached maximum steps without completing the goal."
ReAct works well for single-goal tasks with 3-8 steps. It breaks down when the task requires long-horizon planning (20+ steps), because the model loses coherence as the context fills with intermediate observations.
For complex tasks, separate planning from execution:
def plan_and_execute(goal: str) -> str:
# Phase 1: Generate a plan
plan_response = client.chat.completions.create(
model="llama3.1:8b",
messages=[
{
"role": "system",
"content": (
"You are a planning agent. Given a goal, create a numbered step-by-step plan. "
"Each step should be a single, concrete action. "
"Output ONLY the numbered plan, nothing else."
),
},
{"role": "user", "content": f"Goal: {goal}"},
],
temperature=0.3,
)
plan = plan_response.choices[0].message.content
steps = [line.strip() for line in plan.split("\n") if line.strip()]
print(f"Plan:\n{plan}\n")
# Phase 2: Execute each step
results = []
for i, step in enumerate(steps):
print(f"\nExecuting step {i + 1}: {step}")
step_result = react_agent(
f"Execute this single step: {step}\n\n"
f"Context from previous steps:\n{chr(10).join(results[-3:])}",
max_steps=5,
)
results.append(f"Step {i + 1} result: {step_result}")
# Phase 3: Synthesize
synthesis_response = client.chat.completions.create(
model="llama3.1:8b",
messages=[
{
"role": "system",
"content": "Synthesize the results of the executed plan into a final answer.",
},
{
"role": "user",
"content": f"Goal: {goal}\n\nResults:\n{chr(10).join(results)}",
},
],
temperature=0.3,
)
return synthesis_response.choices[0].message.content
The plan provides structure that the ReAct loop alone can't maintain. Each step is a mini-agent interaction, scoped to a single sub-task. The context stays focused.
When a task requires different capabilities — research, analysis, writing, review — split it across specialized agents instead of one mega-agent.
One orchestrator agent delegates to specialized worker agents:
from dataclasses import dataclass
from enum import Enum
class AgentRole(Enum):
RESEARCHER = "researcher"
WRITER = "writer"
REVIEWER = "reviewer"
SUPERVISOR = "supervisor"
@dataclass
class AgentMessage:
sender: AgentRole
recipient: AgentRole
content: str
metadata: dict | None = None
class Agent:
def __init__(self, role: AgentRole, system_prompt: str, tools: list | None = None):
self.role = role
self.system_prompt = system_prompt
self.tools = tools or []
self.memory: list[AgentMessage] = []
async def process(self, message: AgentMessage) -> AgentMessage:
self.memory.append(message)
context = "\n".join(
f"[{m.sender.value}]: {m.content}" for m in self.memory[-10:]
)
        # Omit `tools` entirely when the agent has none; passing tools=None
        # is rejected by some OpenAI-compatible servers
        tool_kwargs = {"tools": self.tools} if self.tools else {}
        response = client.chat.completions.create(
            model="llama3.1:8b",
            messages=[
                {"role": "system", "content": self.system_prompt},
                {"role": "user", "content": f"Context:\n{context}\n\nTask: {message.content}"},
            ],
            temperature=0.3,
            **tool_kwargs,
        )
result = response.choices[0].message.content
reply = AgentMessage(
sender=self.role,
recipient=message.sender,
content=result,
)
self.memory.append(reply)
return reply
class MultiAgentSystem:
def __init__(self):
self.agents = {
AgentRole.RESEARCHER: Agent(
AgentRole.RESEARCHER,
"You are a research specialist. Given a topic, find relevant facts, "
"statistics, and sources. Be thorough and cite your sources.",
tools=TOOLS,
),
AgentRole.WRITER: Agent(
AgentRole.WRITER,
"You are a technical writer. Given research notes and an outline, "
"write clear, engaging content. Use specific examples and code where relevant.",
),
AgentRole.REVIEWER: Agent(
AgentRole.REVIEWER,
"You are a technical reviewer. Check content for accuracy, completeness, "
"and clarity. Point out specific issues and suggest improvements. "
"Rate the content on a 1-10 scale.",
),
}
self.message_log: list[AgentMessage] = []
async def run(self, goal: str, max_rounds: int = 5) -> str:
# Supervisor decides the workflow
supervisor = Agent(
AgentRole.SUPERVISOR,
"You are a supervisor coordinating a research, writing, and review team. "
"Decide which agent to delegate to next. Respond with JSON: "
'{"delegate_to": "researcher|writer|reviewer", "task": "specific instructions"} '
'or {"done": true, "final_output": "the completed work"}',
)
current_state = f"Goal: {goal}"
for round_num in range(max_rounds):
# Supervisor decides next action
supervisor_msg = AgentMessage(
sender=AgentRole.SUPERVISOR,
recipient=AgentRole.SUPERVISOR,
content=current_state,
)
decision = await supervisor.process(supervisor_msg)
            try:
                parsed = json.loads(decision.content)
            except json.JSONDecodeError:
                # Feed the error back so the supervisor corrects itself
                # instead of silently retrying with the same state
                current_state += "\n\nYour last response was not valid JSON. Use the specified JSON format."
                continue
if parsed.get("done"):
return parsed["final_output"]
# Delegate to the chosen agent
target_role = AgentRole(parsed["delegate_to"])
target_agent = self.agents[target_role]
task_msg = AgentMessage(
sender=AgentRole.SUPERVISOR,
recipient=target_role,
content=parsed["task"],
)
result = await target_agent.process(task_msg)
self.message_log.append(task_msg)
self.message_log.append(result)
current_state = (
f"Goal: {goal}\n\n"
f"Round {round_num + 1}: Delegated to {target_role.value}\n"
f"Task: {parsed['task']}\n"
f"Result: {result.content}\n\n"
f"Decide next step."
)
return "Multi-agent system reached maximum rounds."
For workflows with a fixed sequence, a pipeline is simpler and more reliable:
async def content_pipeline(topic: str) -> str:
system = MultiAgentSystem()
# Step 1: Research
research = await system.agents[AgentRole.RESEARCHER].process(
AgentMessage(
sender=AgentRole.SUPERVISOR,
recipient=AgentRole.RESEARCHER,
content=f"Research the following topic thoroughly: {topic}",
)
)
# Step 2: Write
draft = await system.agents[AgentRole.WRITER].process(
AgentMessage(
sender=AgentRole.SUPERVISOR,
recipient=AgentRole.WRITER,
content=f"Write a comprehensive article based on this research:\n{research.content}",
)
)
# Step 3: Review
review = await system.agents[AgentRole.REVIEWER].process(
AgentMessage(
sender=AgentRole.SUPERVISOR,
recipient=AgentRole.REVIEWER,
content=f"Review this article for accuracy and quality:\n{draft.content}",
)
)
# Step 4: Revise if needed (conditional agentic loop)
    # Step 4: Revise if the reviewer scored the draft below threshold
    # (conditional agentic loop; requires `import re` alongside the other imports)
    score_match = re.search(r"\b(\d{1,2})\s*/\s*10\b", review.content)
    if score_match and int(score_match.group(1)) < 7:
        draft = await system.agents[AgentRole.WRITER].process(
            AgentMessage(
                sender=AgentRole.SUPERVISOR,
                recipient=AgentRole.WRITER,
                content=(
                    f"Revise this article to address the review feedback.\n\n"
                    f"Review:\n{review.content}\n\nDraft:\n{draft.content}"
                ),
            )
        )
    return draft.content
The pipeline is deterministic — you know exactly what happens in what order. The supervisor pattern is adaptive — it can reorder steps, skip unnecessary work, or add rounds based on quality. In the production systems I've built, pipelines handle 90% of cases and the supervisor pattern handles the remaining 10% where the task is genuinely unpredictable.
How agents talk to each other determines the system's flexibility and debuggability. Three patterns emerge in practice:
Direct messaging. Agent A sends a message to Agent B and waits for a reply. Simple, synchronous, easy to trace. This is what the AgentMessage class above implements. Downside: tightly coupled. If you add Agent C, you need to update routing logic everywhere.
Shared blackboard. A central data store that all agents read from and write to. Each agent watches for changes relevant to its role and acts when triggered:
from threading import Lock
class Blackboard:
def __init__(self):
self._state: dict = {}
self._lock = Lock()
self._watchers: list = []
    def write(self, key: str, value, author: AgentRole):
        with self._lock:
            self._state[key] = {
                "value": value,
                "author": author,
                "version": self._state.get(key, {}).get("version", 0) + 1,
            }
        # Notify outside the lock so a callback that writes back doesn't deadlock
        self._notify(key)
def read(self, key: str):
with self._lock:
entry = self._state.get(key)
return entry["value"] if entry else None
def watch(self, key_prefix: str, callback):
self._watchers.append((key_prefix, callback))
def _notify(self, key: str):
for prefix, callback in self._watchers:
if key.startswith(prefix):
callback(key, self._state[key])
The blackboard decouples agents — each agent only knows about the blackboard, not about other agents. This scales better when you have 5+ agents, but debugging is harder because causality is indirect. You see what changed on the blackboard but not always why agent B acted the way it did unless you trace back through the state history.

Event bus / pub-sub. Agents publish events (e.g., "research_complete") and subscribe to events they care about. This is the most decoupled approach and works best for systems where agents run concurrently:
import asyncio
from collections import defaultdict
from datetime import datetime
class EventBus:
def __init__(self):
self._subscribers: dict[str, list] = defaultdict(list)
self._event_log: list[dict] = []
def subscribe(self, event_type: str, handler):
self._subscribers[event_type].append(handler)
async def publish(self, event_type: str, data: dict, sender: AgentRole):
event = {
"type": event_type,
"data": data,
"sender": sender.value,
"timestamp": datetime.now().isoformat(),
}
self._event_log.append(event)
tasks = []
for handler in self._subscribers[event_type]:
tasks.append(asyncio.create_task(handler(event)))
if tasks:
await asyncio.gather(*tasks)
def get_audit_trail(self) -> list[dict]:
return self._event_log.copy()
In my content pipeline production system, I use direct messaging within a pipeline stage (researcher → supervisor → writer) and pub-sub between stages (the "content_reviewed" event triggers the publishing stage). Pick the pattern that matches your coordination complexity.
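As a concrete sketch of the pub-sub flow, here is a trimmed-down EventBus (same shape as the class above, with the sender and timestamp fields dropped) wiring a hypothetical writer-stage handler to the "research_complete" event:

```python
import asyncio
from collections import defaultdict

class EventBus:  # trimmed version of the class above
    def __init__(self):
        self._subscribers = defaultdict(list)
        self._event_log = []

    def subscribe(self, event_type, handler):
        self._subscribers[event_type].append(handler)

    async def publish(self, event_type, data):
        event = {"type": event_type, "data": data}
        self._event_log.append(event)
        # Fan out to every subscriber concurrently
        await asyncio.gather(*(h(event) for h in self._subscribers[event_type]))

received = []

async def on_research_complete(event):
    # A writer agent would start drafting here
    received.append(event["data"]["topic"])

async def main():
    bus = EventBus()
    bus.subscribe("research_complete", on_research_complete)
    await bus.publish("research_complete", {"topic": "photosynthesis"})

asyncio.run(main())
print(received)  # → ['photosynthesis']
```

The publisher never learns who consumed the event, which is exactly what makes adding a new stage cheap.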
Fully autonomous agents sound exciting until they send a wrong email to a customer or delete the wrong database table. Human-in-the-loop (HITL) is not a fallback — it's a design pattern.
Three escalation strategies:

Approval gates. The agent pauses at defined checkpoints and presents its proposed action for human approval. Use this for destructive actions, external communications, and financial transactions.
import asyncio
class ApprovalGate:
def __init__(self):
        self._pending: dict[str, dict] = {}  # approval_id -> {"future", "action", "context"}
async def request_approval(self, action_desc: str, context: dict) -> bool:
"""Block the agent until a human approves or rejects."""
approval_id = f"approval_{len(self._pending)}"
        future = asyncio.get_running_loop().create_future()
self._pending[approval_id] = {
"future": future,
"action": action_desc,
"context": context,
}
# In production: send to a Slack channel, UI dashboard, or email
print(f"\n[APPROVAL REQUIRED] {action_desc}")
print(f"Context: {json.dumps(context, indent=2)}")
approved = await future # Blocks until resolve_approval is called
del self._pending[approval_id]
return approved
def resolve_approval(self, approval_id: str, approved: bool):
"""Called by the human via UI/API."""
if approval_id in self._pending:
self._pending[approval_id]["future"].set_result(approved)
Confidence thresholds. The agent self-assesses its confidence. High confidence → act autonomously. Low confidence → escalate to a human. This requires calibrated confidence scoring, which is notoriously unreliable with LLMs. I add a second LLM call specifically for confidence estimation with structured output:
def assess_confidence(agent_output: str, original_goal: str) -> float:
response = client.chat.completions.create(
model="llama3.1:8b",
messages=[
{
"role": "system",
"content": (
"Rate the confidence that this output correctly achieves the goal. "
"Respond with ONLY a JSON object: {\"confidence\": 0.0-1.0, \"reasoning\": \"...\"}"
),
},
{
"role": "user",
"content": f"Goal: {original_goal}\n\nOutput: {agent_output}",
},
],
temperature=0.1,
)
try:
parsed = json.loads(response.choices[0].message.content)
return parsed["confidence"]
except (json.JSONDecodeError, KeyError):
return 0.0 # Fail safe: low confidence triggers escalation
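The threshold routing itself is then a few lines. A sketch, with an illustrative threshold of 0.7 and the confidence function passed in so it can be stubbed for testing:

```python
CONFIDENCE_THRESHOLD = 0.7  # illustrative; tune per use case

def route_output(agent_output: str, goal: str, confidence_fn) -> str:
    """Act autonomously above the threshold, escalate below it."""
    confidence = confidence_fn(agent_output, goal)
    if confidence >= CONFIDENCE_THRESHOLD:
        return "autonomous"  # ship the output
    return "escalate"        # push to the human review queue

# With stubbed confidence functions:
print(route_output("draft", "goal", lambda o, g: 0.9))  # → autonomous
print(route_output("draft", "goal", lambda o, g: 0.4))  # → escalate
```

Note that `assess_confidence` fails safe: a parse error returns 0.0, which routes to escalation rather than autonomous action.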
Feedback loops. After the agent acts, a human reviews the output and provides corrections. The agent learns from these corrections via episodic memory (covered below). Over time, the agent makes fewer mistakes in similar situations. This is slower than approval gates but creates a learning system.

In practice, I combine all three: approval gates for high-stakes actions, confidence thresholds for routing decisions, and feedback loops for continuous quality improvement.
Agents without memory repeat the same mistakes every conversation. There are three types of memory that matter:
The first is working memory: the conversation history, the list of messages in the current interaction. It's limited by the model's context window. For Llama 3.1 8B at 8K context, that's roughly 6,000 tokens of usable working memory after the system prompt and tool definitions.

When the working context approaches the limit, summarize older messages to manage overflow:
def compress_memory(messages: list[dict], max_tokens: int = 4000) -> list[dict]:
"""Keep system prompt + recent messages, summarize the middle."""
system = [m for m in messages if m["role"] == "system"]
recent = messages[-6:] # Keep last 3 exchanges
if len(messages) <= len(system) + len(recent):
return messages
middle = messages[len(system):-6]
middle_text = "\n".join(f"{m['role']}: {m['content'][:200]}" for m in middle)
summary_response = client.chat.completions.create(
model="llama3.1:8b",
messages=[
{"role": "system", "content": "Summarize this conversation history concisely."},
{"role": "user", "content": middle_text},
],
max_tokens=500,
)
summary = {
"role": "system",
"content": f"[Previous conversation summary]: {summary_response.choices[0].message.content}",
}
return system + [summary] + recent
The second is long-term memory: facts, preferences, and learned patterns that persist across sessions. Store them in a vector database for retrieval:
import numpy as np
from datetime import datetime
class LongTermMemory:
def __init__(self, embedding_fn):
self.memories: list[dict] = []
self.embedding_fn = embedding_fn
def store(self, content: str, metadata: dict | None = None):
embedding = self.embedding_fn(content)
self.memories.append({
"content": content,
"embedding": embedding,
"timestamp": datetime.now().isoformat(),
"access_count": 0,
"metadata": metadata or {},
})
def recall(self, query: str, top_k: int = 5) -> list[str]:
query_embedding = self.embedding_fn(query)
scores = []
for mem in self.memories:
similarity = np.dot(query_embedding, mem["embedding"]) / (
np.linalg.norm(query_embedding) * np.linalg.norm(mem["embedding"])
)
            # Boost frequently accessed memories (bonus saturates toward 0.1)
            frequency_bonus = 0.1 * (mem["access_count"] / (1 + mem["access_count"]))
            scores.append((similarity + frequency_bonus, mem))
scores.sort(key=lambda x: x[0], reverse=True)
results = []
for score, mem in scores[:top_k]:
mem["access_count"] += 1
results.append(mem["content"])
return results
For production vector storage, see [Vector Databases: A Practitioner's Comparison](https://tostupidtooquit.com/blog/vector-databases-practitioner-comparison) — the embedding store you choose matters a lot for agent recall quality.
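To see the recall mechanics without a real embedding model, here is a sketch that plugs a toy bag-of-words embedding into a trimmed version of the class above (the vocabulary and stored memories are illustrative):

```python
import numpy as np

VOCAB = ["agent", "memory", "tool", "plan", "error"]

def toy_embedding(text: str) -> np.ndarray:
    # Bag-of-words stand-in for a real embedding model
    words = text.lower().split()
    return np.array([float(words.count(w)) for w in VOCAB]) + 1e-6

class SimpleMemory:  # trimmed version of LongTermMemory above
    def __init__(self, embedding_fn):
        self.memories = []
        self.embedding_fn = embedding_fn

    def store(self, content: str):
        self.memories.append((content, self.embedding_fn(content)))

    def recall(self, query: str, top_k: int = 2) -> list[str]:
        q = self.embedding_fn(query)
        ranked = sorted(
            self.memories,
            key=lambda m: np.dot(q, m[1]) / (np.linalg.norm(q) * np.linalg.norm(m[1])),
            reverse=True,
        )
        return [content for content, _ in ranked[:top_k]]

ltm = SimpleMemory(toy_embedding)
ltm.store("the agent failed when the tool returned an error")
ltm.store("the plan had five steps")
print(ltm.recall("tool error", top_k=1))
# → ['the agent failed when the tool returned an error']
```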
The third is episodic memory: records of previous task executions (what worked, what failed, and why). This is the least common but most valuable memory type for production agents:
@dataclass
class Episode:
goal: str
steps: list[str]
outcome: str # "success" | "failure" | "partial"
lesson: str # What to do differently next time
timestamp: str
class EpisodicMemory:
def __init__(self):
self.episodes: list[Episode] = []
def record(self, episode: Episode):
self.episodes.append(episode)
def recall_similar(self, goal: str) -> list[Episode]:
# In production, use embedding similarity
# Simplified: keyword matching
relevant = []
goal_words = set(goal.lower().split())
for ep in self.episodes:
ep_words = set(ep.goal.lower().split())
overlap = len(goal_words & ep_words) / max(len(goal_words), 1)
if overlap > 0.3:
relevant.append(ep)
return relevant[-5:] # Most recent relevant episodes
Inject episodic memory into the system prompt:
episodes = episodic_memory.recall_similar(current_goal)
if episodes:
memory_context = "Lessons from similar past tasks:\n"
for ep in episodes:
memory_context += f"- Goal: {ep.goal} → {ep.outcome}: {ep.lesson}\n"
system_prompt += f"\n\n{memory_context}"
Production agents fail in ways that pure software doesn't. These are the failure modes I've encountered and how to handle them:

Infinite loops. The agent calls the same tool with the same arguments repeatedly, expecting different results. Fix: track the last N tool calls, detect duplicates, and force a different action or terminate.

Hallucinated tool calls. The agent invents tool names or argument formats that don't exist. Fix: validate tool calls against your schema before execution. Return a clear error message that guides the agent to use the correct format.

Context poisoning. A tool returns unexpected data (HTML instead of JSON, an error page instead of search results) that confuses subsequent reasoning. Fix: sanitize tool outputs, truncate to a maximum length, and wrap errors in structured messages.

Goal drift. Over many steps, the agent loses track of the original goal and starts pursuing tangential objectives. Fix: re-inject the original goal in the system prompt every N steps. Use the plan-and-execute pattern where the plan anchors behavior.

Catastrophic tool use. The agent decides to delete all files, send emails to customers, or execute destructive database queries. Fix: tool-level permissions. Every tool has a permission level (read/write/destructive). The agent declares which permission levels it needs. Destructive actions require a human approval step.
class ToolPermission(Enum):
READ = "read"
WRITE = "write"
DESTRUCTIVE = "destructive"
def execute_tool_safe(name: str, args: dict, allowed: set[ToolPermission]) -> str:
tool_permissions = {
"search_web": ToolPermission.READ,
"read_file": ToolPermission.READ,
"write_file": ToolPermission.WRITE,
"delete_file": ToolPermission.DESTRUCTIVE,
"send_email": ToolPermission.DESTRUCTIVE,
}
required = tool_permissions.get(name, ToolPermission.DESTRUCTIVE)
if required not in allowed:
return f"PERMISSION DENIED: Tool '{name}' requires {required.value} permission."
return execute_tool(name, args)
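The infinite-loop fix above (track the last N tool calls and block duplicates) can be sketched the same way; the window size and repeat limit are illustrative defaults:

```python
from collections import deque

class LoopDetector:
    """Block a tool call once the same call repeats too often in a sliding window."""

    def __init__(self, window: int = 5, max_repeats: int = 2):
        self.recent = deque(maxlen=window)
        self.max_repeats = max_repeats

    def should_block(self, tool_name: str, args: dict) -> bool:
        # Hashable signature of the call: tool name + sorted arguments
        signature = (tool_name, tuple(sorted(args.items())))
        repeats = sum(1 for s in self.recent if s == signature)
        self.recent.append(signature)
        return repeats >= self.max_repeats

detector = LoopDetector()
print(detector.should_block("search_web", {"query": "x"}))  # → False (first call)
print(detector.should_block("search_web", {"query": "x"}))  # → False (second call)
print(detector.should_block("search_web", {"query": "x"}))  # → True (blocked)
```

When a call is blocked, return a structured observation ("You already tried this exact call twice") so the agent is steered toward a different action.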
| Feature | [LangGraph](https://langchain-ai.github.io/langgraph/) | [CrewAI](https://github.com/crewAIInc/crewAI) | [AutoGen](https://github.com/microsoft/autogen) | Custom (above) |
|---------|----------|--------|---------|--------|
| Architecture | Graph-based state machine | Role-based agents | Conversation-based | Your design |
| Learning curve | Steep (graph concepts) | Moderate (YAML config) | Moderate | Low (it's your code) |
| Multi-agent | Yes (nodes in graph) | Yes (crews + tasks) | Yes (group chat) | Yes |
| Streaming | Yes | Limited | Yes | Yes (implement it) |
| State management | Built-in checkpointing | Basic | Conversation history | Implement yourself |
| Debugging | LangSmith integration | Limited | Basic logging | Full control |
| Vendor lock-in | LangChain ecosystem | Moderate | Microsoft ecosystem | None |
| Production readiness | Good | Fair | Good | Depends on you |
| Observability | LangSmith, LangFuse | Limited | AutoGen Studio | Bring your own |
My take: for simple agent workflows (3-5 tools, single agent), write it yourself. The code above is ~100 lines and you understand every line. For complex multi-agent systems with branching logic, conditional routing, and state persistence, LangGraph is the most capable framework — but you'll spend a week learning its graph abstraction before you're productive. CrewAI is the fastest to prototype with (YAML config for agent roles), but its abstractions leak under production pressure. AutoGen is solid for conversational multi-agent patterns but heavily tied to the Microsoft/Azure ecosystem.
async def stream_agent_progress(goal: str):
"""Generator that yields status updates as the agent works."""
yield {"status": "planning", "message": f"Breaking down: {goal}"}
plan = await generate_plan(goal)
yield {"status": "planned", "message": f"Plan: {len(plan)} steps", "steps": plan}
for i, step in enumerate(plan):
yield {"status": "executing", "step": i + 1, "message": f"Working on: {step}"}
result = await execute_step(step)
yield {"status": "step_complete", "step": i + 1, "result_preview": result[:200]}
final = await synthesize_results()
yield {"status": "complete", "result": final}
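On the consumer side, the caller iterates the generator and forwards each update, typically to an SSE or WebSocket response. A self-contained sketch with a stubbed generator standing in for `stream_agent_progress`:

```python
import asyncio
import json

async def fake_progress():  # stand-in for stream_agent_progress
    yield {"status": "planning", "message": "Breaking down: demo goal"}
    yield {"status": "complete", "result": "done"}

async def consume() -> list[dict]:
    updates = []
    # In production this loop would write each update to an SSE/WebSocket stream
    async for update in fake_progress():
        updates.append(update)
        print(json.dumps(update))
    return updates

updates = asyncio.run(consume())
```

Streaming status matters because agent tasks take tens of seconds; a silent spinner reads as a hang.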
Determinism is gone. The same input with the same model can produce different tool call sequences. For anything where reproducibility matters (auditing, compliance, debugging), log every decision point and tool call with full inputs and outputs. Set temperature=0.0 for routing and tool selection calls (you want consistent decisions), and only use nonzero temperature for generation steps where variation is acceptable.
Evaluation is hard. How do you test that an agent "works"? Unit tests check individual tool calls. Integration tests check specific scenarios. But the combinatorial space of possible agent behavior is intractable. Invest in golden dataset evaluation: 50-100 representative tasks with expected outcomes, run nightly, measure success rate. Here's a minimal evaluation harness:
import csv
@dataclass
class TestCase:
goal: str
expected_tools: list[str] # Tools that should be called
expected_output_contains: list[str] # Substrings in final output
max_steps: int
max_tokens: int
def evaluate_agent(agent_fn, test_cases: list[TestCase]) -> dict:
results = {"passed": 0, "failed": 0, "errors": 0, "details": []}
for tc in test_cases:
try:
output, trace = agent_fn(tc.goal, tc.max_steps)
tools_used = [s["tool"] for s in trace if s["type"] == "tool_call"]
total_tokens = sum(s.get("tokens", 0) for s in trace)
# Check expected tools were called
tools_ok = all(t in tools_used for t in tc.expected_tools)
# Check output contains expected content
output_ok = all(s in output for s in tc.expected_output_contains)
# Check token budget
budget_ok = total_tokens <= tc.max_tokens
passed = tools_ok and output_ok and budget_ok
results["passed" if passed else "failed"] += 1
results["details"].append({
"goal": tc.goal,
"passed": passed,
"tools_ok": tools_ok,
"output_ok": output_ok,
"budget_ok": budget_ok,
"tokens": total_tokens,
"steps": len(trace),
})
except Exception as e:
results["errors"] += 1
results["details"].append({"goal": tc.goal, "error": str(e)})
results["success_rate"] = results["passed"] / max(len(test_cases), 1)
return results
Run this nightly. Track the success rate over time. When it drops below 90%, investigate. Most regressions come from model updates (provider changed model weights), tool API changes (external service returned a different format), or prompt rot (the system prompt accumulated patches that now conflict).

Error recovery has a budget. Retrying a failed agent step costs tokens, latency, and money. Set a retry budget per task — I use 3 retries per step, 2 replans per task. After that, the task fails with a structured error report and enters a human review queue. Unbounded retries are how you wake up to a $2,000 LLM bill.
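The retry budget can be enforced with a small wrapper. A sketch, assuming a synchronous step function and a hypothetical `RetryBudgetExceeded` error that routes the task to the human review queue:

```python
class RetryBudgetExceeded(Exception):
    """Raised when a step exhausts its retry budget (hypothetical error type)."""

def with_retry_budget(step_fn, max_retries: int = 3):
    """Run a step with a hard retry cap: 1 attempt + max_retries retries."""
    last_error = None
    for _ in range(1 + max_retries):
        try:
            return step_fn()
        except Exception as e:  # in production, catch specific tool/LLM errors
            last_error = e
    raise RetryBudgetExceeded(f"step failed after {max_retries} retries: {last_error}")

# A step that always fails exhausts the budget instead of looping forever:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    raise RuntimeError("tool timeout")

try:
    with_retry_budget(flaky)
except RetryBudgetExceeded:
    print(calls["n"])  # → 4 (1 attempt + 3 retries)
```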
Here's the architecture of the EdTech content pipeline I run in production. The system takes a curriculum topic and produces fact-checked, LTI-compatible lesson content that's delivered to an LMS via [LTI 1.3](https://tostupidtooquit.com/blog/understanding-lti-13-integration):
┌─────────────────────────────────────────────────────────────┐
│ Task Queue (Redis) │
│ topic: "Photosynthesis for AP Biology" │
└──────────────────────────┬──────────────────────────────────┘
│
▼
┌──────────────────────────────────────────────────────────────┐
│ Supervisor Agent (Llama 3.1 8B — cheap routing model) │
│ Decides: research → write → fact-check → format │
│ Monitors: token budget, step count, quality gate scores │
└────┬──────────────┬───────────────┬──────────────┬──────────┘
│ │ │ │
▼ ▼ ▼ ▼
┌─────────┐ ┌──────────┐ ┌──────────────┐ ┌──────────┐
│Researcher│ │ Writer │ │ Fact-Checker │ │Formatter │
│(GPT-4o) │ │(Claude) │ │(GPT-4o-mini) │ │(Llama 8B)│
│Tools: │ │Tools: │ │Tools: │ │Tools: │
│- search │ │- none │ │- search │ │- write │
│- scrape │ │(gen only)│ │- calculator │ │- validate│
└─────────┘ └──────────┘ └──────────────┘ └──────────┘
│
▼
┌────────────────────┐
│ Human Review Queue │
│ (if conf < 0.85) │
└────────────────────┘
Key design decisions: the supervisor runs on a cheap local model because routing decisions don't need a frontier model; each worker uses the model best suited to its role (GPT-4o for research, Claude for writing, GPT-4o-mini for high-volume fact-checking, Llama 8B for mechanical formatting); and any output scoring below 0.85 confidence lands in the human review queue instead of shipping automatically.
This system processes 200-400 content pieces per day with a 94% accuracy rate on automated fact-checking and a 99.2% accuracy rate after human review. The total compute cost is approximately $25/day — cheaper than a single content writer and 10x faster.
You cannot debug an agent system without tracing every decision. Here's the minimum observability layer I put on every production agent:
import json
import logging
import time
from contextvars import ContextVar
from datetime import datetime
from uuid import uuid4
trace_id_var: ContextVar[str] = ContextVar("trace_id", default="")
class AgentTracer:
def __init__(self, logger: logging.Logger | None = None):
self.logger = logger or logging.getLogger("agent")
self.spans: list[dict] = []
def start_trace(self, goal: str) -> str:
trace_id = str(uuid4())
trace_id_var.set(trace_id)
self.logger.info(
"trace_start",
extra={"trace_id": trace_id, "goal": goal},
)
return trace_id
def record_llm_call(
self,
agent_role: str,
messages: list[dict],
response: str,
model: str,
tokens_used: int,
latency_ms: float,
):
span = {
"trace_id": trace_id_var.get(),
"type": "llm_call",
"agent": agent_role,
"model": model,
"input_preview": messages[-1]["content"][:200],
"output_preview": response[:200],
"tokens": tokens_used,
"latency_ms": latency_ms,
"timestamp": datetime.now().isoformat(),
}
self.spans.append(span)
self.logger.info("llm_call", extra=span)
def record_tool_call(
self,
agent_role: str,
tool_name: str,
args: dict,
result: str,
latency_ms: float,
):
span = {
"trace_id": trace_id_var.get(),
"type": "tool_call",
"agent": agent_role,
"tool": tool_name,
"args": json.dumps(args)[:500],
"result_preview": result[:300],
"latency_ms": latency_ms,
"timestamp": datetime.now().isoformat(),
}
self.spans.append(span)
self.logger.info("tool_call", extra=span)
def get_trace_summary(self, trace_id: str) -> dict:
trace_spans = [s for s in self.spans if s["trace_id"] == trace_id]
total_tokens = sum(s.get("tokens", 0) for s in trace_spans)
total_latency = sum(s["latency_ms"] for s in trace_spans)
llm_calls = sum(1 for s in trace_spans if s["type"] == "llm_call")
tool_calls = sum(1 for s in trace_spans if s["type"] == "tool_call")
return {
"trace_id": trace_id,
"total_spans": len(trace_spans),
"llm_calls": llm_calls,
"tool_calls": tool_calls,
"total_tokens": total_tokens,
"total_latency_ms": total_latency,
"estimated_cost_usd": total_tokens * 0.000005, # Adjust per model pricing
}
Pair this with [OpenTelemetry](https://opentelemetry.io/) for distributed tracing if your agents span multiple services. The trace ID propagates across agent boundaries, so you can follow a single task from the supervisor's decision through every worker agent's tool calls and back.
Ship the spans to whatever backend you use — Grafana Tempo, Jaeger, Datadog, or even a simple JSONL file for a small deployment. The non-negotiable data points are: which agent acted, what it decided, what it observed, how long it took, and how many tokens it burned. Without this, you're debugging by prayer.
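For the JSONL option, the sink is only a few lines; one JSON object per line keeps the file greppable and streamable (the span fields below are illustrative):

```python
import json
import os
import tempfile

def write_spans_jsonl(spans: list[dict], path: str) -> None:
    # Append-only: one JSON object per line, safe to tail and grep
    with open(path, "a") as f:
        for span in spans:
            f.write(json.dumps(span) + "\n")

path = os.path.join(tempfile.mkdtemp(), "spans.jsonl")
write_spans_jsonl([{"type": "llm_call", "tokens": 120}], path)
write_spans_jsonl([{"type": "tool_call", "latency_ms": 40.0}], path)

with open(path) as f:
    spans = [json.loads(line) for line in f]
print(len(spans))  # → 2
```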
I've watched teams spend six months building agent systems for problems that a well-structured prompt and a single API call would solve. Here's my heuristic: use a pipeline (not an agent) when the steps are known in advance, their order never changes, and each step's output feeds directly into the next. Reach for an agentic loop only when the path to the goal depends on intermediate results you can't predict up front.
The biggest mistake I see is reaching for agents when a Python script with if/else and a database query would work. [TypeScript's type system](https://tostupidtooquit.com/blog/typescript-advanced-patterns) can enforce most business rules at compile time. Agents handle ambiguity well — but if your problem isn't ambiguous, they're burning money to appear intelligent.
The second biggest mistake is building a single monolithic agent with 40 tools and a 6,000 token system prompt. That agent will be mediocre at everything and excellent at nothing. Decompose into focused agents with 3-5 tools each. The supervisor uses routing, not raw capability.
For connecting these agents to external tools via a standard protocol, [MCP Protocol in LLM Applications](https://tostupidtooquit.com/blog/mcp-protocol-llm-applications) covers how MCP provides a clean abstraction between agents and the tools they use.