Monitoring LLM Outputs in Production: Anomaly Detection, Latency Alerting, and Output Drift
How to build a production observability stack for LLM outputs — covering anomaly detection pipelines, latency threshold alerting, output drift signals, and concrete alerting logic you can deploy today.
Deploying a large language model to production is not the end of the security engineering work — it is the beginning of a different kind of work. Static controls like input guardrails and output filters operate on individual requests. Production monitoring operates across the full distribution of requests over time, catching the anomalies that no single-request check can see: a sudden spike in guardrail triggers that signals a coordinated attack, a slow drift in output length that suggests the system prompt is degrading under context pressure, or a latency regression that reveals a new prompt pattern hitting an expensive code path.
This post covers the three pillars of LLM output monitoring — anomaly detection, latency alerting, and output drift detection — with concrete implementation patterns for each.
Why LLM Monitoring Is Structurally Different
Traditional application monitoring tracks metrics with stable distributions: request rate, error rate, response time. LLM monitoring deals with distributions that shift continuously and often intentionally. The model’s outputs are high-dimensional (full text), semantically rich, and generated probabilistically — summary statistics like mean response length or average toxicity score are meaningful, but their baselines shift whenever the underlying model is updated, the system prompt changes, or the user population evolves.
This means LLM monitoring involves two separate problems layered on top of each other:
- Detecting anomalies relative to the current baseline (something unusual is happening right now)
- Detecting when the baseline itself has shifted (the model’s behavior has changed over time)
Each requires its own tooling and its own alert thresholds.
Pillar 1: Per-Request Anomaly Detection
The first monitoring layer fires per-request, in real time or with a short delay. Its job is to flag individual outputs that fall outside expected bounds.
Anomaly Detection Pipeline
import time
import statistics
from dataclasses import dataclass
from collections import deque
from typing import Optional

@dataclass
class OutputMetrics:
    request_id: str
    session_id: str
    timestamp: float
    input_tokens: int
    output_tokens: int
    latency_ms: float
    guardrail_triggered: bool
    guardrail_reason: Optional[str]
    toxicity_score: float
    output_length_chars: int
    contains_code: bool
    refusal_detected: bool

class RollingWindowStats:
    """Maintain rolling statistics for anomaly detection."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self._values: deque[float] = deque(maxlen=window_size)

    def add(self, value: float) -> None:
        self._values.append(value)

    def mean(self) -> float:
        return statistics.mean(self._values) if self._values else 0.0

    def stdev(self) -> float:
        return statistics.stdev(self._values) if len(self._values) >= 2 else 0.0

    def zscore(self, value: float) -> float:
        s = self.stdev()
        if s == 0:
            return 0.0
        return (value - self.mean()) / s

class AnomalyDetector:
    def __init__(self, zscore_threshold: float = 3.0):
        self.zscore_threshold = zscore_threshold
        self.latency_stats = RollingWindowStats()
        self.output_length_stats = RollingWindowStats()
        self.toxicity_stats = RollingWindowStats()

    def observe(self, metrics: OutputMetrics) -> list[str]:
        """Add an observation and return any anomaly flags."""
        anomalies = []

        # Check latency
        latency_z = self.latency_stats.zscore(metrics.latency_ms)
        self.latency_stats.add(metrics.latency_ms)
        if latency_z > self.zscore_threshold:
            anomalies.append(f"latency_spike:z={latency_z:.2f}")

        # Check output length
        length_z = self.output_length_stats.zscore(metrics.output_length_chars)
        self.output_length_stats.add(metrics.output_length_chars)
        if length_z > self.zscore_threshold:
            anomalies.append(f"output_length_spike:z={length_z:.2f}")

        # Check toxicity
        toxicity_z = self.toxicity_stats.zscore(metrics.toxicity_score)
        self.toxicity_stats.add(metrics.toxicity_score)
        if toxicity_z > self.zscore_threshold:
            anomalies.append(f"toxicity_spike:z={toxicity_z:.2f}")

        # Hard threshold: any guardrail trigger is an anomaly event
        if metrics.guardrail_triggered:
            anomalies.append(f"guardrail_trigger:{metrics.guardrail_reason}")

        return anomalies
The rolling z-score approach is simple and effective for most production volumes. A z-score above 3.0 means an observation is more than three standard deviations from the recent mean — rare under normal conditions but common during attacks or model regressions.
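As a usage sketch, assuming your serving layer already assembles an OutputMetrics per response, the detector drops into the request path as a single call (handle_response is an illustrative hook, not part of the classes above):

detector = AnomalyDetector(zscore_threshold=3.0)

def handle_response(metrics: OutputMetrics) -> None:
    # observe() both updates the rolling windows and returns this
    # request's anomaly flags, so one call does both jobs.
    for flag in detector.observe(metrics):
        print(f"[anomaly] request={metrics.request_id} {flag}")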
Guardrail Trigger Rate Monitoring
Individual guardrail triggers are expected. A sustained elevation in trigger rate is the signal that matters. Track trigger rate as a rolling percentage, not an absolute count:
import time
from collections import deque
from typing import Optional

class GuardrailRateMonitor:
    def __init__(self, window_seconds: int = 300, alert_threshold: float = 0.15):
        """Alert if more than 15% of requests trigger a guardrail in any 5-minute window."""
        self.window_seconds = window_seconds
        self.alert_threshold = alert_threshold
        self._events: deque[tuple[float, bool]] = deque()  # (timestamp, triggered)

    def record(self, triggered: bool) -> Optional[str]:
        now = time.time()
        self._events.append((now, triggered))

        # Drop events outside the window
        cutoff = now - self.window_seconds
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()

        if len(self._events) < 50:
            return None  # Not enough data

        trigger_rate = sum(1 for _, t in self._events if t) / len(self._events)
        if trigger_rate > self.alert_threshold:
            return f"ALERT: guardrail_rate={trigger_rate:.1%} over last {self.window_seconds}s"
        return None
A 15% guardrail trigger rate sustained over 5 minutes is a strong signal of either a coordinated attack (many users sending adversarial inputs) or a model regression (the model’s behavior has shifted such that legitimate requests are now triggering filters). The alert surfaces the condition; root cause analysis distinguishes between them.
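One cheap triage step is to break the same window down by session. Triggers concentrated in a few sessions point toward targeted probing; triggers spread evenly across many sessions point toward a regression hitting legitimate traffic. A minimal sketch, with the 0.5 concentration cutoff as an illustrative assumption rather than a tuned threshold:

from collections import Counter

def triage_trigger_burst(events: list[tuple[str, bool]]) -> str:
    """Classify a trigger burst from (session_id, triggered) pairs in the alert window."""
    triggers_by_session = Counter(sid for sid, triggered in events if triggered)
    total = sum(triggers_by_session.values())
    if total == 0:
        return "no_triggers"
    # Share of all triggers contributed by the three noisiest sessions
    top_share = sum(n for _, n in triggers_by_session.most_common(3)) / total
    return "likely_targeted_probing" if top_share > 0.5 else "likely_model_regression"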
Pillar 2: Latency Alerting
LLM latency is a security signal, not just a performance metric. Several attack patterns produce characteristic latency signatures:
- Context overflow probing: very long inputs that push the model near its context limit produce latency spikes
- Repeated retries by a filtered attacker: a burst of short-latency rejections followed by longer-latency attempts as the attacker iterates
- Prompt stuffing: adversarial inputs that include large chunks of text to dilute system prompt context produce elevated TTFT (time to first token)
import time
from collections import deque
from dataclasses import dataclass

@dataclass
class LatencyAlert:
    alert_type: str
    value_ms: float
    threshold_ms: float
    session_id: str
    timestamp: float

class LatencyAlerter:
    def __init__(
        self,
        p95_threshold_ms: float = 5000,
        p99_threshold_ms: float = 10000,
        ttft_threshold_ms: float = 2000,
    ):
        self.p95_threshold_ms = p95_threshold_ms
        self.p99_threshold_ms = p99_threshold_ms
        self.ttft_threshold_ms = ttft_threshold_ms
        self._latencies: deque[float] = deque(maxlen=500)

    def record_request(
        self,
        total_latency_ms: float,
        ttft_ms: float,
        session_id: str,
    ) -> list[LatencyAlert]:
        alerts = []
        now = time.time()
        self._latencies.append(total_latency_ms)

        # Compute rolling p95 and p99
        if len(self._latencies) >= 20:
            sorted_lat = sorted(self._latencies)
            p95 = sorted_lat[int(len(sorted_lat) * 0.95)]
            p99 = sorted_lat[int(len(sorted_lat) * 0.99)]
            if p95 > self.p95_threshold_ms:
                alerts.append(LatencyAlert("p95_breach", p95, self.p95_threshold_ms, session_id, now))
            if p99 > self.p99_threshold_ms:
                alerts.append(LatencyAlert("p99_breach", p99, self.p99_threshold_ms, session_id, now))

        # Per-request TTFT check
        if ttft_ms > self.ttft_threshold_ms:
            alerts.append(LatencyAlert("ttft_spike", ttft_ms, self.ttft_threshold_ms, session_id, now))

        return alerts
TTFT (time to first token) deserves special attention because it correlates specifically with input processing load. A normal request has a TTFT roughly proportional to its input length. An anomalously high TTFT for a short input often indicates the model received a much larger context than the application logged, a sign that a prompt injection smuggled additional content into the context.
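One way to operationalize that check is to model expected TTFT as roughly linear in input tokens and flag large residuals. The coefficients below are placeholders; fit them to your own traffic before relying on the flag:

def ttft_residual_check(
    input_tokens: int,
    ttft_ms: float,
    base_ms: float = 200.0,      # assumed fixed overhead; fit from your data
    per_token_ms: float = 0.5,   # assumed prefill cost per input token; fit from your data
    tolerance: float = 3.0,
) -> bool:
    """Return True if TTFT is far above what this input length should cost."""
    expected = base_ms + per_token_ms * input_tokens
    return ttft_ms > tolerance * expected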
Pillar 3: Output Drift Detection
Output drift detection is slower than per-request anomaly detection: it operates over hours or days, not seconds. The goal is to detect when the distribution of model outputs has shifted from its established baseline, which can indicate:
- A system prompt change that altered model behavior unintentionally
- A model version update that changed output style or safety behavior
- A slow injection campaign that is gradually shifting the model’s outputs
- Seasonal/population shifts in user input that require prompt recalibration
from collections import deque

import numpy as np
from scipy import stats

class OutputDriftDetector:
    """
    Detects drift in output distribution using a sliding window comparison.
    Compares the current window against a reference window using statistical tests.
    """

    def __init__(
        self,
        reference_window: int = 5000,
        comparison_window: int = 500,
        drift_pvalue_threshold: float = 0.01,
    ):
        self.reference_window = reference_window
        self.comparison_window = comparison_window
        self.drift_pvalue_threshold = drift_pvalue_threshold

        # Track multiple output dimensions
        self._reference_lengths: deque[float] = deque(maxlen=reference_window)
        self._reference_toxicity: deque[float] = deque(maxlen=reference_window)
        self._reference_refusal_rate: deque[float] = deque(maxlen=reference_window)
        self._current_lengths: deque[float] = deque(maxlen=comparison_window)
        self._current_toxicity: deque[float] = deque(maxlen=comparison_window)
        self._current_refusal_rate: deque[float] = deque(maxlen=comparison_window)

    def add_to_reference(self, metrics: OutputMetrics) -> None:
        self._reference_lengths.append(metrics.output_length_chars)
        self._reference_toxicity.append(metrics.toxicity_score)
        self._reference_refusal_rate.append(1.0 if metrics.refusal_detected else 0.0)

    def observe(self, metrics: OutputMetrics) -> list[str]:
        self._current_lengths.append(metrics.output_length_chars)
        self._current_toxicity.append(metrics.toxicity_score)
        self._current_refusal_rate.append(1.0 if metrics.refusal_detected else 0.0)

        if (
            len(self._reference_lengths) < 100
            or len(self._current_lengths) < self.comparison_window
        ):
            return []

        drift_signals = []

        # Kolmogorov-Smirnov test for distribution shift in each dimension
        for label, ref, cur in [
            ("output_length", self._reference_lengths, self._current_lengths),
            ("toxicity", self._reference_toxicity, self._current_toxicity),
            ("refusal_rate", self._reference_refusal_rate, self._current_refusal_rate),
        ]:
            ks_stat, pvalue = stats.ks_2samp(list(ref), list(cur))
            if pvalue < self.drift_pvalue_threshold:
                ref_mean = np.mean(list(ref))
                cur_mean = np.mean(list(cur))
                drift_signals.append(
                    f"drift_detected:{label} ks={ks_stat:.3f} p={pvalue:.4f} "
                    f"ref_mean={ref_mean:.2f} cur_mean={cur_mean:.2f}"
                )

        return drift_signals
The two-sample Kolmogorov-Smirnov test asks whether two samples were drawn from the same underlying distribution. A low p-value means the current window’s distribution is statistically unlikely to have come from the same distribution as the reference — that’s the drift signal. Track refusal rate drift carefully: a rising refusal rate without a corresponding guardrail trigger spike often means legitimate users are rephrasing requests in ways that trigger safety filters, which may indicate a system prompt that is becoming miscalibrated.
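The reference window itself needs deliberate management: after an intentional change such as a model upgrade or a system prompt edit, the old baseline is stale and every comparison will read as drift. One policy, sketched as a hypothetical helper your deployment pipeline could call (it reaches into the detector's private deques for brevity):

def reset_baseline(detector: OutputDriftDetector) -> None:
    """Promote the current window to the new reference after a deliberate change.

    Hypothetical helper: after a model or prompt deployment, discard the stale
    reference and seed it with recent traffic, so drift alerts measure movement
    against the new intended behavior rather than the old one.
    """
    for ref, cur in [
        (detector._reference_lengths, detector._current_lengths),
        (detector._reference_toxicity, detector._current_toxicity),
        (detector._reference_refusal_rate, detector._current_refusal_rate),
    ]:
        ref.clear()
        ref.extend(cur)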
Putting It Together: Alert Routing
import logging
from enum import Enum

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

def route_alert(signal: str, value: float) -> AlertSeverity:
    """Map monitoring signals to severity levels for alert routing."""
    if "guardrail_rate" in signal and value > 0.30:
        return AlertSeverity.CRITICAL  # >30% trigger rate: likely active attack
    elif "guardrail_rate" in signal and value > 0.15:
        return AlertSeverity.WARNING
    elif "p99_breach" in signal:
        return AlertSeverity.CRITICAL
    elif "drift_detected" in signal and "toxicity" in signal:
        return AlertSeverity.CRITICAL  # Toxicity drift is always high priority
    elif "drift_detected" in signal:
        return AlertSeverity.WARNING
    elif "anomaly" in signal:
        return AlertSeverity.WARNING
    return AlertSeverity.INFO

async def emit_alert(signal: str, severity: AlertSeverity, context: dict) -> None:
    logger = logging.getLogger("llm.monitoring")
    log_entry = {"signal": signal, "severity": severity.value, **context}
    if severity == AlertSeverity.CRITICAL:
        logger.critical(log_entry)
        await notify_oncall(log_entry)  # PagerDuty / OpsGenie integration, defined elsewhere
    elif severity == AlertSeverity.WARNING:
        logger.warning(log_entry)
        await notify_slack(log_entry)  # Slack webhook integration, defined elsewhere
    else:
        logger.info(log_entry)
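Tying the three pillars together, a request path might wire the monitors like the sketch below. The module-level construction and the per-request hook are assumptions about your application structure, not prescribed by the classes above:

anomaly_detector = AnomalyDetector()
guardrail_monitor = GuardrailRateMonitor()
drift_detector = OutputDriftDetector()

async def monitor_request(metrics: OutputMetrics) -> None:
    """Run all three pillars on one completed request and route whatever fires."""
    signals = list(anomaly_detector.observe(metrics))
    if (rate_alert := guardrail_monitor.record(metrics.guardrail_triggered)):
        signals.append(rate_alert)
    signals.extend(drift_detector.observe(metrics))

    for signal in signals:
        # value=0.0 is a placeholder that routes on the signal name alone;
        # in practice, pass the measured value embedded in each signal.
        severity = route_alert(signal, value=0.0)
        # Requires notify_oncall / notify_slack to be defined for your stack.
        await emit_alert(signal, severity, {
            "request_id": metrics.request_id,
            "session_id": metrics.session_id,
        })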
The observability patterns for LLM monitoring integrate well with standard MLOps tooling — the guardml.io middleware libraries include pre-built metric exporters compatible with Prometheus and OpenTelemetry, making it straightforward to surface these signals in existing Grafana dashboards alongside your application metrics.
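If you would rather wire the export yourself, the standard prometheus_client package covers these signals directly. A minimal sketch; the metric names and histogram buckets are arbitrary choices for illustration, not a guardml.io convention:

from prometheus_client import Counter, Histogram, start_http_server

LATENCY = Histogram(
    "llm_request_latency_ms", "Total request latency in milliseconds",
    buckets=(100, 250, 500, 1000, 2500, 5000, 10000, 30000),
)
GUARDRAIL_TRIGGERS = Counter(
    "llm_guardrail_triggers_total", "Guardrail trigger events", ["reason"]
)

def export_metrics(metrics: OutputMetrics) -> None:
    # Called once per completed request from the monitoring path
    LATENCY.observe(metrics.latency_ms)
    if metrics.guardrail_triggered:
        GUARDRAIL_TRIGGERS.labels(reason=metrics.guardrail_reason or "unknown").inc()

start_http_server(9100)  # Expose /metrics for Prometheus scraping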
What to Log (and What Not To)
Logging strategy for LLM monitoring requires care because the content involved may be sensitive:
- Log always: request ID, session ID, timestamp, input/output token counts, latency, guardrail trigger flag and reason code, toxicity score (numeric), output length, model version, system prompt version hash.
- Log conditionally (behind a feature flag, with retention limits): truncated input/output for debugging guardrail false positives — never the full text in perpetuity.
- Never log: full PII that appeared in inputs or was redacted from outputs, blocked content in its original form (logging blocked injections in full creates a log that contains the attack payloads), API keys or secrets even if the model reproduced them.
A hash of the session’s conversation content, stored alongside the numeric metrics, lets you correlate anomalies with specific conversations for investigation without storing the content itself. This balances debuggability against the security and compliance requirements of limiting where sensitive text is stored.
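A sketch of that hashing approach, assuming a deployment-level salt held in your secret store; keying the hash prevents someone with log access from fingerprinting candidate conversations offline:

import hashlib
import hmac

def conversation_fingerprint(salt: bytes, turns: list[str]) -> str:
    """Stable, non-reversible identifier for a conversation's content.

    Keyed with a deployment-level salt so the fingerprint cannot be
    reconstructed by hashing guessed texts offline.
    """
    digest = hmac.new(salt, digestmod=hashlib.sha256)
    for turn in turns:
        digest.update(turn.encode("utf-8"))
        digest.update(b"\x00")  # Delimiter so turn boundaries affect the hash
    return digest.hexdigest()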
Effective monitoring does not prevent attacks — it makes attacks visible quickly enough to respond before significant harm occurs. Combined with the static filtering and architectural controls described elsewhere in this series, it gives you a complete production defense posture.