AI Defense

Monitoring LLM Outputs in Production: Anomaly Detection, Latency Alerting, and Output Drift

How to build a production observability stack for LLM outputs — covering anomaly detection pipelines, latency threshold alerting, output drift signals, and concrete alerting logic you can deploy today.

By Elena Vasquez · 8 min read

Deploying a large language model to production is not the end of the security engineering work — it is the beginning of a different kind of work. Static controls like input guardrails and output filters operate on individual requests. Production monitoring operates across the full distribution of requests over time, catching the anomalies that no single-request check can see: a sudden spike in guardrail triggers that signals a coordinated attack, a slow drift in output length that suggests the system prompt is degrading under context pressure, or a latency regression that reveals a new prompt pattern hitting an expensive code path.

This post covers the three pillars of LLM output monitoring — anomaly detection, latency alerting, and output drift detection — with concrete implementation patterns for each.

Why LLM Monitoring Is Structurally Different

Traditional application monitoring tracks metrics with stable distributions: request rate, error rate, response time. LLM monitoring deals with distributions that shift continuously and often intentionally. The model’s outputs are high-dimensional (full text), semantically rich, and generated probabilistically — summary statistics like mean response length or average toxicity score are meaningful, but their baselines shift whenever the underlying model is updated, the system prompt changes, or the user population evolves.

This means LLM monitoring involves two separate problems layered on top of each other:

  1. Detecting anomalies relative to the current baseline (something unusual is happening right now)
  2. Detecting when the baseline itself has shifted (the model’s behavior has changed over time)

The two problems require different tooling and different alert thresholds.

Pillar 1: Per-Request Anomaly Detection

The first monitoring layer fires per-request, in real time or with a short delay. Its job is to flag individual outputs that fall outside expected bounds.

Anomaly Detection Pipeline

import statistics
from dataclasses import dataclass
from collections import deque
from typing import Optional

@dataclass
class OutputMetrics:
    request_id: str
    session_id: str
    timestamp: float
    input_tokens: int
    output_tokens: int
    latency_ms: float
    guardrail_triggered: bool
    guardrail_reason: Optional[str]
    toxicity_score: float
    output_length_chars: int
    contains_code: bool
    refusal_detected: bool

class RollingWindowStats:
    """Maintain rolling statistics for anomaly detection."""

    def __init__(self, window_size: int = 1000):
        self.window_size = window_size
        self._values: deque[float] = deque(maxlen=window_size)

    def add(self, value: float) -> None:
        self._values.append(value)

    def mean(self) -> float:
        return statistics.mean(self._values) if self._values else 0.0

    def stdev(self) -> float:
        return statistics.stdev(self._values) if len(self._values) >= 2 else 0.0

    def zscore(self, value: float) -> float:
        # Require a minimum sample count so a cold-start window cannot
        # produce spurious z-scores off a near-zero stdev.
        if len(self._values) < 30:
            return 0.0
        s = self.stdev()
        if s == 0:
            return 0.0
        return (value - self.mean()) / s

class AnomalyDetector:
    def __init__(self, zscore_threshold: float = 3.0):
        self.zscore_threshold = zscore_threshold
        self.latency_stats = RollingWindowStats()
        self.output_length_stats = RollingWindowStats()
        self.toxicity_stats = RollingWindowStats()

    def observe(self, metrics: OutputMetrics) -> list[str]:
        """Add an observation and return any anomaly flags."""
        anomalies = []

        # Check latency
        latency_z = self.latency_stats.zscore(metrics.latency_ms)
        self.latency_stats.add(metrics.latency_ms)
        if latency_z > self.zscore_threshold:
            anomalies.append(f"latency_spike:z={latency_z:.2f}")

        # Check output length
        length_z = self.output_length_stats.zscore(metrics.output_length_chars)
        self.output_length_stats.add(metrics.output_length_chars)
        if length_z > self.zscore_threshold:
            anomalies.append(f"output_length_spike:z={length_z:.2f}")

        # Check toxicity
        toxicity_z = self.toxicity_stats.zscore(metrics.toxicity_score)
        self.toxicity_stats.add(metrics.toxicity_score)
        if toxicity_z > self.zscore_threshold:
            anomalies.append(f"toxicity_spike:z={toxicity_z:.2f}")

        # Hard threshold: any guardrail trigger is an anomaly event
        if metrics.guardrail_triggered:
            anomalies.append(f"guardrail_trigger:{metrics.guardrail_reason}")

        return anomalies

The rolling z-score approach is simple and effective for most production volumes. A z-score above 3.0 means an observation is more than three standard deviations from the recent mean — rare under normal conditions but common during attacks or model regressions.
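Wiring the detector into the serving path takes only a few lines. A minimal sketch: `score_toxicity`, `emit`, and the `guardrail_result` object are hypothetical stand-ins for whatever toxicity classifier, alert sink, and guardrail layer you already run.

import time

detector = AnomalyDetector(zscore_threshold=3.0)

def on_response(request_id: str, session_id: str, completion: str,
                latency_ms: float, guardrail_result) -> None:
    metrics = OutputMetrics(
        request_id=request_id,
        session_id=session_id,
        timestamp=time.time(),
        input_tokens=guardrail_result.input_tokens,
        output_tokens=guardrail_result.output_tokens,
        latency_ms=latency_ms,
        guardrail_triggered=guardrail_result.triggered,
        guardrail_reason=guardrail_result.reason,
        toxicity_score=score_toxicity(completion),   # hypothetical classifier
        output_length_chars=len(completion),
        contains_code="```" in completion,           # naive heuristic
        refusal_detected="i can't help" in completion.lower(),  # naive heuristic
    )
    for anomaly in detector.observe(metrics):
        emit(anomaly, metrics)                       # hypothetical alert sink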

Guardrail Trigger Rate Monitoring

Individual guardrail triggers are expected. A sustained elevation in trigger rate is the signal that matters. Track trigger rate as a rolling percentage, not an absolute count:

from collections import deque
from typing import Optional
import time

class GuardrailRateMonitor:
    def __init__(self, window_seconds: int = 300, alert_threshold: float = 0.15):
        """Alert if more than 15% of requests trigger a guardrail in any 5-minute window."""
        self.window_seconds = window_seconds
        self.alert_threshold = alert_threshold
        self._events: deque[tuple[float, bool]] = deque()  # (timestamp, triggered)

    def record(self, triggered: bool) -> Optional[str]:
        now = time.time()
        self._events.append((now, triggered))

        # Drop events outside the window
        cutoff = now - self.window_seconds
        while self._events and self._events[0][0] < cutoff:
            self._events.popleft()

        if len(self._events) < 50:
            return None  # Not enough data

        trigger_rate = sum(1 for _, t in self._events if t) / len(self._events)
        if trigger_rate > self.alert_threshold:
            return f"ALERT: guardrail_rate={trigger_rate:.1%} over last {self.window_seconds}s"
        return None

A 15% guardrail trigger rate sustained over 5 minutes is a strong signal of either a coordinated attack (many users sending adversarial inputs) or a model regression (the model’s behavior has shifted such that legitimate requests are now triggering filters). The alert surfaces the condition; root cause analysis distinguishes between them.
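In practice the rate monitor sits next to the per-request detector in the same path. A short sketch, reusing the hypothetical emit sink from the earlier example:

rate_monitor = GuardrailRateMonitor(window_seconds=300, alert_threshold=0.15)

def on_guardrail_event(metrics: OutputMetrics) -> None:
    alert = rate_monitor.record(metrics.guardrail_triggered)
    if alert is not None:
        emit(alert, metrics)  # hypothetical alert sink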

Pillar 2: Latency Alerting

LLM latency is a security signal, not just a performance metric. Several attack patterns produce characteristic latency signatures: prompts crafted to hit expensive code paths inflate total generation time, and injections that smuggle extra content into the context show up as anomalous time to first token (more on TTFT below). The alerter below tracks rolling p95/p99 latency plus per-request TTFT:

import time
from collections import deque
from dataclasses import dataclass

@dataclass
class LatencyAlert:
    alert_type: str
    value_ms: float
    threshold_ms: float
    session_id: str
    timestamp: float

class LatencyAlerter:
    def __init__(
        self,
        p95_threshold_ms: float = 5000,
        p99_threshold_ms: float = 10000,
        ttft_threshold_ms: float = 2000,
    ):
        self.p95_threshold_ms = p95_threshold_ms
        self.p99_threshold_ms = p99_threshold_ms
        self.ttft_threshold_ms = ttft_threshold_ms
        self._latencies: deque[float] = deque(maxlen=500)

    def record_request(
        self,
        total_latency_ms: float,
        ttft_ms: float,
        session_id: str,
    ) -> list[LatencyAlert]:
        alerts = []
        now = time.time()

        self._latencies.append(total_latency_ms)

        # Compute rolling p95 and p99
        if len(self._latencies) >= 20:
            sorted_lat = sorted(self._latencies)
            p95 = sorted_lat[int(len(sorted_lat) * 0.95)]
            p99 = sorted_lat[int(len(sorted_lat) * 0.99)]

            if p95 > self.p95_threshold_ms:
                alerts.append(LatencyAlert("p95_breach", p95, self.p95_threshold_ms, session_id, now))
            if p99 > self.p99_threshold_ms:
                alerts.append(LatencyAlert("p99_breach", p99, self.p99_threshold_ms, session_id, now))

        # Per-request TTFT check
        if ttft_ms > self.ttft_threshold_ms:
            alerts.append(LatencyAlert("ttft_spike", ttft_ms, self.ttft_threshold_ms, session_id, now))

        return alerts

TTFT (time to first token) deserves special attention because it correlates specifically with input processing load. A normal request has a TTFT roughly proportional to its input length. An anomalously high TTFT for a short input often indicates the model received a much larger context than the application logged — a sign that an injection succeeded in smuggling additional content into the prompt.
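You can operationalize that observation by normalizing TTFT against the input size the application believes it sent. A minimal sketch; the 5 ms/token default is illustrative and should be calibrated against your model's measured TTFT-per-token distribution:

from typing import Optional

class TTFTRatioMonitor:
    """Flag requests whose TTFT is disproportionate to their logged input size."""

    def __init__(self, max_ms_per_token: float = 5.0, min_input_tokens: int = 10):
        self.max_ms_per_token = max_ms_per_token    # illustrative; calibrate per model
        self.min_input_tokens = min_input_tokens    # tiny inputs make the ratio noisy

    def check(self, ttft_ms: float, input_tokens: int) -> Optional[str]:
        if input_tokens < self.min_input_tokens:
            return None
        ratio = ttft_ms / input_tokens
        if ratio > self.max_ms_per_token:
            # The model spent far longer reading input than the logged token
            # count explains: the prompt it actually received may be larger.
            return f"ttft_ratio_anomaly:{ratio:.1f}ms_per_token input_tokens={input_tokens}"
        return None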

Pillar 3: Output Drift Detection

Output drift detection is slower than anomaly detection — it operates over hours or days, not seconds. The goal is to detect when the distribution of model outputs has shifted from its established baseline, which can indicate a silent model update, a system prompt degrading under context pressure, or an evolving user population:

import numpy as np
from scipy import stats
from collections import deque

class OutputDriftDetector:
    """
    Detects drift in output distribution using a sliding window comparison.
    Compares the current window against a reference window using statistical tests.
    """

    def __init__(
        self,
        reference_window: int = 5000,
        comparison_window: int = 500,
        drift_pvalue_threshold: float = 0.01,
    ):
        self.reference_window = reference_window
        self.comparison_window = comparison_window
        self.drift_pvalue_threshold = drift_pvalue_threshold

        # Track multiple output dimensions
        self._reference_lengths: deque[float] = deque(maxlen=reference_window)
        self._reference_toxicity: deque[float] = deque(maxlen=reference_window)
        self._reference_refusal_rate: deque[float] = deque(maxlen=reference_window)

        self._current_lengths: deque[float] = deque(maxlen=comparison_window)
        self._current_toxicity: deque[float] = deque(maxlen=comparison_window)
        self._current_refusal_rate: deque[float] = deque(maxlen=comparison_window)

    def add_to_reference(self, metrics: OutputMetrics) -> None:
        self._reference_lengths.append(metrics.output_length_chars)
        self._reference_toxicity.append(metrics.toxicity_score)
        self._reference_refusal_rate.append(1.0 if metrics.refusal_detected else 0.0)

    def observe(self, metrics: OutputMetrics) -> list[str]:
        self._current_lengths.append(metrics.output_length_chars)
        self._current_toxicity.append(metrics.toxicity_score)
        self._current_refusal_rate.append(1.0 if metrics.refusal_detected else 0.0)

        if (
            len(self._reference_lengths) < 100
            or len(self._current_lengths) < self.comparison_window
        ):
            return []

        drift_signals = []

        # Two-sample Kolmogorov-Smirnov test for distribution shift in each
        # dimension. For the binary refusal indicator the KS p-value is only
        # approximate (heavy ties); a two-proportion test is sharper there.
        for label, ref, cur in [
            ("output_length", self._reference_lengths, self._current_lengths),
            ("toxicity", self._reference_toxicity, self._current_toxicity),
            ("refusal_rate", self._reference_refusal_rate, self._current_refusal_rate),
        ]:
            ks_stat, pvalue = stats.ks_2samp(list(ref), list(cur))
            if pvalue < self.drift_pvalue_threshold:
                ref_mean = np.mean(list(ref))
                cur_mean = np.mean(list(cur))
                drift_signals.append(
                    f"drift_detected:{label} ks={ks_stat:.3f} p={pvalue:.4f} "
                    f"ref_mean={ref_mean:.2f} cur_mean={cur_mean:.2f}"
                )

        return drift_signals

The two-sample Kolmogorov-Smirnov test asks whether two samples plausibly come from the same underlying distribution. A low p-value means the current window’s distribution is statistically unlikely to match the reference — that’s the drift signal. Track refusal rate drift carefully: a rising refusal rate without a corresponding guardrail trigger spike often means the model itself, rather than your external filters, is refusing requests it previously served, which may indicate a system prompt that is becoming miscalibrated.
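The detector assumes a populated reference window. Baseline policy is deployment-specific, but one workable pattern is to freeze the reference after each model or prompt release, once outputs have been spot-checked, and stream live traffic through observe() from then on. A sketch, again with the hypothetical emit sink:

drift_detector = OutputDriftDetector()

def on_metrics(metrics: OutputMetrics, baseline_frozen: bool) -> None:
    if not baseline_frozen:
        # The first few thousand post-release requests become the baseline.
        drift_detector.add_to_reference(metrics)
        return
    for signal in drift_detector.observe(metrics):
        emit(signal, metrics)  # hypothetical alert sink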

Putting It Together: Alert Routing

import logging
from enum import Enum

class AlertSeverity(Enum):
    INFO = "info"
    WARNING = "warning"
    CRITICAL = "critical"

def route_alert(signal: str, value: float) -> AlertSeverity:
    """Map monitoring signals to severity levels for alert routing."""
    if "guardrail_rate" in signal and value > 0.30:
        return AlertSeverity.CRITICAL   # >30% trigger rate: likely active attack
    elif "guardrail_rate" in signal and value > 0.15:
        return AlertSeverity.WARNING
    elif "p99_breach" in signal:
        return AlertSeverity.CRITICAL
    elif "drift_detected" in signal and "toxicity" in signal:
        return AlertSeverity.CRITICAL   # Toxicity drift is always high priority
    elif "drift_detected" in signal:
        return AlertSeverity.WARNING
    elif "_spike" in signal or "guardrail_trigger" in signal:
        return AlertSeverity.WARNING    # per-request anomalies from the detectors above
    return AlertSeverity.INFO

async def emit_alert(signal: str, severity: AlertSeverity, context: dict) -> None:
    logger = logging.getLogger("llm.monitoring")
    log_entry = {"signal": signal, "severity": severity.value, **context}

    if severity == AlertSeverity.CRITICAL:
        logger.critical(log_entry)
        await notify_oncall(log_entry)      # PagerDuty / OpsGenie
    elif severity == AlertSeverity.WARNING:
        logger.warning(log_entry)
        await notify_slack(log_entry)
    else:
        logger.info(log_entry)

The observability patterns for LLM monitoring integrate well with standard MLOps tooling — the guardml.io middleware libraries include pre-built metric exporters compatible with Prometheus and OpenTelemetry, making it straightforward to surface these signals in existing Grafana dashboards alongside your application metrics.
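If you are hand-rolling the export instead, the standard prometheus_client package covers these signals in a few lines. A sketch; the metric names here are illustrative, not a guardml.io convention:

from prometheus_client import Counter, Histogram, start_http_server

LATENCY = Histogram(
    "llm_request_latency_ms", "End-to-end LLM request latency",
    buckets=(100, 250, 500, 1000, 2500, 5000, 10000, 30000),
)
GUARDRAIL_TRIGGERS = Counter(
    "llm_guardrail_triggers_total", "Guardrail trigger events", ["reason"],
)
DRIFT_SIGNALS = Counter(
    "llm_drift_signals_total", "Output drift detections", ["dimension"],
)

def export_metrics(metrics: OutputMetrics) -> None:
    LATENCY.observe(metrics.latency_ms)
    if metrics.guardrail_triggered:
        GUARDRAIL_TRIGGERS.labels(reason=metrics.guardrail_reason or "unknown").inc()

def export_drift(signal: str) -> None:
    # Signals look like "drift_detected:toxicity ks=0.31 p=0.0012 ..."
    dimension = signal.split(":", 1)[1].split(" ", 1)[0]
    DRIFT_SIGNALS.labels(dimension=dimension).inc()

start_http_server(9100)  # expose a /metrics endpoint for the Prometheus scraper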

What to Log (and What Not To)

Logging strategy for LLM monitoring requires care because the content involved may be sensitive:

Log always: request ID, session ID, timestamp, input/output token counts, latency, guardrail trigger flag and reason code, toxicity score (numeric), output length, model version, system prompt version hash.

Log conditionally (behind a feature flag, with retention limits): truncated input/output for debugging guardrail false positives — never the full text in perpetuity.

Never log: full PII that appeared in inputs or was redacted from outputs; blocked content in its original form (logging blocked injections verbatim turns your logs into a store of attack payloads); API keys or secrets, even if the model reproduced them.

A hash of the session’s conversation content, stored alongside the numeric metrics, lets you correlate anomalies with specific conversations for investigation without storing the content itself. This balances debuggability against the security and compliance requirements of limiting where sensitive text is stored.
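A minimal sketch of that fingerprinting, assuming a per-deployment salt held in your secret store so fingerprints cannot be reversed by hashing guessed content:

import hashlib

def conversation_fingerprint(salt: bytes, session_id: str, conversation_text: str) -> str:
    """Stable identifier for correlating anomalies with a conversation
    without storing the conversation itself."""
    h = hashlib.sha256()
    h.update(salt)                           # per-deployment secret, never logged
    h.update(session_id.encode("utf-8"))
    h.update(conversation_text.encode("utf-8"))
    return h.hexdigest()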

Effective monitoring does not prevent attacks — it makes attacks visible quickly enough to respond before significant harm occurs. Combined with the static filtering and architectural controls described elsewhere in this series, it gives you a complete production defense posture.

#llm-monitoring #mlops #anomaly-detection #output-drift #ai-defense