Agentic Workflows: Planning, Routing, and Branching
Building production agentic systems: task planning, workflow routing, parallel execution, human-in-the-loop patterns, and failure recovery that actually works.
Beyond the Single-Agent Loop
A single agent running a ReAct loop handles a wide range of tasks well. It reasons, uses tools, observes results, and continues until done. That pattern breaks down at scale: when tasks are too large for one context window, when subtasks can run in parallel, when different subtasks need different specializations, or when the agent needs human confirmation before taking an action it cannot undo.
Agentic workflows are how you address these limits. Instead of one agent doing everything, you compose agents and routing logic into a system that handles complex, multi-step work reliably.
Task Planning Patterns
Fixed pipelines: You know exactly what steps are needed. Define them statically and execute them in sequence. No LLM involved in planning, only in execution.
async def document_analysis_pipeline(document_path: str) -> dict:
    # Step 1: Extract text
    text = await extract_text_tool(document_path)
    # Step 2: Classify document type
    doc_type = await classify_document(text)
    # Step 3: Route to specialist based on type
    if doc_type == "legal":
        analysis = await legal_analysis_agent(text)
    elif doc_type == "financial":
        analysis = await financial_analysis_agent(text)
    else:
        analysis = await general_analysis_agent(text)
    # Step 4: Generate summary
    summary = await summarize_agent(analysis)
    return {"type": doc_type, "analysis": analysis, "summary": summary}
Fixed pipelines are fast, predictable, and easy to debug. Use them when the task structure is known in advance.
LLM-generated plans: The orchestrator LLM receives the task and generates a plan before executing it. More flexible, handles novel task structures, but adds latency and introduces failure modes if the plan is malformed.
import asyncio
import json
import anthropic
client = anthropic.Anthropic()
async def plan_and_execute(task: str, available_agents: list[str]) -> dict:
    # Phase 1: Generate plan. The client is synchronous, so run the call in a
    # worker thread to keep the event loop free.
    planning_response = await asyncio.to_thread(
        client.messages.create,
        model="claude-opus-4-7",
        max_tokens=2048,
        system=f"""You are a task planning agent. Break down the given task into steps.
Available agents: {', '.join(available_agents)}
Respond with a JSON plan:
{{
  "steps": [
    {{"step": 1, "agent": "agent_name", "input": "what to send this agent", "depends_on": []}},
    {{"step": 2, "agent": "agent_name", "input": "...", "depends_on": [1]}}
  ]
}}""",
        messages=[{"role": "user", "content": f"Task: {task}"}],
    )
    # json.loads raises json.JSONDecodeError if the reply is not bare JSON
    plan = json.loads(planning_response.content[0].text)
    # Phase 2: Execute plan in the order the steps are listed
    results = {}
    for step in plan["steps"]:
        # Steps run one at a time, so a dependency that is not finished by now
        # never will be. Fail fast instead of waiting forever.
        missing = [dep for dep in step["depends_on"] if dep not in results]
        if missing:
            raise ValueError(f"Step {step['step']} depends on unfinished steps {missing}")
        # Build input incorporating dependency results
        step_input = step["input"]
        for dep in step["depends_on"]:
            step_input += f"\n\nContext from step {dep}: {results[dep]}"
        results[step["step"]] = await execute_agent(step["agent"], step_input)
    return results
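A malformed plan is the main new failure mode of this pattern, so it can help to check the plan before executing anything. The sketch below is one way to do that, assuming the plan shape requested in the prompt above. `validate_plan` is a hypothetical helper, not part of any library. It checks that every step names a known agent and depends only on earlier steps, which also rules out cycles.

```python
def validate_plan(plan: dict, available_agents: list[str]) -> list[str]:
    """Return a list of problems found in the plan; an empty list means it looks usable."""
    steps = plan.get("steps")
    if not isinstance(steps, list) or not steps:
        return ["plan has no 'steps' list"]
    problems: list[str] = []
    seen: set = set()
    for step in steps:
        if not isinstance(step, dict):
            problems.append(f"step entry is not an object: {step!r}")
            continue
        step_id = step.get("step")
        if step_id in seen:
            problems.append(f"duplicate step id {step_id}")
        if step.get("agent") not in available_agents:
            problems.append(f"step {step_id}: unknown agent {step.get('agent')!r}")
        for dep in step.get("depends_on", []):
            # Requiring dependencies to appear earlier in the list also rules out cycles.
            if dep not in seen:
                problems.append(f"step {step_id}: depends on step {dep}, which does not come earlier")
        seen.add(step_id)
    return problems
```

If the returned list is not empty, one option is to send the problems back to the planner and ask for a corrected plan, with a cap on the number of attempts.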
Hybrid: Fixed skeleton with LLM-filled steps. You define the high-level stages; the LLM decides what to do within each stage. More predictable than fully dynamic plans, more flexible than fully fixed pipelines.
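A minimal sketch of the hybrid pattern, assuming each stage's model call is wrapped in a function you supply. The stage names and the `fill_stage` callable are hypothetical; in practice `fill_stage` would prompt a model with the stage's goal and the context accumulated so far.

```python
import asyncio
from typing import Awaitable, Callable

# The skeleton is fixed in code: the stage order never changes at runtime.
STAGES = ["gather", "analyze", "report"]  # hypothetical stage names

async def run_hybrid(
    task: str,
    fill_stage: Callable[[str, str, dict], Awaitable[str]],
) -> dict:
    """Run every stage in order; fill_stage decides what happens inside each one."""
    context: dict[str, str] = {}
    for stage in STAGES:
        # fill_stage sees the task plus a copy of all earlier stage outputs.
        context[stage] = await fill_stage(stage, task, dict(context))
    return context

async def fake_fill_stage(stage: str, task: str, context: dict) -> str:
    # Stand-in for a model call so the sketch runs offline.
    return f"{stage} output for {task!r} (saw {len(context)} earlier stages)"

if __name__ == "__main__":
    print(asyncio.run(run_hybrid("summarize Q3 incidents", fake_fill_stage)))
```

The model can vary what it does inside a stage, but it cannot skip, reorder, or add stages, which is where the extra predictability comes from.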
Workflow Routing
Routing sends the task to the right agent or handler based on its content. Two approaches:
Classifier-based routing: A fast, cheap classifier (a smaller model or a fine-tuned classifier) determines the route. The primary model is not called for the routing decision, so routing adds only the classifier's latency.
import asyncio
ROUTING_CLASSIFIER_PROMPT = """Classify this user request into exactly one category.
Categories: billing, technical_support, general_inquiry, escalation_required
Request: {request}
Respond with only the category name."""
async def route_request(request: str) -> str:
    # The client is synchronous; run the call in a worker thread so the
    # event loop is not blocked while the classifier responds.
    response = await asyncio.to_thread(
        client.messages.create,
        model="claude-haiku-4-5-20251001",  # fast, cheap
        max_tokens=20,
        messages=[{
            "role": "user",
            "content": ROUTING_CLASSIFIER_PROMPT.format(request=request),
        }],
    )
    return response.content[0].text.strip()
async def handle_request(request: str) -> str:
    route = await route_request(request)
    handlers = {
        "billing": billing_agent,
        "technical_support": tech_support_agent,
        "general_inquiry": general_agent,
        "escalation_required": escalation_handler,
    }
    # Unrecognized categories fall back to the general agent
    handler = handlers.get(route, general_agent)
    return await handler(request)
LLM-as-router: The primary model decides routing as part of its tool selection. Works when routing logic is too complex for a simple classifier.
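One way to sketch this is to expose each route as a tool and dispatch on whichever tool the model selects. The block below covers only the dispatch half and runs offline. It assumes the model's reply has already been converted to a list of dicts with `type`, `name`, and `input` keys; treat that shape as an assumption and adapt it to your SDK's tool-use response objects. The tool names and handlers are hypothetical.

```python
import asyncio
from typing import Awaitable, Callable

Handler = Callable[[dict], Awaitable[str]]

async def billing_tool(args: dict) -> str:
    return f"billing handled: {args.get('summary', '')}"

async def tech_support_tool(args: dict) -> str:
    return f"tech support handled: {args.get('summary', '')}"

# Hypothetical routes that would be advertised to the primary model as tools.
ROUTE_TOOLS: dict[str, Handler] = {
    "route_to_billing": billing_tool,
    "route_to_tech_support": tech_support_tool,
}

async def dispatch_tool_choice(blocks: list[dict], fallback: Handler) -> str:
    """Call the handler for the first tool the model selected, else the fallback."""
    for block in blocks:
        if block.get("type") == "tool_use":
            # Unknown tool names go to the fallback rather than raising.
            handler = ROUTE_TOOLS.get(block.get("name"), fallback)
            return await handler(block.get("input", {}))
    # The model answered in plain text without picking a route.
    return await fallback({})
```

Because the model fills in the tool input itself, the router can also pass along a summary or extracted fields, which a category-only classifier cannot do.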
Parallel Execution
When subtasks are independent, run them in parallel. This is often the largest latency win available in multi-agent systems: total time approaches that of the slowest subtask instead of the sum of all of them.
import asyncio
async def parallel_research(topic: str) -> dict:
    # These three can run simultaneously
    search_task = asyncio.create_task(web_search_agent(topic))
    arxiv_task = asyncio.create_task(arxiv_search_agent(topic))
    news_task = asyncio.create_task(news_search_agent(topic))
    # Wait for all to complete
    search_results, arxiv_results, news_results = await asyncio.gather(
        search_task, arxiv_task, news_task
    )
    # Synthesis requires all three results (sequential)
    synthesis = await synthesis_agent({
        "web": search_results,
        "academic": arxiv_results,
        "news": news_results,
    })
    return synthesis
Fan-out and gather: Send the same task to multiple agents with different approaches (different models, different prompts, different tools), then aggregate results. Useful for tasks where you want redundancy or multiple perspectives.
async def consensus_analysis(document: str) -> dict:
    # Ask three different analyst agents
    results = await asyncio.gather(
        analyst_agent_a(document),
        analyst_agent_b(document),
        analyst_agent_c(document),
    )
    # Find consensus or flag disagreement
    return await consensus_agent(results)
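With fan-out, one failing agent does not have to sink the whole batch. A possible variation, assuming partial results are acceptable for your task: pass `return_exceptions=True` to `asyncio.gather` and separate successes from failures before aggregating. The analyst functions here are stand-ins so the sketch runs offline.

```python
import asyncio

async def gather_tolerant(coros: list) -> tuple[list, list[Exception]]:
    """Run awaitables concurrently; return (successful results, raised exceptions)."""
    outcomes = await asyncio.gather(*coros, return_exceptions=True)
    results = [o for o in outcomes if not isinstance(o, Exception)]
    errors = [o for o in outcomes if isinstance(o, Exception)]
    return results, errors

async def ok_analyst(document: str) -> str:
    return f"analysis of {len(document)} chars"

async def failing_analyst(document: str) -> str:
    raise RuntimeError("model unavailable")

async def main() -> None:
    doc = "example document"
    results, errors = await gather_tolerant(
        [ok_analyst(doc), failing_analyst(doc), ok_analyst(doc)]
    )
    print(f"{len(results)} succeeded, {len(errors)} failed")

if __name__ == "__main__":
    asyncio.run(main())
```

A consensus step can then decide whether the surviving results are enough, for example by requiring at least two of three analysts to have answered.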
Human-in-the-Loop Patterns
Some actions should not execute without human confirmation: irreversible operations, high-value transactions, and actions outside the agent's normal scope.
Confirmation gate: Pause execution, surface the proposed action to the user, resume only on confirmation.
from dataclasses import dataclass
from typing import Callable, Any
import asyncio
@dataclass
class PendingAction:
    action_id: str
    description: str
    execute_fn: Callable
    args: dict[str, Any]
pending_actions: dict[str, PendingAction] = {}
approval_events: dict[str, asyncio.Event] = {}
approval_results: dict[str, bool] = {}
async def request_approval(action: PendingAction) -> bool:
    pending_actions[action.action_id] = action
    event = asyncio.Event()
    approval_events[action.action_id] = event
    # Notify user (via webhook, websocket, email, etc.)
    await notify_user_of_pending_action(action)
    # Wait for user decision (with timeout)
    try:
        await asyncio.wait_for(event.wait(), timeout=300)  # 5 min timeout
        return approval_results.get(action.action_id, False)
    except asyncio.TimeoutError:
        return False  # deny on timeout
    finally:
        # Drop bookkeeping so resolved or expired actions do not accumulate
        pending_actions.pop(action.action_id, None)
        approval_events.pop(action.action_id, None)
        approval_results.pop(action.action_id, None)
async def user_approves(action_id: str):
    event = approval_events.get(action_id)
    if event is None:
        return  # unknown action, or it already timed out
    approval_results[action_id] = True
    event.set()
async def user_denies(action_id: str):
    event = approval_events.get(action_id)
    if event is None:
        return  # unknown action, or it already timed out
    approval_results[action_id] = False
    event.set()
Checkpoint pattern: At defined points in a long workflow, pause and show the user the current state and proposed next steps.
Rollback support: For workflows that modify state, design for rollback. Before each state-changing action, record what will change and how to undo it.
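A minimal sketch of that idea, assuming every state-changing action can be paired with a function that reverses it. `UndoLog` is a hypothetical helper, not a library class. It records the undo step before running the action, as described above, so each undo function should be safe to call even if its action only partially applied.

```python
from typing import Callable

class UndoLog:
    """Records how to undo each action; rollback replays the undos newest first."""

    def __init__(self) -> None:
        self._undo_stack: list[tuple[str, Callable[[], None]]] = []

    def apply(self, description: str, do: Callable[[], None],
              undo: Callable[[], None]) -> None:
        # Record what will change and how to undo it before touching any state.
        self._undo_stack.append((description, undo))
        do()

    def rollback(self) -> list[str]:
        """Undo every recorded action in reverse order; return what was undone."""
        undone = []
        while self._undo_stack:
            description, undo = self._undo_stack.pop()
            undo()
            undone.append(description)
        return undone

if __name__ == "__main__":
    settings = {"mode": "draft"}
    log = UndoLog()
    previous = settings["mode"]
    log.apply("set mode to live",
              lambda: settings.update(mode="live"),
              lambda: settings.update(mode=previous))
    print(settings, log.rollback(), settings)
```

For workflows that outlive a single process, the same log would need to be persisted, with undo steps stored as data (for example, the previous value) instead of as in-memory callables.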
Failure Recovery
Long-running workflows fail. The question is whether failure terminates the workflow or the workflow recovers.
Retry with backoff: For transient failures (API rate limits, network errors).
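import asyncio
from typing import Awaitable, Callable, TypeVar
T = TypeVar("T")
async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],
    max_attempts: int = 3,
    base_delay: float = 1.0,
) -> T:
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception:
            # Catches everything for brevity; narrow this to transient error
            # types so permanent failures are not retried.
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff: base_delay, then 2x, 4x, ...
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay)
    raise ValueError("max_attempts must be at least 1")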
Fallback agents: If the primary agent fails, route to a simpler fallback.
async def resilient_analysis(document: str) -> str:
    try:
        return await advanced_analysis_agent(document)
    except AgentError:
        # Fall back to simpler agent
        return await basic_analysis_agent(document)
Checkpoint and resume: For workflows that may take hours, persist state at each step. On failure, resume from the last successful checkpoint rather than restarting from scratch.
import json
from pathlib import Path
async def checkpointed_workflow(workflow_id: str, task: dict) -> dict:
    checkpoint_path = Path(f"/tmp/workflow_{workflow_id}.json")
    # Load existing progress
    if checkpoint_path.exists():
        state = json.loads(checkpoint_path.read_text())
        completed_steps = state.get("completed_steps", {})
    else:
        completed_steps = {}
    # Step IDs must be strings: JSON object keys come back as strings on reload,
    # so non-string IDs would never match the lookup below.
    steps = define_workflow_steps(task)
    results = {}
    for step_id, step_fn in steps.items():
        if step_id in completed_steps:
            results[step_id] = completed_steps[step_id]
            continue  # skip already-completed steps
        result = await step_fn(results)
        results[step_id] = result
        # Save checkpoint after each step (results must be JSON-serializable)
        completed_steps[step_id] = result
        checkpoint_path.write_text(json.dumps({
            "workflow_id": workflow_id,
            "completed_steps": completed_steps,
        }))
    return results
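One caveat with writing the checkpoint file in place: a crash in the middle of the write can leave a truncated file, and the next run then fails to parse it. A common remedy, sketched here with a hypothetical `write_checkpoint_atomic` helper, is to write to a temporary file in the same directory and rename it over the real checkpoint.

```python
import json
import os
import tempfile
from pathlib import Path

def write_checkpoint_atomic(path: Path, state: dict) -> None:
    """Write state as JSON so readers see the old file or the new one, never a partial one."""
    path.parent.mkdir(parents=True, exist_ok=True)
    # Keep the temp file in the same directory so the rename stays on one filesystem.
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp:
            json.dump(state, tmp)
            tmp.flush()
            os.fsync(tmp.fileno())  # push the data to disk before renaming
        os.replace(tmp_name, path)  # the rename is atomic on POSIX filesystems
    except BaseException:
        # Clean up the temp file on any failure, then re-raise.
        if os.path.exists(tmp_name):
            os.unlink(tmp_name)
        raise

if __name__ == "__main__":
    target = Path(tempfile.mkdtemp()) / "workflow_demo.json"
    write_checkpoint_atomic(target, {"workflow_id": "demo", "completed_steps": {}})
    print(target.read_text())
```

For workflows that run across machines, a database row or object store entry is a more natural home for this state than a local file; the same write-then-swap idea still applies.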
Observability for Agentic Workflows
Multi-step workflows fail in subtle ways. A step produces wrong output, downstream steps consume that output, and the final result is wrong. Without tracing, you cannot see where it went wrong.
Instrument at the workflow level, not just the step level:
import time
import uuid
class WorkflowTracer:
    def __init__(self, workflow_id: str):
        self.workflow_id = workflow_id
        self.trace: list[dict] = []
    def record_step(self, step_id: str, input_summary: str,
                    output_summary: str, duration_ms: float, success: bool):
        self.trace.append({
            "workflow_id": self.workflow_id,
            "step_id": step_id,
            "input_summary": input_summary[:200],
            "output_summary": output_summary[:200],
            "duration_ms": duration_ms,
            "success": success,
            "timestamp": time.time(),
        })
    def export(self) -> list[dict]:
        return self.trace
Log the workflow trace to your observability stack (Datadog, Grafana, or even just a structured log file). When something goes wrong, the trace shows which step first produced bad output and what input it received, which is usually enough to locate the cause.
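To keep the recording consistent, each step can be run through a small wrapper that times it and records the outcome whether it succeeds or raises. This is a sketch: `run_traced` is a hypothetical helper, and it assumes only that the tracer has a `record_step` method taking the same five arguments as the `WorkflowTracer` above.

```python
import time
from typing import Any, Awaitable, Callable

async def run_traced(
    tracer: Any,
    step_id: str,
    step_fn: Callable[[Any], Awaitable[Any]],
    payload: Any,
) -> Any:
    """Run one async step and record timing and outcome, even when the step raises."""
    start = time.perf_counter()
    try:
        output = await step_fn(payload)
    except Exception as exc:
        duration_ms = (time.perf_counter() - start) * 1000
        # Record the failure before re-raising so the trace shows where it stopped.
        tracer.record_step(step_id, str(payload), f"error: {exc!r}", duration_ms, False)
        raise
    duration_ms = (time.perf_counter() - start) * 1000
    tracer.record_step(step_id, str(payload), str(output), duration_ms, True)
    return output
```

A call site would then look like `summary = await run_traced(tracer, "summarize", summarize_agent, analysis)`, so no step can run without leaving a trace entry.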