Monitoring and Anomaly Detection for AI Systems

You cannot defend what you cannot see. Topics: logging AI interactions, detecting attacks in production, alerting on anomalous agent behavior, and responding to incidents.

Tags: monitoring, detection, logging, anomaly, incident-response

The Observability Gap in AI Systems

Many organizations running AI systems have little visibility into what is happening inside them.

The request came in. The model produced output. The output went somewhere. Whether the model was manipulated, what data it accessed, what tools it called, whether the output was consistent with its intended behavior: unknown.

This is not unique to AI systems. Many traditional applications have the same observability gap. What makes it worse for AI is the non-determinism. The same input can produce different outputs. Behavior can shift with context in ways that are not obvious from logs. And when something goes wrong, the audit trail you need to reconstruct what happened may simply not exist.

Building monitoring for AI systems means deciding what to capture, how to detect anomalies in inherently variable behavior, and what to do when detection fires.

What to Log

Log at the interaction level, not just the application level.

Minimum viable AI interaction log:

import time
import uuid
from dataclasses import dataclass, asdict
from typing import Optional
import json

@dataclass
class AIInteractionLog:
    interaction_id: str
    session_id: str
    user_id: Optional[str]
    timestamp: float

    # Request
    model: str
    input_tokens: int
    system_prompt_hash: str  # hash, not full content
    user_input_length: int
    user_input_hash: str     # hash for correlation without storing content

    # Response
    output_tokens: int
    stop_reason: str
    latency_ms: float

    # Tool use (if applicable)
    tools_called: list[str]
    tool_call_count: int

    # Risk signals
    input_risk_score: float
    input_flags: list[str]
    output_flags: list[str]

    # Business context
    endpoint: str
    application: str

def log_interaction(
    session_id: str,
    user_id: Optional[str],
    model: str,
    system_prompt: str,
    user_input: str,
    response,
    input_validation_result: dict,
    output_validation_result: dict,
    start_time: float,
) -> AIInteractionLog:
    import hashlib

    # The attribute names used here (usage, stop_reason, typed content blocks)
    # match the Anthropic Messages API response shape; adapt them for other providers.
    # Collect tool names once so the list and the count cannot disagree.
    tool_names = [b.name for b in response.content if b.type == "tool_use"]

    entry = AIInteractionLog(
        interaction_id=str(uuid.uuid4()),
        session_id=session_id,
        user_id=user_id,
        timestamp=time.time(),
        model=model,
        input_tokens=response.usage.input_tokens,
        system_prompt_hash=hashlib.sha256(system_prompt.encode()).hexdigest()[:16],
        user_input_length=len(user_input),
        user_input_hash=hashlib.sha256(user_input.encode()).hexdigest()[:16],
        output_tokens=response.usage.output_tokens,
        stop_reason=response.stop_reason,
        latency_ms=(time.time() - start_time) * 1000,
        tools_called=tool_names,
        tool_call_count=len(tool_names),
        input_risk_score=input_validation_result.get("risk_score", 0.0),
        input_flags=input_validation_result.get("flags", []),
        output_flags=output_validation_result.get("flags", []),
        endpoint="/api/chat",
        application="customer_support",
    )

    # Ship to your logging pipeline
    print(json.dumps(asdict(entry)))

    return entry

Do not log raw content in production unless you have strong data governance controls. Log hashes for correlation and metadata for anomaly detection. If you need full content for incident investigation, encrypt it before it is stored and restrict access to it tightly.
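
One caveat on hashing: a plain SHA-256 of a short or guessable input can be recovered by hashing candidate inputs and comparing digests. If that matters for your threat model, a keyed hash keeps the correlation property while making offline guessing depend on a secret. A minimal sketch, assuming the key comes from your secret store (`LOG_HASH_KEY` and `correlation_hash` are hypothetical names, not part of the code above):

```python
import hashlib
import hmac

# Assumption: in a real deployment this key is loaded from a secret manager,
# not written as a literal in source code.
LOG_HASH_KEY = b"replace-with-a-secret-from-your-secret-store"

def correlation_hash(text: str, key: bytes = LOG_HASH_KEY, length: int = 16) -> str:
    """Keyed hash for log correlation: same text and same key give the same digest."""
    digest = hmac.new(key, text.encode("utf-8"), hashlib.sha256).hexdigest()
    return digest[:length]  # truncated to match the 16-character hashes in the log
```

Rotating the key breaks correlation across the rotation boundary, so treat rotation as a deliberate trade-off rather than a routine step.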

Tool calls deserve their own log entries:

@dataclass
class ToolCallLog:
    interaction_id: str
    tool_name: str
    params_hash: str       # hash of params, not raw params (may contain sensitive data)
    params_size: int
    result_size: int
    success: bool
    error: Optional[str]
    duration_ms: float
    timestamp: float
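
One way to populate these fields is to wrap each tool invocation. The following is a sketch only: `call_tool_with_log` is a hypothetical helper, it returns a plain dict with the same field names as `ToolCallLog`, and it assumes tool parameters and results can be serialized with `json.dumps`.

```python
import hashlib
import json
import time
from typing import Any, Callable

def call_tool_with_log(
    interaction_id: str,
    tool_name: str,
    params: dict,
    tool_fn: Callable[..., Any],
) -> tuple[Any, dict]:
    """Run tool_fn(**params) and return (result, log_record)."""
    # Canonical JSON so identical params always produce the same hash
    params_json = json.dumps(params, sort_keys=True, default=str)
    started = time.perf_counter()
    result, error = None, None
    try:
        result = tool_fn(**params)
    except Exception as exc:
        # Record only the exception type; the message may echo sensitive params
        error = type(exc).__name__
    record = {
        "interaction_id": interaction_id,
        "tool_name": tool_name,
        "params_hash": hashlib.sha256(params_json.encode()).hexdigest()[:16],
        "params_size": len(params_json),
        "result_size": len(json.dumps(result, default=str)) if error is None else 0,
        "success": error is None,
        "error": error,
        "duration_ms": (time.perf_counter() - started) * 1000,
        "timestamp": time.time(),
    }
    return result, record
```

Note that this sketch swallows the exception and reports it through `success` and `error`; whether to re-raise after logging depends on how your agent loop handles tool failures.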

Anomaly Detection Patterns

AI system behavior varies naturally. Effective anomaly detection works on patterns, not individual requests.

Token usage anomalies: Sudden spikes in input or output token counts can indicate context flooding or exfiltration attempts.

from collections import deque
import statistics

class TokenUsageMonitor:
    def __init__(self, window_size: int = 100):
        self.input_window = deque(maxlen=window_size)
        self.output_window = deque(maxlen=window_size)

    def record(self, input_tokens: int, output_tokens: int) -> dict:
        alerts = []

        # Compare against the baseline *before* adding the current sample;
        # otherwise a spike inflates the very mean it is measured against.
        if len(self.input_window) >= 10:
            mean_input = statistics.mean(self.input_window)
            if mean_input > 0 and input_tokens > mean_input * 5:
                alerts.append({
                    "type": "input_spike",
                    "current": input_tokens,
                    "baseline_mean": mean_input,
                    "ratio": input_tokens / mean_input,
                })

        if len(self.output_window) >= 10:
            mean_output = statistics.mean(self.output_window)
            if mean_output > 0 and output_tokens > mean_output * 10:
                alerts.append({
                    "type": "output_spike",
                    "current": output_tokens,
                    "baseline_mean": mean_output,
                    "ratio": output_tokens / mean_output,
                })

        # Only now does the current sample become part of the baseline
        self.input_window.append(input_tokens)
        self.output_window.append(output_tokens)

        return {"alerts": alerts}
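
A mean-based baseline is easy to reason about, but earlier spikes in the window pull the mean upward and can mask later ones. One possible alternative, shown here as a sketch rather than a recommendation, is a median and MAD (median absolute deviation) check. The function name `is_robust_outlier` and the cutoff of 6.0 are assumptions chosen for illustration.

```python
import statistics

def is_robust_outlier(history: list[int], value: int, cutoff: float = 6.0) -> bool:
    """Flag value if it sits more than `cutoff` scaled MADs above the median."""
    if len(history) < 10:
        return False  # not enough data for a baseline
    median = statistics.median(history)
    mad = statistics.median(abs(x - median) for x in history)
    if mad == 0:
        # At least half the history equals the median: fall back to a simple ratio
        return value > max(median, 1) * 5
    # 1.4826 scales the MAD to be comparable to a standard deviation
    # when the data is roughly normal
    return (value - median) / (1.4826 * mad) > cutoff
```

Because the median ignores a handful of extreme values, a single earlier spike in `history` does not raise the bar for the next one.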

Tool call sequence anomalies: Agents should have predictable tool call patterns for their function. Unexpected tool calls or unusual combinations of calls can indicate compromise and warrant investigation.

EXPECTED_TOOL_PATTERNS = {
    "customer_support": {
        "allowed_tools": {"lookup_order", "lookup_product", "create_ticket"},
        "max_tools_per_interaction": 5,
        "unusual_sequences": [
            ["lookup_order", "send_email"],   # support agent should not send email
            ["query_database", "web_request"], # unexpected combination
        ],
    }
}

def check_tool_sequence(agent_type: str, tools_called: list[str]) -> list[str]:
    config = EXPECTED_TOOL_PATTERNS.get(agent_type)
    if config is None:
        # Surface unconfigured agent types instead of letting them pass silently
        return [f"unknown_agent_type:{agent_type}"]
    alerts = []

    # Any tool outside the allowlist is flagged
    allowed = config.get("allowed_tools", set())
    for tool in tools_called:
        if allowed and tool not in allowed:
            alerts.append(f"unexpected_tool:{tool}")

    max_tools = config.get("max_tools_per_interaction", 20)
    if len(tools_called) > max_tools:
        alerts.append(f"excessive_tool_calls:{len(tools_called)}")

    # Matches when every tool in the pattern was called, in any order;
    # this does not check the order of the calls.
    for unusual_seq in config.get("unusual_sequences", []):
        if all(t in tools_called for t in unusual_seq):
            alerts.append(f"unusual_sequence:{'+'.join(unusual_seq)}")

    return alerts
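
The pattern check in `check_tool_sequence` fires when all tools in a pattern were called, regardless of order. If call order matters for your agent (for example, a read followed by an outbound request), an ordered match may be more precise. A minimal sketch, with `contains_in_order` as a hypothetical helper name:

```python
def contains_in_order(tools_called: list[str], pattern: list[str]) -> bool:
    """True if `pattern` appears in `tools_called` in order (gaps allowed)."""
    remaining = iter(tools_called)
    # `step in remaining` consumes the iterator up to the first match,
    # so each later step can only match at a later position.
    return all(step in remaining for step in pattern)
```

Gaps are allowed on purpose: an attacker-driven sequence often has unrelated calls interleaved between the steps that matter.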

Session-level behavioral drift: Compare current session behavior to baseline for the same user or use case.

def detect_session_drift(
    current_session_metrics: dict,
    baseline_metrics: dict,
    thresholds: dict,
) -> list[str]:
    alerts = []

    for metric, current_value in current_session_metrics.items():
        baseline = baseline_metrics.get(metric)
        threshold = thresholds.get(metric, 3.0)  # default 3x deviation triggers alert

        if baseline and baseline > 0:
            ratio = current_value / baseline
            if ratio > threshold:
                alerts.append(f"{metric}_drift:{ratio:.1f}x")

    return alerts
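
The drift check is only as good as the baseline it compares against. One simple way to derive `baseline_metrics`, assuming you keep per-session metric dicts for past sessions, is the per-metric median. This is a sketch; `build_baseline` and the metric names in the example are hypothetical.

```python
import statistics

def build_baseline(historical_sessions: list[dict]) -> dict:
    """Median value per metric across past sessions."""
    values_by_metric: dict[str, list[float]] = {}
    for session in historical_sessions:
        for metric, value in session.items():
            values_by_metric.setdefault(metric, []).append(value)
    return {m: statistics.median(v) for m, v in values_by_metric.items()}

history = [
    {"requests": 10, "avg_input_tokens": 200},
    {"requests": 12, "avg_input_tokens": 220},
    {"requests": 50, "avg_input_tokens": 210},  # one unusually busy session
]
baseline = build_baseline(history)
# baseline == {"requests": 12, "avg_input_tokens": 210}
```

The median keeps one unusually busy session from inflating the baseline, which a mean would not.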

Risk score trend: Rising or persistently elevated risk scores over a session can indicate an escalating attack.

class SessionRiskTracker:
    def __init__(self, alert_threshold: float = 0.6):
        self.scores: list[float] = []
        self.alert_threshold = alert_threshold

    def add_score(self, score: float) -> dict:
        self.scores.append(score)
        result = {"alerts": []}

        # Single high-risk request
        if score > 0.8:
            result["alerts"].append({"type": "high_risk_request", "score": score})

        # Sustained elevation: average of the last 5 scores (need at least 5 samples)
        if len(self.scores) >= 5:
            recent = self.scores[-5:]
            avg_recent = sum(recent) / len(recent)

            if avg_recent > self.alert_threshold:
                result["alerts"].append({
                    "type": "elevated_session_risk",
                    "avg_score": avg_recent,
                    "sample_count": len(recent),
                })

        return result
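
`SessionRiskTracker` alerts on a high rolling average, which catches sustained elevation but not a session that is still below the threshold and climbing. A slope check can complement it. The sketch below fits a least-squares line to recent scores; `risk_slope`, `is_escalating`, and the `min_slope` default of 0.05 are illustrative assumptions.

```python
def risk_slope(scores: list[float]) -> float:
    """Least-squares slope of scores against their index (0, 1, 2, ...)."""
    n = len(scores)
    if n < 2:
        return 0.0
    mean_x = (n - 1) / 2
    mean_y = sum(scores) / n
    cov = sum((i - mean_x) * (s - mean_y) for i, s in enumerate(scores))
    var = sum((i - mean_x) ** 2 for i in range(n))
    return cov / var

def is_escalating(scores: list[float], window: int = 5, min_slope: float = 0.05) -> bool:
    """True if the last `window` scores rise by at least `min_slope` per request."""
    if len(scores) < window:
        return False
    return risk_slope(scores[-window:]) >= min_slope
```

A session scoring 0.1, 0.2, 0.3, 0.4, 0.5 never reaches an average of 0.6, yet its slope of about 0.1 per request would trip this check.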

Alerting and Escalation

Detection without response is just expensive logging.

Define escalation tiers:

Tier     | Condition                                    | Response
Info     | Single low-confidence flag                   | Log, no action
Warning  | Pattern of flags, medium confidence          | Log, rate limit session
Alert    | High-confidence injection detected           | Log, block session, notify
Critical | Confirmed tool abuse or data access anomaly  | Log, terminate session, page on-call
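
from dataclasses import dataclass

@dataclass
class DetectionEvent:
    session_id: str
    severity: str      # "info", "warning", "alert", or "critical"
    description: str

# log_event, rate_limit_session, block_session, notify_security_team,
# terminate_session, rollback_agent_state, and page_on_call are placeholders
# for your own integrations; they are not defined here.
async def handle_detection_event(event: DetectionEvent):
    # Every tier logs, including any severity value not handled below
    await log_event(event)

    if event.severity == "warning":
        await rate_limit_session(event.session_id, multiplier=0.5)

    elif event.severity == "alert":
        await block_session(event.session_id, reason=event.description)
        await notify_security_team(event, channel="slack-ai-alerts")

    elif event.severity == "critical":
        await terminate_session(event.session_id)
        await rollback_agent_state(event.session_id)  # if your system supports it
        await page_on_call(event)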
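
Something has to decide which tier an event belongs to. The sketch below maps detector output to the four tiers in the table; the function name `classify_severity`, its parameters, and the 0.5 and 0.9 confidence cutoffs are assumptions for illustration and should be tuned against your own false-positive rate.

```python
def classify_severity(
    flags: list[str],
    confidence: float,
    confirmed_abuse: bool = False,
) -> str:
    """Map detection signals to a tier. Thresholds are illustrative assumptions."""
    if confirmed_abuse:
        # Confirmed tool abuse or data access anomaly
        return "critical"
    if confidence >= 0.9:
        # High-confidence detection
        return "alert"
    if len(flags) >= 2 and confidence >= 0.5:
        # A pattern of flags at medium confidence
        return "warning"
    # Anything else, including a single low-confidence flag
    return "info"
```

Checking the most severe condition first means an event that satisfies several tiers always lands in the highest one.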

Incident Response for AI Systems

When an AI incident occurs, the questions are different from traditional incidents.

Standard incident questions: What was compromised? Who accessed it? What did the attacker do?

AI-specific additional questions:

  • Was the model manipulated, or did it behave as intended with a bad outcome?
  • What was the injection vector? Direct user input or indirect from retrieved content?
  • Did the model's actions persist beyond the session (database writes, emails sent)?
  • Is the same injection possible at other entry points in the system?
  • Is there contamination in memory or knowledge bases that needs to be flushed?

Containment actions specific to AI systems:

  • Suspend the affected agent (not just the user session)
  • Flush and rebuild any memory stores the agent wrote to during the incident
  • Review every tool call from the session against what was explicitly authorized
  • Roll back any database writes or external actions taken during the incident window
  • Check for secondary agents that received output from the compromised agent

The full interaction log you built is the artifact you use for this reconstruction. Without it, you are guessing.
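
As a starting point for that reconstruction, the sketch below pulls one session's records out of a stream of JSON log lines and orders them by time. It assumes one JSON object per line with the `session_id` and `timestamp` fields from the interaction log; `session_timeline` is a hypothetical helper name.

```python
import json

def session_timeline(log_lines: list[str], session_id: str) -> list[dict]:
    """Return one session's interaction records, oldest first."""
    records = []
    for line in log_lines:
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip non-JSON lines mixed into the stream
        if isinstance(record, dict) and record.get("session_id") == session_id:
            records.append(record)
    # Records without a timestamp sort first rather than raising
    return sorted(records, key=lambda r: r.get("timestamp", 0.0))
```

From the ordered records, the `tools_called`, `input_flags`, and `output_flags` fields show where the session first deviated from expected behavior.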