Monitoring and Anomaly Detection for AI Systems
You cannot defend what you cannot see. This guide covers logging AI interactions, detecting attacks in production, alerting on anomalous agent behavior, and responding to incidents.
The Observability Gap in AI Systems
Many organizations running AI systems have little visibility into what is happening inside them.
The request came in. The model produced output. The output went somewhere. Whether the model was manipulated, what data it accessed, what tools it called, whether the output was consistent with its intended behavior: unknown.
This is not unique to AI systems. Many traditional applications have the same observability gap. What makes it worse for AI is the non-determinism. The same input can produce different outputs. Behavior can shift with context in ways that are not obvious from logs. And when something goes wrong, the audit trail you need to reconstruct what happened may simply not exist.
Building monitoring for AI systems means deciding what to capture, how to detect anomalies in inherently variable behavior, and what to do when detection fires.
What to Log
Log at the interaction level, not just the application level.
Minimum viable AI interaction log:
import hashlib
import json
import time
import uuid
from dataclasses import dataclass, asdict
from typing import Optional


@dataclass
class AIInteractionLog:
    interaction_id: str
    session_id: str
    user_id: Optional[str]
    timestamp: float
    # Request
    model: str
    input_tokens: int
    system_prompt_hash: str  # hash, not full content
    user_input_length: int
    user_input_hash: str  # hash for correlation without storing content
    # Response
    output_tokens: int
    stop_reason: str
    latency_ms: float
    # Tool use (if applicable)
    tools_called: list[str]
    tool_call_count: int
    # Risk signals
    input_risk_score: float
    input_flags: list[str]
    output_flags: list[str]
    # Business context
    endpoint: str
    application: str


def log_interaction(
    session_id: str,
    user_id: Optional[str],
    model: str,
    system_prompt: str,
    user_input: str,
    response,
    input_validation_result: dict,
    output_validation_result: dict,
    start_time: float,
) -> AIInteractionLog:
    entry = AIInteractionLog(
        interaction_id=str(uuid.uuid4()),
        session_id=session_id,
        user_id=user_id,
        timestamp=time.time(),
        model=model,
        input_tokens=response.usage.input_tokens,
        system_prompt_hash=hashlib.sha256(system_prompt.encode()).hexdigest()[:16],
        user_input_length=len(user_input),
        user_input_hash=hashlib.sha256(user_input.encode()).hexdigest()[:16],
        output_tokens=response.usage.output_tokens,
        stop_reason=response.stop_reason,
        latency_ms=(time.time() - start_time) * 1000,
        tools_called=[b.name for b in response.content if b.type == "tool_use"],
        tool_call_count=sum(1 for b in response.content if b.type == "tool_use"),
        input_risk_score=input_validation_result.get("risk_score", 0.0),
        input_flags=input_validation_result.get("flags", []),
        output_flags=output_validation_result.get("flags", []),
        endpoint="/api/chat",
        application="customer_support",
    )
    # Ship to your logging pipeline
    print(json.dumps(asdict(entry)))
    return entry
Do not log raw content in production unless you have strong data governance controls. Log hashes for correlation and metadata for anomaly detection. If you need full content for incident investigation, encrypt it before writing and restrict access tightly.
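A plain SHA-256 of a short or guessable input can be reversed by hashing candidate inputs, so a keyed hash is a safer correlation token. The following is a minimal sketch, assuming a secret key is available from your own secret store; the `LOG_HASH_KEY` environment variable and the `correlation_hash` name are illustrative and not part of the logging code above.

```python
import hashlib
import hmac
import os

# Assumption: in production the key comes from a secret manager. The
# environment variable and its fallback only keep this sketch runnable.
LOG_HASH_KEY = os.environ.get("LOG_HASH_KEY", "dev-only-key").encode()


def correlation_hash(text: str, key: bytes = LOG_HASH_KEY, length: int = 16) -> str:
    """Return a truncated HMAC-SHA256 hex digest of text for log correlation."""
    digest = hmac.new(key, text.encode("utf-8"), hashlib.sha256).hexdigest()
    return digest[:length]
```

The same input and key always produce the same token, so entries can still be joined across logs, but someone holding only the logs cannot test guesses without the key.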
Tool calls deserve their own log entries:
@dataclass
class ToolCallLog:
    interaction_id: str
    tool_name: str
    params_hash: str  # hash of params, not raw params (may contain sensitive data)
    params_size: int
    result_size: int
    success: bool
    error: Optional[str]
    duration_ms: float
    timestamp: float
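One way to populate these fields is to wrap each tool invocation. The sketch below is an assumption about how tools are called (a plain Python callable taking keyword arguments); `call_tool_with_log` is a hypothetical helper, and it returns a dict with the same field names rather than the dataclass so that it runs on its own.

```python
import hashlib
import json
import time
from typing import Any, Callable


def call_tool_with_log(
    interaction_id: str,
    tool_name: str,
    tool_fn: Callable[..., Any],  # assumption: tools are plain callables
    params: dict,
) -> tuple[Any, dict]:
    """Run tool_fn(**params) and return (result, log_record)."""
    params_json = json.dumps(params, sort_keys=True, default=str)
    timestamp = time.time()
    started = time.perf_counter()
    result, error = None, None
    try:
        result = tool_fn(**params)
    except Exception as exc:
        # Record only the exception type; messages may contain sensitive data.
        error = type(exc).__name__
    record = {
        "interaction_id": interaction_id,
        "tool_name": tool_name,
        "params_hash": hashlib.sha256(params_json.encode()).hexdigest()[:16],
        "params_size": len(params_json),
        "result_size": len(json.dumps(result, default=str)) if error is None else 0,
        "success": error is None,
        "error": error,
        "duration_ms": (time.perf_counter() - started) * 1000,
        "timestamp": timestamp,
    }
    return result, record
```

The wrapper swallows the exception after recording it; whether to re-raise depends on how your agent loop handles tool failures.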
Anomaly Detection Patterns
AI system behavior varies naturally. Effective anomaly detection works on patterns, not individual requests.
Token usage anomalies: Sudden spikes in input or output token counts can indicate context flooding or exfiltration attempts.
from collections import deque
import statistics


class TokenUsageMonitor:
    def __init__(self, window_size: int = 100):
        self.input_window = deque(maxlen=window_size)
        self.output_window = deque(maxlen=window_size)

    def record(self, input_tokens: int, output_tokens: int) -> dict:
        alerts = []
        # Compare against the baseline before adding the current sample,
        # so a spike does not inflate the mean it is measured against.
        if len(self.input_window) >= 10:
            mean_input = statistics.mean(self.input_window)
            if mean_input > 0 and input_tokens > mean_input * 5:
                alerts.append({
                    "type": "input_spike",
                    "current": input_tokens,
                    "baseline_mean": mean_input,
                    "ratio": input_tokens / mean_input,
                })
        if len(self.output_window) >= 10:
            mean_output = statistics.mean(self.output_window)
            if mean_output > 0 and output_tokens > mean_output * 10:
                alerts.append({
                    "type": "output_spike",
                    "current": output_tokens,
                    "baseline_mean": mean_output,
                    "ratio": output_tokens / mean_output,
                })
        self.input_window.append(input_tokens)
        self.output_window.append(output_tokens)
        return {"alerts": alerts}
Tool call sequence anomalies: Agents should have predictable tool call patterns for their function. Unexpected tool calls or unusual combinations can indicate compromise and warrant investigation.
EXPECTED_TOOL_PATTERNS = {
    "customer_support": {
        "allowed_tools": {"lookup_order", "lookup_product", "create_ticket"},
        "max_tools_per_interaction": 5,
        "unusual_sequences": [
            ["lookup_order", "send_email"],  # support agent should not send email
            ["query_database", "web_request"],  # unexpected combination
        ],
    }
}


def check_tool_sequence(agent_type: str, tools_called: list[str]) -> list[str]:
    config = EXPECTED_TOOL_PATTERNS.get(agent_type, {})
    alerts = []
    # Tools outside the allowlist (skipped when no allowlist is configured)
    allowed = config.get("allowed_tools", set())
    for tool in tools_called:
        if allowed and tool not in allowed:
            alerts.append(f"unexpected_tool:{tool}")
    # Too many tool calls in one interaction
    max_tools = config.get("max_tools_per_interaction", 20)
    if len(tools_called) > max_tools:
        alerts.append(f"excessive_tool_calls:{len(tools_called)}")
    # Flagged combinations: fires when all listed tools were called,
    # regardless of the order in which they appear
    for unusual_seq in config.get("unusual_sequences", []):
        if all(t in tools_called for t in unusual_seq):
            alerts.append(f"unusual_sequence:{'+'.join(unusual_seq)}")
    return alerts
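The combination check above tests whether the listed tools all appear, not the order in which they were called. If order matters for a pattern, an in-order check can be added alongside it. A minimal sketch; `contains_in_order` is a hypothetical helper name, and it allows other calls between the matched steps.

```python
def contains_in_order(tools_called: list[str], pattern: list[str]) -> bool:
    """True if pattern appears in tools_called in order (gaps allowed)."""
    remaining = iter(tools_called)
    # `step in remaining` consumes the iterator up to the match, so each
    # later step can only match a call made after the previous one.
    return all(step in remaining for step in pattern)
```

Keeping the order-insensitive check as well is the more conservative choice, since an attacker who reverses the call order would otherwise slip past.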
Session-level behavioral drift: Compare current session behavior to a baseline for the same user or use case.
def detect_session_drift(
    current_session_metrics: dict,
    baseline_metrics: dict,
    thresholds: dict,
) -> list[str]:
    alerts = []
    for metric, current_value in current_session_metrics.items():
        baseline = baseline_metrics.get(metric)
        threshold = thresholds.get(metric, 3.0)  # default 3x deviation triggers alert
        if baseline is not None and baseline > 0:
            ratio = current_value / baseline
            if ratio > threshold:
                alerts.append(f"{metric}_drift:{ratio:.1f}x")
    return alerts
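The drift check needs a baseline to compare against. One simple option, sketched here as an assumption rather than a recommendation, is the per-metric median over past sessions for the same user or use case; `build_baseline` and the metric names in the usage example are hypothetical.

```python
import statistics


def build_baseline(historical_sessions: list[dict]) -> dict:
    """Median of each metric across past sessions.

    Assumption: each session is a dict of metric name -> numeric value.
    Sessions that lack a metric simply do not contribute to it.
    """
    values_by_metric: dict[str, list[float]] = {}
    for session in historical_sessions:
        for metric, value in session.items():
            values_by_metric.setdefault(metric, []).append(value)
    return {m: statistics.median(v) for m, v in values_by_metric.items()}


# Hypothetical metric names, for illustration only
history = [
    {"tool_calls": 2, "output_tokens": 300},
    {"tool_calls": 4, "output_tokens": 500},
    {"tool_calls": 3},
]
baseline = build_baseline(history)
```

A median is less sensitive than a mean to a few past sessions that were themselves anomalous, which matters if the history has not been cleaned.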
Risk score trend: Risk scores that stay elevated or keep rising over a session can indicate an escalating attack.
class SessionRiskTracker:
    def __init__(self, alert_threshold: float = 0.6):
        self.scores: list[float] = []
        self.alert_threshold = alert_threshold

    def add_score(self, score: float) -> dict:
        self.scores.append(score)
        result = {"alerts": []}
        # Single high-risk request
        if score > 0.8:
            result["alerts"].append({"type": "high_risk_request", "score": score})
        # Sustained elevation: mean of the last 5 scores (needs at least 5 samples)
        if len(self.scores) >= 5:
            recent = self.scores[-5:]
            avg_recent = sum(recent) / len(recent)
            if avg_recent > self.alert_threshold:
                result["alerts"].append({
                    "type": "elevated_session_risk",
                    "avg_score": avg_recent,
                    "sample_count": len(recent),
                })
        return result
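Averaging recent scores catches sustained elevation, but it does not distinguish a flat high session from one that is climbing. A least-squares slope over the recent window is one way to measure the rise itself. This is a sketch under stated assumptions: `risk_slope` and `is_escalating` are hypothetical names, and the `window` and `min_slope` defaults are illustrative, not tuned values.

```python
def risk_slope(scores: list[float]) -> float:
    """Least-squares slope of score against request index (0, 1, 2, ...)."""
    n = len(scores)
    if n < 2:
        return 0.0
    mean_x = (n - 1) / 2
    mean_y = sum(scores) / n
    numerator = sum((i - mean_x) * (s - mean_y) for i, s in enumerate(scores))
    denominator = sum((i - mean_x) ** 2 for i in range(n))
    return numerator / denominator


def is_escalating(scores: list[float], window: int = 5, min_slope: float = 0.05) -> bool:
    """True when the last `window` scores rise by at least min_slope per request."""
    recent = scores[-window:]
    return len(recent) == window and risk_slope(recent) >= min_slope
```

A session that moves from 0.1 to 0.5 in five requests never crosses a 0.6 average, yet its slope of roughly 0.1 per request would trip this check.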
Alerting and Escalation
Detection without response is just expensive logging.
Define escalation tiers:
| Tier | Condition | Response |
|---|---|---|
| Info | Single low-confidence flag | Log, no action |
| Warning | Pattern of flags, medium confidence | Log, rate limit session |
| Alert | High-confidence injection detected | Log, block session, notify |
| Critical | Confirmed tool abuse or data access anomaly | Log, terminate session, page on-call |
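The table's conditions can be turned into a small classifier that assigns a tier to each detection. The sketch below is one possible reading of the table: the function name, the numeric confidence cut-offs, and the "two or more flags" interpretation of "pattern of flags" are all assumptions to adapt to your own detectors.

```python
def classify_severity(
    flags: list[str],
    confidence: float,
    confirmed_tool_abuse: bool = False,
) -> str:
    """Map detection signals to an escalation tier name.

    Assumption: callers invoke this only when at least one detector fired.
    The 0.8 and 0.5 cut-offs are illustrative, not calibrated values.
    """
    if confirmed_tool_abuse:
        return "critical"
    if confidence >= 0.8:
        return "alert"
    if len(flags) >= 2 and confidence >= 0.5:
        return "warning"
    return "info"
```

Checking tiers from most to least severe means a detection that satisfies several conditions always lands in the highest applicable tier.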
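from dataclasses import dataclass


# Assumed event shape; only the fields used below are shown.
@dataclass
class DetectionEvent:
    severity: str  # "info", "warning", "alert", or "critical"
    session_id: str
    description: str


# log_event, rate_limit_session, block_session, notify_security_team,
# terminate_session, rollback_agent_state, and page_on_call are placeholders
# for your own integrations.
async def handle_detection_event(event: DetectionEvent):
    if event.severity == "info":
        await log_event(event)
    elif event.severity == "warning":
        await log_event(event)
        await rate_limit_session(event.session_id, multiplier=0.5)
    elif event.severity == "alert":
        await log_event(event)
        await block_session(event.session_id, reason=event.description)
        await notify_security_team(event, channel="slack-ai-alerts")
    elif event.severity == "critical":
        await log_event(event)
        await terminate_session(event.session_id)
        await rollback_agent_state(event.session_id)  # if your system supports it
        await page_on_call(event)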
Incident Response for AI Systems
When an AI incident occurs, the questions are different from traditional incidents.
Standard incident questions: What was compromised? Who accessed it? What did the attacker do?
AI-specific additional questions:
- Was the model manipulated, or did it behave as intended with a bad outcome?
- What was the injection vector? Direct user input or indirect from retrieved content?
- Did the model's actions persist beyond the session (database writes, emails sent)?
- Is the same injection possible at other entry points in the system?
- Is there contamination in memory or knowledge bases that needs to be flushed?
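Answering these questions usually starts with a per-session timeline. The sketch below assumes interaction logs have been loaded as dicts carrying the `session_id`, `timestamp`, `input_flags`, and `output_flags` fields from the interaction log schema; the two function names are hypothetical.

```python
from typing import Optional


def build_session_timeline(log_entries: list[dict], session_id: str) -> list[dict]:
    """Return one session's log entries in chronological order."""
    # Assumption: entries are dicts, e.g. parsed from JSON log lines.
    session_entries = [e for e in log_entries if e.get("session_id") == session_id]
    return sorted(session_entries, key=lambda e: e["timestamp"])


def first_flagged_entry(timeline: list[dict]) -> Optional[dict]:
    """Earliest entry with any input or output flag, or None if nothing was flagged."""
    for entry in timeline:
        if entry.get("input_flags") or entry.get("output_flags"):
            return entry
    return None
```

The first flagged entry is only a starting point for locating the injection vector; an indirect injection from retrieved content may precede the first flag.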
Containment actions specific to AI systems:
- Suspend the affected agent (not just the user session)
- Flush and rebuild any memory stores the agent wrote to during the incident
- Review every tool call from the session and confirm that each one was explicitly authorized
- Roll back any database writes or external actions taken during the incident window
- Check for secondary agents that received output from the compromised agent
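The last step, finding secondary agents, amounts to a reachability search over which agent passed output to which. A minimal sketch, assuming you can derive a `handoffs` mapping (sender to set of receivers) from your logs; the mapping, the function name, and the agent names in the example are hypothetical.

```python
from collections import deque


def downstream_agents(handoffs: dict[str, set[str]], compromised: str) -> set[str]:
    """Agents reachable from `compromised` by following output handoffs."""
    seen: set[str] = set()
    queue = deque([compromised])
    while queue:
        agent = queue.popleft()
        for receiver in handoffs.get(agent, set()):
            # Skip agents already visited and the starting agent itself
            if receiver not in seen and receiver != compromised:
                seen.add(receiver)
                queue.append(receiver)
    return seen


# Hypothetical agent names, for illustration only
handoffs = {
    "triage": {"billing", "support"},
    "billing": {"email"},
    "email": {"triage"},
}
affected = downstream_agents(handoffs, "triage")
```

Every agent in the returned set received output that traces back to the compromised agent, directly or through intermediaries, and is a candidate for the same review.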
The full interaction log you built is the artifact you use for this reconstruction. Without it, you are guessing.