By Latitude · April 9, 2026
Key Takeaways
AI observability is built on OpenTelemetry — if you already have OTel infrastructure, adding AI trace collection is an extension, not a replacement.
The critical requirement that differs from traditional OTel: session-level trace grouping. Agent spans must be connected by session ID so the full interaction is reconstructable, not just individual calls.
Full content capture (input and output values, not just metadata) is required for quality analysis. Design your data pipeline with this in mind — content volumes are significantly larger than metadata-only tracing.
AI observability sits alongside existing monitoring stacks — route AI spans to both your existing backend and the AI observability platform via OTel collector configuration.
PII redaction and data residency requirements apply to AI traces the same way they apply to any user data. Build these into the pipeline at the collection layer, not downstream.
For platform engineering teams, AI observability is primarily an instrumentation and data pipeline problem. The semantic analysis — issue clustering, annotation queues, eval generation — lives in the observability platform. Your job is to ensure the right data flows there reliably, completely, and with appropriate privacy controls.
This guide covers the instrumentation architecture, data pipeline design considerations, and integration patterns for platform teams building the foundation for AI quality management.
The Instrumentation Layer
OpenTelemetry as the standard
AI observability has converged on OpenTelemetry as the trace format standard. The GenAI semantic conventions define standardized attribute names for LLM calls, making it possible to build instrumentation that works across models and frameworks without vendor lock-in.
Key attributes to capture on every LLM span:
```
# Core GenAI semantic convention attributes
gen_ai.system                    # "openai", "anthropic", "google", etc.
gen_ai.request.model             # "gpt-4o", "claude-3-5-sonnet", etc.
gen_ai.usage.input_tokens
gen_ai.usage.output_tokens
gen_ai.response.finish_reasons   # "stop", "length", "tool_calls", etc.

# Content capture (required for quality analysis)
input.value                      # Full prompt / message array
output.value                     # Full completion text

# Cost tracking
gen_ai.usage.input_token_cost
gen_ai.usage.output_token_cost
```
For agent tool calls, add these to child spans:
```
# Tool call attributes
tool.name      # Name of the tool/function called
tool.input     # Full tool call arguments (JSON)
tool.output    # Full tool response
tool.success   # Boolean — did the tool call succeed?
tool.error     # Error message if the tool failed

# Session grouping (critical for agents)
session.id     # Unique identifier connecting all spans in a session
```
Instrumentation patterns by framework
Direct OpenAI/Anthropic SDK: Wrap the client at the module level so every call is automatically captured without requiring instrumentation at every call site:
```python
import time

from opentelemetry import trace
from opentelemetry.exporter.otlp.proto.http.trace_exporter import OTLPSpanExporter
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor

# One-time setup: BatchSpanProcessor exports asynchronously, so tracing
# never blocks the request path.
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)


def create_instrumented_client(base_client, tracer):
    """
    Wraps an LLM client to automatically trace all completions.
    Returns a proxy object with the same interface.
    """

    class InstrumentedChat:
        class completions:
            @staticmethod
            def create(**kwargs):
                model = kwargs.get("model", "unknown")
                with tracer.start_as_current_span(f"llm.{model}") as span:
                    span.set_attribute("gen_ai.system", "openai")
                    span.set_attribute("gen_ai.request.model", model)
                    span.set_attribute("input.value", str(kwargs.get("messages", [])))
                    start = time.time()
                    response = base_client.chat.completions.create(**kwargs)
                    latency_ms = (time.time() - start) * 1000
                    span.set_attribute("gen_ai.usage.input_tokens", response.usage.prompt_tokens)
                    span.set_attribute("gen_ai.usage.output_tokens", response.usage.completion_tokens)
                    span.set_attribute("output.value", response.choices[0].message.content or "")
                    span.set_attribute("gen_ai.response.finish_reasons", response.choices[0].finish_reason)
                    span.set_attribute("latency_ms", latency_ms)
                    return response

    class InstrumentedClient:
        def __init__(self):
            self.chat = InstrumentedChat()

    return InstrumentedClient()
```
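Hypothetical usage of the wrapper above, assuming the OpenAI Python SDK v1 client interface (`client.chat.completions.create`):

```python
from openai import OpenAI

tracer = trace.get_tracer("llm-instrumentation")
client = create_instrumented_client(OpenAI(), tracer)

# Every call through the proxy now produces a fully attributed LLM span.
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Summarize today's alerts."}],
)
```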
LangChain: Use the OpenTelemetry callback handler, which automatically instruments all chain, agent, and tool calls:
```python
from opentelemetry.instrumentation.langchain import LangchainInstrumentor

LangchainInstrumentor().instrument()
```
Custom agent frameworks: Instrument at the agent execution layer using context propagation to connect child spans across async operations:
```python
class InstrumentedAgent:
    def __init__(self, tracer, session_id: str):
        self.tracer = tracer
        self.session_id = session_id

    async def run_session(self, initial_message: str) -> str:
        """Root span for the full agent session."""
        with self.tracer.start_as_current_span("agent_session") as session_span:
            session_span.set_attribute("session.id", self.session_id)
            session_span.set_attribute("session.initial_message", initial_message)
            # OTel context propagates across awaits via contextvars, so spans
            # started inside this block become children of the session span.
            result = await self._run_turns(initial_message, session_span)
            session_span.set_attribute("session.turn_count", result["turn_count"])
            session_span.set_attribute("session.completed", result["completed"])
            return result["final_response"]

    async def run_tool(self, tool_name: str, tool_input: dict) -> dict:
        """Child span for each tool call within a session."""
        with self.tracer.start_as_current_span("tool_call") as tool_span:
            tool_span.set_attribute("session.id", self.session_id)
            tool_span.set_attribute("tool.name", tool_name)
            tool_span.set_attribute("tool.input", str(tool_input))
            try:
                result = await self._execute_tool(tool_name, tool_input)
                tool_span.set_attribute("tool.success", True)
                tool_span.set_attribute("tool.output", str(result))
                return result
            except Exception as e:
                tool_span.set_attribute("tool.success", False)
                tool_span.set_attribute("tool.error", str(e))
                raise

    # _run_turns and _execute_tool are framework-specific and omitted here.
```
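Within a single process, asyncio propagates OTel context across await boundaries automatically. When a tool runs in a separate process or worker, propagate the context explicitly with `inject`/`extract`. A minimal sketch, where `post_to_worker` and `execute_tool` are hypothetical stand-ins for your transport and tool logic:

```python
from opentelemetry import trace
from opentelemetry.propagate import extract, inject


def call_remote_tool(payload: dict) -> dict:
    headers: dict = {}
    inject(headers)  # writes W3C traceparent/tracestate into the carrier
    return post_to_worker(payload, headers=headers)  # hypothetical transport


def worker_handler(payload: dict, headers: dict) -> dict:
    ctx = extract(headers)  # restore the caller's trace context
    tracer = trace.get_tracer("tool-worker")
    # The tool span joins the agent's trace instead of starting a new one.
    with tracer.start_as_current_span("tool.execute", context=ctx):
        return execute_tool(payload)  # hypothetical tool logic
```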
Data Pipeline Design
Content volume considerations
AI traces that capture full input/output content are significantly larger than metadata-only traces. A single GPT-4o call with a 2,000-token prompt and 500-token response generates approximately 10KB of trace data. At 100,000 traces per day, that's ~1GB/day flowing through your trace pipeline — before you factor in multi-turn agent sessions, which can be 5–10x larger per session.
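A back-of-envelope check on those numbers, assuming roughly 4 bytes of content per token:

```python
tokens_per_call = 2_000 + 500                  # prompt + response
bytes_per_call = tokens_per_call * 4           # ≈ 10 KB of content per call
calls_per_day = 100_000
gb_per_day = bytes_per_call * calls_per_day / 1e9
print(f"~{gb_per_day:.0f} GB/day")             # ~1 GB/day, before agent sessions
```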
Design your pipeline with this in mind:
Use async span export (BatchSpanProcessor, not SimpleSpanProcessor) to avoid blocking on export
Implement sampling at the collection layer for high-volume, low-risk trace categories (e.g., sample 20% of successful nominal sessions but 100% of sessions with anomaly signals); see the collector sketch after this list
Separate the content storage path from the metadata path — metadata can flow to your existing backend; content can flow to the AI observability platform with its own retention policy
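One way to implement that sampling split is tail-based sampling in the collector. A sketch, assuming the tail_sampling processor is included in your collector distribution; the policy names and the session.anomaly attribute are illustrative:

```yaml
processors:
  tail_sampling:
    decision_wait: 30s            # hold spans until the full trace has arrived
    policies:
      - name: keep-all-errors
        type: status_code
        status_code: {status_codes: [ERROR]}
      - name: keep-flagged-sessions          # illustrative anomaly flag
        type: string_attribute
        string_attribute: {key: session.anomaly, values: ["true"]}
      - name: sample-nominal-sessions
        type: probabilistic
        probabilistic: {sampling_percentage: 20}
```

Note that tail sampling decides per trace, so this matches session-level sampling only if each session is a single trace; if sessions span multiple traces, key the decision off session.id upstream.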
Routing traces to multiple backends
AI observability doesn't replace existing monitoring — it runs alongside it. Configure your OTel collector to route traces appropriately:
```yaml
# otel-collector-config.yaml
receivers:
  otlp:
    protocols:
      grpc:
        endpoint: 0.0.0.0:4317
      http:
        endpoint: 0.0.0.0:4318

processors:
  # Route AI spans separately from infrastructure spans. Note: the filter
  # processor DROPS matching spans, so each filter lists the spans to
  # exclude from its pipeline.
  filter/ai_spans:        # keeps AI spans by dropping everything else
    traces:
      span:
        - 'attributes["gen_ai.system"] == nil'
  filter/infra_spans:     # keeps infrastructure spans by dropping AI spans
    traces:
      span:
        - 'attributes["gen_ai.system"] != nil'

  # PII redaction for AI content ((?i) makes the email match
  # case-insensitive); repeat the statements for output.value as needed
  transform/redact_pii:
    trace_statements:
      - context: span
        statements:
          - replace_pattern(attributes["input.value"], "(?i)\\b[A-Z0-9._%+-]+@[A-Z0-9.-]+\\.[A-Z]{2,}\\b", "[EMAIL]")
          - replace_pattern(attributes["input.value"], "\\b\\d{3}-\\d{2}-\\d{4}\\b", "[SSN]")

exporters:
  # Existing backend for infrastructure monitoring
  otlp/datadog:
    endpoint: ${DATADOG_OTLP_ENDPOINT}
  # AI observability platform for quality analysis
  otlp/latitude:
    endpoint: ${LATITUDE_OTLP_ENDPOINT}   # placeholder; see Latitude docs
    headers:
      Authorization: "Bearer ${LATITUDE_API_KEY}"

service:
  pipelines:
    traces/infra:
      receivers: [otlp]
      processors: [filter/infra_spans]
      exporters: [otlp/datadog]
    traces/ai:
      receivers: [otlp]
      processors: [filter/ai_spans, transform/redact_pii]
      exporters: [otlp/latitude, otlp/datadog]
```
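Note the asymmetry in the pipelines: infrastructure spans stay in the existing backend only, while AI spans are redacted and then fan out to both, so latency and error dashboards in the existing backend keep the full picture while the AI observability platform receives the content it needs for quality analysis.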
PII and data residency
AI traces often contain user-generated content, which may include PII. Handle this at the collection layer — redact or hash PII before traces leave your infrastructure, not downstream in the observability platform. This ensures compliance regardless of what the observability platform does with the data.
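If some content should never leave the application process at all, redaction can also happen at instrumentation time, before values are written to span attributes. A minimal sketch using the same patterns as the collector config above:

```python
import re

# Same patterns as the collector config, applied before set_attribute.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
SSN_RE = re.compile(r"\b\d{3}-\d{2}-\d{4}\b")


def redact(text: str) -> str:
    text = EMAIL_RE.sub("[EMAIL]", text)
    return SSN_RE.sub("[SSN]", text)


# Inside the instrumented client from earlier:
# span.set_attribute("input.value", redact(str(kwargs.get("messages", []))))
```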
For organizations with strict data residency requirements, check whether the AI observability platform supports self-hosted deployment. Latitude's self-hosted option is fully featured and free — it runs in your own infrastructure, so traces never leave your environment.
Agent Framework Integration Checklist
Before declaring instrumentation complete for an agent workflow, verify:
Session ID propagation: Every span belonging to the same agent session shares the same session identifier. Verify by pulling traces for a known multi-turn session and confirming all spans are connected.
Tool call capture: Every tool call creates a child span with tool name, full input, and full output. Don't truncate tool outputs — the full content is needed for tool response misinterpretation analysis.
Async context propagation: In async frameworks, trace context must be explicitly propagated across async boundaries. Verify that turns within a session are connected even when individual LLM calls are awaited.
Error capture: Exceptions within spans should be captured via span.record_exception() and span.set_status(StatusCode.ERROR); see the sketch after this checklist. Verify error traces are appearing correctly in the observability platform.
Sampling configuration: Verify that your sampling strategy is correct — 100% sampling for anomaly-flagged sessions, reduced sampling for nominal sessions. Confirm that sampling decisions are made at the session level (not turn level), so partial sessions aren't ingested.
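A minimal sketch of the error-capture item, where run_tool() is a hypothetical tool invocation:

```python
from opentelemetry import trace
from opentelemetry.trace import Status, StatusCode

tracer = trace.get_tracer(__name__)

with tracer.start_as_current_span("tool_call") as span:
    try:
        result = run_tool()  # hypothetical tool invocation
    except Exception as exc:
        span.record_exception(exc)  # attaches the exception as a span event
        span.set_status(Status(StatusCode.ERROR, str(exc)))
        raise
```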
Frequently Asked Questions
How do platform engineering teams instrument AI for observability?
Platform engineering teams instrument AI observability using the OpenTelemetry (OTel) standard: each LLM call and agent action is captured as a span with standardized attributes (model, token counts, input/output values), and spans belonging to the same agent session are connected via a trace context. The key requirements specific to AI: (1) session-level trace grouping — agent spans must be connected by a session identifier so the full interaction is reconstructable; (2) full content capture — input and output values must be captured, not just metadata, because content is necessary for quality analysis; (3) tool call instrumentation — each tool call should be a child span with tool name, input parameters, and output captured.
How does AI observability integrate with existing observability stacks?
AI observability sits alongside, not replacing, existing application monitoring. The standard integration pattern: (1) Existing OTel infrastructure continues to send spans to your existing backend (Datadog, Honeycomb, Grafana, etc.) for infrastructure-level monitoring. (2) AI-specific spans are also routed — via an OTel collector or a parallel exporter — to an AI observability platform that has the semantic analysis capabilities standard observability tools don't provide. The split happens at the collector level: infrastructure spans stay in the existing backend; LLM and agent spans also flow to the AI observability platform.
Latitude accepts OTLP format traces natively and integrates with existing OTel infrastructure without requiring changes to your current monitoring stack. Self-hosted option available for data residency requirements. See documentation → or start for free →