Intermediate · 55 min · 7 min read

Agentic Workflows: Planning, Routing, and Branching

Building production agentic systems: task planning, workflow routing, parallel execution, human-in-the-loop patterns, and failure recovery that actually works.

Tags: workflow, planning, routing, parallel, hitl

Beyond the Single-Agent Loop

A single agent running a ReAct loop handles a wide range of tasks well. It reasons, uses tools, observes results, and continues until done. That pattern breaks down at scale: when tasks are too large for one context window, when subtasks can run in parallel, when different subtasks need different specializations, or when the agent needs human confirmation before taking an action it cannot undo.

Agentic workflows are how you address these limits. Instead of one agent doing everything, you compose agents and routing logic into a system that handles complex, multi-step work reliably.

Task Planning Patterns

Fixed pipelines: You know exactly what steps are needed. Define them statically and execute them in sequence. No LLM involved in planning, only in execution.

async def document_analysis_pipeline(document_path: str) -> dict:
    # Step 1: Extract text
    text = await extract_text_tool(document_path)

    # Step 2: Classify document type
    doc_type = await classify_document(text)

    # Step 3: Route to specialist based on type
    if doc_type == "legal":
        analysis = await legal_analysis_agent(text)
    elif doc_type == "financial":
        analysis = await financial_analysis_agent(text)
    else:
        analysis = await general_analysis_agent(text)

    # Step 4: Generate summary
    summary = await summarize_agent(analysis)

    return {"type": doc_type, "analysis": analysis, "summary": summary}

Fixed pipelines are fast, predictable, and easy to debug. Use them when the task structure is known in advance.

LLM-generated plans: The orchestrator LLM receives the task and generates a plan before executing it. More flexible, handles novel task structures, but adds latency and introduces failure modes if the plan is malformed.

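import asyncio
import json

import anthropic

# Async client: a blocking call inside an async function would stall the event loop
client = anthropic.AsyncAnthropic()

async def plan_and_execute(task: str, available_agents: list[str]) -> dict:
    # Phase 1: Generate plan
    planning_response = await client.messages.create(
        model="claude-opus-4-7",
        max_tokens=2048,
        system=f"""You are a task planning agent. Break down the given task into steps.
Available agents: {', '.join(available_agents)}

Respond with only a JSON plan and no other text:
{{
  "steps": [
    {{"step": 1, "agent": "agent_name", "input": "what to send this agent", "depends_on": []}},
    {{"step": 2, "agent": "agent_name", "input": "...", "depends_on": [1]}}
  ]
}}""",
        messages=[{"role": "user", "content": f"Task: {task}"}]
    )

    # A malformed plan is the main failure mode here: fail loudly, don't guess
    try:
        plan = json.loads(planning_response.content[0].text)
    except json.JSONDecodeError as e:
        raise ValueError(f"Planner returned invalid JSON: {e}") from e

    # Phase 2: Execute plan in dependency order
    results = {}
    remaining = {step["step"]: step for step in plan["steps"]}

    async def run_step(step: dict):
        # Build input incorporating dependency results
        step_input = step["input"]
        for dep in step["depends_on"]:
            step_input += f"\n\nContext from step {dep}: {results[dep]}"
        return await execute_agent(step["agent"], step_input)

    while remaining:
        # Every step whose dependencies are done can run now; steps in the
        # same round are independent of each other, so they run concurrently
        ready = [step for step in remaining.values()
                 if all(dep in results for dep in step["depends_on"])]
        if not ready:
            # No step can make progress: a cycle or a reference to a missing step
            raise ValueError(f"Unsatisfiable dependencies among steps {sorted(remaining)}")

        outputs = await asyncio.gather(*(run_step(step) for step in ready))
        for step, output in zip(ready, outputs):
            results[step["step"]] = output
            del remaining[step["step"]]

    return results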
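Because a malformed plan is the main risk of this approach, it can pay to validate the plan before any agent runs. A minimal sketch using the standard-library `graphlib` module (Python 3.9+), assuming the plan schema from the prompt above; `validate_plan` is a hypothetical helper name:

```python
from graphlib import CycleError, TopologicalSorter

def validate_plan(plan: dict, available_agents: list[str]) -> list[int]:
    """Check an LLM-generated plan; return step numbers in a valid execution order.

    Raises ValueError describing the first problem found.
    """
    steps = plan.get("steps")
    if not isinstance(steps, list) or not steps:
        raise ValueError("plan must contain a non-empty 'steps' list")

    # Map each step number to the set of steps it depends on
    graph: dict[int, set[int]] = {}
    for step in steps:
        for key in ("step", "agent", "input", "depends_on"):
            if key not in step:
                raise ValueError(f"step {step.get('step', '?')} is missing '{key}'")
        if step["agent"] not in available_agents:
            raise ValueError(f"step {step['step']} uses unknown agent {step['agent']!r}")
        if step["step"] in graph:
            raise ValueError(f"duplicate step number {step['step']}")
        graph[step["step"]] = set(step["depends_on"])

    for step_id, deps in graph.items():
        missing = deps - set(graph)
        if missing:
            raise ValueError(f"step {step_id} depends on missing steps {sorted(missing)}")

    try:
        # static_order() yields every step after all of its dependencies
        return list(TopologicalSorter(graph).static_order())
    except CycleError as e:
        # args[1] holds the list of nodes forming the cycle
        raise ValueError(f"plan has a dependency cycle: {e.args[1]}") from e
```

Rejecting a bad plan up front means it costs one planning call rather than a half-executed workflow, and the error message can be fed back to the planner for a retry.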

Hybrid: Fixed skeleton with LLM-filled steps. You define the high-level stages; the LLM decides what to do within each stage. More predictable than fully dynamic plans, more flexible than fully fixed pipelines.
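A minimal sketch of the hybrid pattern, assuming a hypothetical `plan_stage` callable that wraps an LLM call and returns the agent names it wants for a given stage. The stage list and agent names are placeholders. Code owns the order of stages and which agents each stage may use; the LLM only chooses within those bounds:

```python
import asyncio
from typing import Awaitable, Callable

# Fixed skeleton: stage order and each stage's allowed agents live in code.
# Stage and agent names are hypothetical placeholders.
STAGES: list[tuple[str, set[str]]] = [
    ("gather", {"web_search", "db_lookup"}),
    ("analyze", {"statistics", "summarize"}),
    ("report", {"write_report"}),
]

AgentFn = Callable[[str], Awaitable[str]]
# (stage name, allowed agents, context so far) -> agent names the LLM picked
StagePlanner = Callable[[str, set[str], str], Awaitable[list[str]]]

async def run_hybrid(task: str, plan_stage: StagePlanner,
                     agents: dict[str, AgentFn]) -> str:
    context = task
    for stage, allowed in STAGES:
        chosen = await plan_stage(stage, allowed, context)  # the LLM-filled part
        # The skeleton enforces the boundary: only this stage's agents may run
        invalid = [name for name in chosen if name not in allowed]
        if invalid:
            raise ValueError(f"stage {stage!r}: agents {invalid} are not allowed here")
        # Agents picked for the same stage run concurrently; an empty pick skips the stage
        outputs = await asyncio.gather(*(agents[name](context) for name in chosen))
        context = "\n".join([context, *outputs])
    return context
```

Because the allowed set is checked in code, a confused planner fails fast with a clear error instead of, say, calling the report writer before any data has been gathered.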

Workflow Routing

Routing sends the task to the right agent or handler based on its content. Two approaches:

Classifier-based routing: A fast, cheap classifier (a smaller model or a fine-tuned classifier) determines the route. The primary model is never called to make the routing decision, so routing adds only the classifier's small latency and cost.

import anthropic

# Async client so the routing call does not block the event loop
client = anthropic.AsyncAnthropic()

ROUTING_CLASSIFIER_PROMPT = """Classify this user request into exactly one category.
Categories: billing, technical_support, general_inquiry, escalation_required

Request: {request}

Respond with only the category name."""

async def route_request(request: str) -> str:
    response = await client.messages.create(
        model="claude-haiku-4-5-20251001",  # fast, cheap
        max_tokens=20,
        messages=[{
            "role": "user",
            "content": ROUTING_CLASSIFIER_PROMPT.format(request=request)
        }]
    )
    # Normalize case and trailing punctuation ("Billing." -> "billing")
    return response.content[0].text.strip().strip(".").lower()

async def handle_request(request: str) -> str:
    route = await route_request(request)

    handlers = {
        "billing": billing_agent,
        "technical_support": tech_support_agent,
        "general_inquiry": general_agent,
        "escalation_required": escalation_handler,
    }

    # Unrecognized labels fall back to the general agent
    handler = handlers.get(route, general_agent)
    return await handler(request)
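Exact matching against the handler table is brittle: a small model may reply "Technical support" or "Category: billing", and either would silently fall through to the default handler. A slightly more forgiving parser, as a sketch (`parse_route` is a hypothetical helper; unknown or ambiguous replies still go to the default route):

```python
# Mirrors the categories listed in ROUTING_CLASSIFIER_PROMPT
CATEGORIES = ("billing", "technical_support", "general_inquiry", "escalation_required")

def parse_route(raw: str, default: str = "general_inquiry") -> str:
    """Map a classifier's reply onto exactly one known category, else `default`."""
    # Normalize case, hyphens, and spaces; drop surrounding punctuation and quotes
    text = raw.strip().lower().replace("-", "_").replace(" ", "_")
    text = text.strip("._\"'`")
    if text in CATEGORIES:
        return text
    # Accept wrapped replies like "Category: billing" if exactly one category appears
    mentioned = [c for c in CATEGORIES if c in text]
    return mentioned[0] if len(mentioned) == 1 else default
```

Treating a reply that names two categories as ambiguous, rather than picking the first, keeps a confused classifier from quietly sending requests to the wrong specialist.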

LLM-as-router: The primary model decides routing as part of its tool selection. Works when routing logic is too complex for a simple classifier.
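One way to implement this is with tool use: expose each handler as a tool, and let the model's tool choice be the routing decision. A sketch, assuming the Anthropic Messages API `tools` format; the route names, descriptions, and the `summary` input field are hypothetical. You would pass `tools=routing_tools(ROUTES)` to `client.messages.create(...)` and then inspect `response.content`:

```python
from typing import Any, Optional

# Hypothetical routes: each one becomes a tool the primary model can choose
ROUTES = {
    "billing": "Invoices, charges, refunds, or payment methods.",
    "technical_support": "Bugs, error messages, outages, or help using the product.",
    "escalation_required": "Legal threats, security incidents, or repeated unresolved complaints.",
}

def routing_tools(routes: dict[str, str]) -> list[dict[str, Any]]:
    """Build tool definitions (name, description, JSON Schema input) per route."""
    return [
        {
            "name": f"route_to_{name}",
            "description": description,
            "input_schema": {
                "type": "object",
                "properties": {
                    "summary": {
                        "type": "string",
                        "description": "One-sentence summary of the request for the handler.",
                    },
                },
                "required": ["summary"],
            },
        }
        for name, description in routes.items()
    ]

def chosen_route(content_blocks: list[Any]) -> Optional[tuple[str, dict]]:
    """Return (route, tool input) from the first routing tool call, or None."""
    for block in content_blocks:
        if getattr(block, "type", None) == "tool_use" and block.name.startswith("route_to_"):
            return block.name[len("route_to_"):], block.input
    return None  # the model answered directly instead of routing
```

If every request must be routed, `tool_choice={"type": "any"}` forces the model to call one of the tools; with the default `auto`, a `None` from `chosen_route` means the model replied in text, which can be treated as the general-inquiry path.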

Parallel Execution

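When subtasks are independent, run them in parallel. This is often the largest latency win available in multi-agent systems: wall-clock time drops from the sum of the subtask latencies to roughly the latency of the slowest one.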

import asyncio

async def parallel_research(topic: str) -> dict:
    # These three can run simultaneously
    search_task = asyncio.create_task(web_search_agent(topic))
    arxiv_task = asyncio.create_task(arxiv_search_agent(topic))
    news_task = asyncio.create_task(news_search_agent(topic))

    # Wait for all to complete
    search_results, arxiv_results, news_results = await asyncio.gather(
        search_task, arxiv_task, news_task
    )

    # Synthesis requires all three results (sequential)
    synthesis = await synthesis_agent({
        "web": search_results,
        "academic": arxiv_results,
        "news": news_results,
    })

    return synthesis
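To see where the win comes from, a toy timing sketch with simulated agents (`fake_agent` and the delays are made up): sequential awaits add up, while `asyncio.gather` waits only as long as the slowest call.

```python
import asyncio
import time

async def fake_agent(name: str, seconds: float) -> str:
    # Stand-in for a network-bound agent call; awaiting sleep yields to the event loop
    await asyncio.sleep(seconds)
    return name

async def run_sequential(delays: dict[str, float]) -> list[str]:
    # Each call starts only after the previous one finishes
    return [await fake_agent(name, s) for name, s in delays.items()]

async def run_parallel(delays: dict[str, float]) -> list[str]:
    # gather returns results in argument order, not completion order
    return list(await asyncio.gather(*(fake_agent(name, s) for name, s in delays.items())))

def timed(coro_fn, delays: dict[str, float]) -> tuple[list[str], float]:
    start = time.perf_counter()
    result = asyncio.run(coro_fn(delays))
    return result, time.perf_counter() - start

delays = {"web": 0.3, "arxiv": 0.2, "news": 0.1}
seq_result, seq_seconds = timed(run_sequential, delays)  # roughly 0.6 s (the sum)
par_result, par_seconds = timed(run_parallel, delays)    # roughly 0.3 s (the slowest)
```

This only helps for I/O-bound work such as waiting on model APIs; CPU-bound Python code running in one event loop does not speed up this way.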

Fan-out and gather: Send the same task to multiple agents with different approaches (different models, different prompts, different tools), then aggregate results. Useful for tasks where you want redundancy or multiple perspectives.

async def consensus_analysis(document: str) -> dict:
    # Ask three different analyst agents
    results = await asyncio.gather(
        analyst_agent_a(document),
        analyst_agent_b(document),
        analyst_agent_c(document),
    )

    # Find consensus or flag disagreement
    return await consensus_agent(results)
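When analysts return comparable labels, the consensus step can be a plain vote rather than another LLM call. A sketch, assuming categorical outputs such as "approve" or "reject"; `majority_vote` and its return shape are hypothetical, standing in for `consensus_agent`:

```python
import asyncio
from collections import Counter
from typing import Awaitable, Callable

Analyst = Callable[[str], Awaitable[str]]

async def majority_vote(analysts: list[Analyst], document: str,
                        threshold: float = 0.5) -> dict:
    """Fan one document out to every analyst and take a majority vote."""
    # return_exceptions=True: one failing analyst does not sink the whole fan-out
    outcomes = await asyncio.gather(*(analyst(document) for analyst in analysts),
                                    return_exceptions=True)
    labels = [o for o in outcomes if not isinstance(o, BaseException)]
    errors = len(outcomes) - len(labels)
    if not labels:
        return {"label": None, "agreement": 0.0, "disagreement": True, "errors": errors}

    label, votes = Counter(labels).most_common(1)[0]
    # Agreement is measured against all analysts, so failures count against consensus
    agreement = votes / len(analysts)
    return {
        "label": label,
        "agreement": agreement,
        "disagreement": agreement <= threshold,  # flag for human or agent review
        "errors": errors,
    }
```

For free-text outputs an exact-match vote does not work; that is where an LLM-based `consensus_agent` earns its extra cost.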

Human-in-the-Loop Patterns

Some actions should not execute without human confirmation: irreversible operations, high-value transactions, and actions outside the agent's normal scope.
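Those three criteria can be encoded as an explicit policy check that runs before any tool executes. A sketch with made-up action names and a made-up amount limit; returning reasons instead of a bare boolean lets the confirmation request tell the user why it is asking:

```python
from dataclasses import dataclass

@dataclass
class ProposedAction:
    name: str
    amount: float = 0.0  # monetary value, if the action moves money

# Hypothetical policy configuration
IRREVERSIBLE = {"delete_account", "send_email", "wire_transfer"}
NORMAL_SCOPE = {"search", "read_file", "draft_reply",
                "send_email", "wire_transfer", "delete_account"}
AMOUNT_LIMIT = 1_000.0

def approval_reasons(action: ProposedAction) -> list[str]:
    """Return why this action needs human confirmation (empty list = auto-approve)."""
    reasons = []
    if action.name in IRREVERSIBLE:
        reasons.append("irreversible")
    if action.amount > AMOUNT_LIMIT:
        reasons.append("high value")
    if action.name not in NORMAL_SCOPE:
        reasons.append("outside normal scope")
    return reasons
```

Keeping the policy in code rather than in the prompt means the model cannot talk its way past it.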

Confirmation gate: Pause execution, surface the proposed action to the user, resume only on confirmation.

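from dataclasses import dataclass
from typing import Any, Callable
import asyncio

@dataclass
class PendingAction:
    action_id: str
    description: str
    execute_fn: Callable[..., Any]
    args: dict

pending_actions: dict[str, PendingAction] = {}
approval_events: dict[str, asyncio.Event] = {}
approval_results: dict[str, bool] = {}

async def request_approval(action: PendingAction, timeout: float = 300) -> bool:
    pending_actions[action.action_id] = action
    event = asyncio.Event()
    approval_events[action.action_id] = event

    try:
        # Notify user (via webhook, websocket, email, etc.)
        await notify_user_of_pending_action(action)

        # Wait for user decision (default: 5 min timeout)
        await asyncio.wait_for(event.wait(), timeout=timeout)
        return approval_results.get(action.action_id, False)
    except asyncio.TimeoutError:
        return False  # deny on timeout
    finally:
        # Always clean up, so late or repeated decisions cannot touch a closed request
        pending_actions.pop(action.action_id, None)
        approval_events.pop(action.action_id, None)
        approval_results.pop(action.action_id, None)

# Call these from the same event loop (e.g. a websocket handler):
# asyncio.Event is not thread-safe
def _record_decision(action_id: str, approved: bool) -> bool:
    event = approval_events.get(action_id)
    if event is None or event.is_set():
        return False  # unknown, timed out, or already decided
    approval_results[action_id] = approved
    event.set()
    return True

async def user_approves(action_id: str) -> bool:
    return _record_decision(action_id, True)

async def user_denies(action_id: str) -> bool:
    return _record_decision(action_id, False)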

Checkpoint pattern: At defined points in a long workflow, pause and show the user the current state and proposed next steps.
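A minimal sketch of checkpoints as explicit pause points. The step list, checkpoint names, and the `review` callback are hypothetical; the point is that the workflow hands over both the current state and the steps it is about to take, and stops cleanly if the user says no:

```python
from typing import Any, Awaitable, Callable, Optional

# A step takes the state so far and returns its result
Step = tuple[str, Callable[[dict], Awaitable[Any]]]
# The reviewer sees the current state and the names of the remaining steps and
# returns True to continue. In a real system it would wrap a UI prompt or an
# approval queue; here it is any async callable (hypothetical interface).
Reviewer = Callable[[dict, list[str]], Awaitable[bool]]

async def run_with_checkpoints(steps: list[Step], checkpoints: set[str],
                               review: Reviewer) -> tuple[dict, Optional[str]]:
    """Run steps in order, pausing for review after each checkpoint step.

    Returns (state, name of the step after which the user stopped, or None).
    """
    state: dict = {}
    for i, (name, fn) in enumerate(steps):
        state[name] = await fn(state)
        if name in checkpoints:
            upcoming = [next_name for next_name, _ in steps[i + 1:]]
            if not await review(state, upcoming):
                return state, name
    return state, None
```

Returning the stop point alongside the state lets the caller resume later or hand the partial result back to the user.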

Rollback support: For workflows that modify state, design for rollback. Before each state-changing action, record what will change and how to undo it.
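One way to structure this is an undo log, in the spirit of compensating transactions: each state-changing call registers its inverse before running, and a failure unwinds the log in reverse order. A sketch with an in-memory store and a simulated payment failure; the class and step names are illustrative:

```python
from typing import Callable

class RollbackLog:
    """Record how to undo each state change before making it; unwind on failure.

    A minimal in-process sketch. It assumes every undo function is safe to call
    even if its action only partly happened (for example, restoring a saved value).
    """

    def __init__(self) -> None:
        self._undo_stack: list[tuple[str, Callable[[], None]]] = []

    def run(self, description: str, do: Callable[[], None],
            undo: Callable[[], None]) -> None:
        self._undo_stack.append((description, undo))  # record first...
        do()                                          # ...then change state

    def rollback(self) -> list[str]:
        """Undo every recorded change, newest first; return what was undone."""
        undone = []
        while self._undo_stack:
            description, undo = self._undo_stack.pop()
            undo()
            undone.append(description)
        return undone

# Usage: a three-step order where the last step fails
inventory = {"widgets": 10}
orders: set[str] = set()
log = RollbackLog()

def charge_card() -> None:
    raise RuntimeError("payment declined")  # simulated failure

try:
    before = inventory["widgets"]
    log.run("reserve 3 widgets",
            lambda: inventory.update(widgets=before - 3),
            lambda: inventory.update(widgets=before))
    log.run("create order", lambda: orders.add("order-1"), lambda: orders.discard("order-1"))
    log.run("charge card", charge_card, lambda: None)
except RuntimeError:
    undone = log.rollback()
    # inventory is back to {"widgets": 10}, and orders is empty again
```

For changes to external systems (sent emails, third-party API calls) a true undo may not exist; the compensation is then a corrective action, which is one more reason to put such actions behind a confirmation gate.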

Failure Recovery

Long-running workflows fail. The question is whether failure terminates the workflow or the workflow recovers.

Retry with backoff: For transient failures (API rate limits, network errors).

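import asyncio
import random
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")

async def retry_with_backoff(
    fn: Callable[[], Awaitable[T]],  # zero-arg: retry_with_backoff(lambda: call(x))
    max_attempts: int = 3,
    base_delay: float = 1.0,
    # Narrow this to transient errors (rate limits, connection errors)
    retry_on: tuple[type[BaseException], ...] = (Exception,),
) -> T:
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return await fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff (1s, 2s, 4s, ... by default) plus random jitter,
            # so clients that failed together do not all retry in lockstep
            delay = base_delay * (2 ** attempt)
            await asyncio.sleep(delay + random.uniform(0, delay))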
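Retrying every exception also retries failures that can never succeed. A sketch of a transient-error check keyed on HTTP status codes; the code set is a common convention rather than an exhaustive rule, `is_transient` is a hypothetical helper, and 529 is the status the Anthropic API uses for "overloaded":

```python
from typing import Optional

# 408 timeout, 429 rate limit, 5xx server errors, 529 Anthropic API "overloaded"
RETRYABLE_STATUS = frozenset({408, 429, 500, 502, 503, 504, 529})

def is_transient(status_code: Optional[int]) -> bool:
    """Return True if a failed request is worth retrying.

    status_code is None when no HTTP response arrived at all (connection reset,
    DNS failure, client-side timeout); those are treated as transient. Errors
    such as 400 (bad request) or 401 (bad credentials) fail the same way every
    time, so retrying them only burns attempts and delays the real error.
    """
    return status_code is None or status_code in RETRYABLE_STATUS
```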

Fallback agents: If the primary agent fails, route to a simpler fallback.

async def resilient_analysis(document: str) -> str:
    try:
        return await advanced_analysis_agent(document)
    except AgentError:  # your agent layer's failure type (not a built-in)
        # Fall back to simpler agent
        return await basic_analysis_agent(document)
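The same idea generalizes to an ordered chain of fallbacks. A sketch; `first_success` and the (name, agent) pairs are hypothetical, and catching `Exception` is a placeholder for your agent layer's own error types:

```python
from typing import Awaitable, Callable, Sequence

AgentFn = Callable[[str], Awaitable[str]]

async def first_success(agents: Sequence[tuple[str, AgentFn]],
                        document: str) -> tuple[str, str]:
    """Try each (name, agent) in order, most capable first; return the first success.

    Returns (name of the agent that answered, its output).
    """
    errors: list[str] = []
    for name, agent in agents:
        try:
            return name, await agent(document)
        except Exception as e:  # narrow this to your agent error types
            errors.append(f"{name}: {e!r}")
    raise RuntimeError("all agents failed: " + "; ".join(errors))
```

Returning the name of the agent that answered matters in production: if the basic agent starts serving most requests, the primary is failing, and that is easy to miss from the outputs alone.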

Checkpoint and resume: For workflows that may take hours, persist state at each step. On failure, resume from the last successful checkpoint rather than restarting from scratch.

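import json
from pathlib import Path

async def checkpointed_workflow(workflow_id: str, task: dict) -> dict:
    # /tmp keeps the example simple; use durable storage for real workflows
    checkpoint_path = Path(f"/tmp/workflow_{workflow_id}.json")

    # Load existing progress
    if checkpoint_path.exists():
        state = json.loads(checkpoint_path.read_text())
        completed_steps = state.get("completed_steps", {})
    else:
        completed_steps = {}

    # Ordered {step_id: async step_fn}. Use string step IDs: JSON object keys
    # always load back as strings, so integer IDs would never match on resume.
    # Step results must be JSON-serializable.
    steps = define_workflow_steps(task)
    results = {}

    for step_id, step_fn in steps.items():
        if step_id in completed_steps:
            results[step_id] = completed_steps[step_id]
            continue  # skip already-completed steps

        result = await step_fn(results)
        results[step_id] = result

        # Save checkpoint after each step. Write a temp file, then rename it over
        # the checkpoint (atomic on POSIX), so a crash mid-write cannot corrupt it.
        completed_steps[step_id] = result
        tmp_path = checkpoint_path.with_suffix(".tmp")
        tmp_path.write_text(json.dumps({
            "workflow_id": workflow_id,
            "completed_steps": completed_steps,
        }))
        tmp_path.replace(checkpoint_path)

    return results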
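To see the resume behavior without touching the filesystem, here is a self-contained sketch that swaps the checkpoint file for an in-memory dict (the `store`, step names, and simulated crash are all made up). Step results round-trip through JSON, as they would in a real checkpoint file:

```python
import asyncio
import json
from typing import Any, Awaitable, Callable

StepFn = Callable[[dict], Awaitable[Any]]

class SimulatedCrash(Exception):
    pass

async def run_resumable(steps: dict[str, StepFn], store: dict[str, str],
                        workflow_id: str) -> dict:
    # Load progress; the JSON round-trip mirrors what a checkpoint file stores
    completed = json.loads(store.get(workflow_id, "{}"))
    for step_id, step_fn in steps.items():
        if step_id in completed:
            continue  # finished in an earlier run
        completed[step_id] = await step_fn(completed)
        store[workflow_id] = json.dumps(completed)  # checkpoint after each step
    return completed

calls: list[str] = []
crash_pending = {"b"}  # step "b" fails on its first attempt only

def make_step(name: str, value: int) -> StepFn:
    async def step(results: dict) -> int:
        calls.append(name)
        if name in crash_pending:
            crash_pending.discard(name)
            raise SimulatedCrash(name)
        return value
    return step

steps = {"a": make_step("a", 1), "b": make_step("b", 2), "c": make_step("c", 3)}
store: dict[str, str] = {}

try:
    asyncio.run(run_resumable(steps, store, "wf-1"))  # dies during step "b"
except SimulatedCrash:
    pass

result = asyncio.run(run_resumable(steps, store, "wf-1"))  # resumes at step "b"
```

Step "a" runs once across both attempts; only the step that was in flight when the crash happened is repeated. That also means a step can run more than once, so steps with side effects should be idempotent.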

Observability for Agentic Workflows

Multi-step workflows fail in subtle ways. A step produces wrong output, downstream steps consume it, and the final result is wrong. Without tracing, you cannot see where the error entered.

Instrument at the workflow level, not just the step level:

import time
import uuid
from typing import Optional

class WorkflowTracer:
    def __init__(self, workflow_id: Optional[str] = None):
        # Generate an ID when the caller does not supply one
        self.workflow_id = workflow_id or str(uuid.uuid4())
        self.trace: list[dict] = []

    def record_step(self, step_id: str, input_summary: str,
                    output_summary: str, duration_ms: float, success: bool):
        self.trace.append({
            "workflow_id": self.workflow_id,
            "step_id": step_id,
            # Truncate summaries to keep trace records small
            "input_summary": input_summary[:200],
            "output_summary": output_summary[:200],
            "duration_ms": duration_ms,
            "success": success,
            "timestamp": time.time(),
        })

    def export(self) -> list[dict]:
        return self.trace

Log the workflow trace to your observability stack (Datadog, Grafana, or even just a structured log file). When something goes wrong, the trace shows which step first produced bad output, what input it received, and how long it took.
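Timing and recording can be wrapped around each step so that no step goes untraced. A sketch that emits one JSON log line per step through the standard `logging` module, whether the step succeeds or raises; `traced_step` and the logger name are hypothetical, and the fields mirror `record_step` above:

```python
import json
import logging
import time
from typing import Awaitable, Callable

logger = logging.getLogger("workflow.trace")

async def traced_step(workflow_id: str, step_id: str,
                      fn: Callable[[str], Awaitable[str]], step_input: str) -> str:
    """Run one step and log a structured trace record, success or failure."""
    start = time.perf_counter()
    success, output = False, ""
    try:
        output = await fn(step_input)
        success = True
        return output
    except Exception as e:
        output = f"{type(e).__name__}: {e}"
        raise  # record the failure, but leave recovery to the workflow
    finally:
        # Runs on both paths, so failed steps are traced too
        logger.info(json.dumps({
            "workflow_id": workflow_id,
            "step_id": step_id,
            "input_summary": step_input[:200],
            "output_summary": str(output)[:200],
            "duration_ms": round((time.perf_counter() - start) * 1000, 1),
            "success": success,
            "timestamp": time.time(),
        }))
```

Nothing is emitted until logging is configured at INFO level or lower for this logger (for example with `logging.basicConfig(level=logging.INFO)`); from there, a JSON-lines file or a log shipper can feed the records into whichever observability backend you use.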