By Latitude · April 9, 2026
Key Takeaways
For platform teams, AI evaluation is an infrastructure problem: eval data pipeline, eval execution service, and CI/CD integration — each with its own design requirements.
Eval datasets should be versioned like code: snapshot date, model version, failure mode coverage. A dataset that changes without tracking makes eval result attribution impossible.
The eval execution service needs to handle both rule-based and LLM-as-judge eval types, manage LLM API rate limits for judge calls, and return structured results in a format CI can parse.
Connecting the eval pipeline to the production observability stack — so failing production sessions automatically surface for annotation — is the integration that makes the eval suite grow from production data rather than requiring manual curation.
Sampling strategy belongs in the eval data pipeline, not in individual evaluators. Centralized sampling logic prevents inconsistent sampling and makes it easy to adjust rates across all evals.
For platform engineering teams, AI evaluation is an infrastructure build problem. The ML team specifies what needs to be evaluated; the platform team builds the data pipeline, execution service, and CI integration that makes those evaluations runnable at scale.
This guide covers the three infrastructure layers required for production AI evaluation — what each layer needs to do, the design decisions involved, and how they connect.
Layer 1: The Eval Data Pipeline
The eval data pipeline connects production traces to the eval dataset. It has four components:
1. Trace sampling and filtering
Not all production traces belong in the eval dataset. The sampling logic should:
Sample proportionally by failure mode category (if you have failure mode labels from the annotation pipeline)
Include traces selected by anomaly signals for failure mode discovery
Maintain a nominal sample (traces that represent good performance) to catch false positive regressions
Exclude traces with incomplete instrumentation (missing tool call spans, truncated sessions)
from dataclasses import dataclass
import random


@dataclass
class EvalSamplingConfig:
    nominal_sample_rate: float = 0.02      # 2% of nominal sessions
    anomaly_sample_rate: float = 0.50      # 50% of anomaly-flagged sessions
    failure_mode_sample_rate: float = 1.0  # 100% of annotated failure examples
    max_dataset_size: int = 2000
    min_per_failure_mode: int = 20         # Ensure minimum coverage per category


def sample_for_eval_dataset(
    traces: list[dict],
    config: EvalSamplingConfig,
    existing_dataset_counts: dict[str, int]
) -> list[dict]:
    """
    Sample traces for the eval dataset with stratified sampling by failure mode.
    Prioritizes failure examples, then anomalies, then nominal traces.
    `existing_dataset_counts` maps failure mode -> count already in the dataset;
    together with `min_per_failure_mode` it is the hook for enforcing per-category
    minimums when topping up an existing dataset.
    """
    sampled = []

    # Priority 1: annotated failure examples (taken in full; failure_mode_sample_rate defaults to 1.0)
    failure_traces = [t for t in traces if t.get("annotation_label") == "fail"]
    sampled.extend(failure_traces)

    # Priority 2: anomaly-flagged traces not yet annotated
    anomaly_traces = [
        t for t in traces
        if t.get("anomaly_score", 0) > 0.7
        and t.get("annotation_label") is None
    ]
    anomaly_sampled = random.sample(
        anomaly_traces,
        min(len(anomaly_traces), int(len(anomaly_traces) * config.anomaly_sample_rate))
    )
    sampled.extend(anomaly_sampled)

    # Priority 3: nominal sample for false positive control
    nominal_traces = [
        t for t in traces
        if t.get("annotation_label") is None
        and t.get("anomaly_score", 0) <= 0.3
    ]
    nominal_sampled = random.sample(
        nominal_traces,
        min(len(nominal_traces), int(len(nominal_traces) * config.nominal_sample_rate))
    )
    sampled.extend(nominal_sampled)

    return sampled[:config.max_dataset_size]
2. Dataset versioning
Eval datasets should be versioned by snapshot date, model version, and failure mode coverage. Store datasets as immutable, versioned artifacts — not mutable database tables. A dataset that changes without tracking makes it impossible to attribute changes in eval pass rates to model changes versus dataset changes.
import json
import hashlib
from datetime import datetime, timezone
from pathlib import Path


def snapshot_eval_dataset(
    traces: list[dict],
    model_version: str,
    failure_modes_covered: list[str],
    storage_path: Path
) -> dict:
    """
    Create an immutable, versioned snapshot of an eval dataset.
    Returns a metadata dict with the snapshot ID and storage path.
    """
    # Content-addressed ID derived from the model version, timestamp, and trace IDs
    snapshot_id = hashlib.sha256(
        json.dumps({
            "model_version": model_version,
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "trace_ids": sorted(t["id"] for t in traces)
        }, sort_keys=True).encode()
    ).hexdigest()[:12]

    snapshot_dir = storage_path / f"snapshot-{snapshot_id}"
    snapshot_dir.mkdir(parents=True, exist_ok=True)

    # Write the dataset as JSONL, one trace per line
    with open(snapshot_dir / "traces.jsonl", "w") as f:
        for trace in traces:
            f.write(json.dumps(trace) + "\n")

    # Write metadata alongside the dataset
    metadata = {
        "snapshot_id": snapshot_id,
        "created_at": datetime.now(timezone.utc).isoformat(),
        "model_version": model_version,
        "failure_modes_covered": failure_modes_covered,
        "trace_count": len(traces),
        "path": str(snapshot_dir)
    }
    with open(snapshot_dir / "metadata.json", "w") as f:
        json.dump(metadata, f, indent=2)

    return metadata
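The two functions above form the write path of the eval data pipeline. A minimal sketch of how they might be wired together, assuming a hypothetical load_recent_traces() helper that queries the observability backend, and with the model version and failure mode names purely illustrative:

from pathlib import Path

# Hypothetical glue code: load_recent_traces() is assumed to pull fully
# instrumented sessions from the observability store for the last N days.
traces = load_recent_traces(days=14)

config = EvalSamplingConfig(max_dataset_size=1500)
sampled = sample_for_eval_dataset(traces, config, existing_dataset_counts={})

metadata = snapshot_eval_dataset(
    traces=sampled,
    model_version="agent-v2026.04.08",          # illustrative version string
    failure_modes_covered=["tool_misuse", "hallucinated_citation"],  # illustrative labels
    storage_path=Path("/data/eval-snapshots"),
)
print(f"Created snapshot {metadata['snapshot_id']} with {metadata['trace_count']} traces")

Running this on a schedule (for example, nightly) is what keeps the dataset growing from production data rather than manual curation.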
Layer 2: The Eval Execution Service
The eval execution service accepts a dataset snapshot and an eval suite configuration, runs each evaluator against each dataset item, and returns structured results.
Design requirements
Eval type support: Must handle rule-based evals (synchronous, fast) and LLM-as-judge evals (async, rate-limited). Don't block the execution queue on LLM calls.
Rate limit management: LLM-as-judge evals call an external LLM API. The service must implement backoff, retry, and rate limit management without exposing this complexity to eval authors.
Determinism for rule-based evals: Rule-based evals on the same input should always produce the same output. The service should cache rule-based results for the same (trace_id, eval_id) pair to avoid redundant computation.
Structured output: Results must be in a format that CI can parse — JSON with clear pass/fail verdicts and score values per eval per trace item.
import asyncio
import json
import time
from dataclasses import dataclass


@dataclass
class EvalResult:
    trace_id: str
    eval_id: str
    verdict: bool      # True = pass, False = fail
    score: float       # 0-1 confidence score
    eval_type: str     # "rule" or "llm_judge"
    latency_ms: float


@dataclass
class EvalSuiteResult:
    snapshot_id: str
    model_version: str
    eval_results: list[EvalResult]
    summary: dict

    def to_ci_format(self) -> dict:
        """Format results for CI consumption."""
        evals_by_id: dict[str, list[bool]] = {}
        for r in self.eval_results:
            evals_by_id.setdefault(r.eval_id, []).append(r.verdict)
        return {
            "passed": self.summary["overall_passed"],
            "blocking_failures": self.summary.get("blocking_failures", []),
            "eval_pass_rates": {
                eval_id: sum(verdicts) / len(verdicts)
                for eval_id, verdicts in evals_by_id.items()
            },
            "snapshot_id": self.snapshot_id,
            "model_version": self.model_version
        }


async def run_eval_suite(
    dataset_path: str,
    eval_configs: list[dict],
    model_version: str,
    max_concurrent_llm_calls: int = 5
) -> EvalSuiteResult:
    """
    Run the eval suite against a dataset snapshot.
    Handles both rule-based and LLM-as-judge evals with appropriate concurrency.
    """
    with open(dataset_path) as f:
        traces = [json.loads(line) for line in f]

    semaphore = asyncio.Semaphore(max_concurrent_llm_calls)

    async def run_single_eval(trace, eval_config):
        start = time.time()
        if eval_config["type"] == "rule":
            # Rule-based: synchronous and fast, no rate limiting needed
            verdict = eval_config["fn"](trace)
            score = 1.0 if verdict else 0.0
        else:
            # LLM judge: async, concurrency capped by the semaphore
            async with semaphore:
                verdict, score = await eval_config["fn"](trace)
        return EvalResult(
            trace_id=trace["id"],
            eval_id=eval_config["id"],
            verdict=verdict,
            score=score,
            eval_type=eval_config["type"],
            latency_ms=(time.time() - start) * 1000
        )

    tasks = [
        run_single_eval(trace, eval_cfg)
        for trace in traces
        for eval_cfg in eval_configs
    ]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter out exceptions (log them separately)
    valid_results = [r for r in results if isinstance(r, EvalResult)]

    return EvalSuiteResult(
        snapshot_id=dataset_path.split("/")[-2],  # parent directory is snapshot-<id>
        model_version=model_version,
        eval_results=valid_results,
        summary=compute_summary(valid_results, eval_configs)  # aggregation helper, defined elsewhere
    )
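The loop above leaves two of the design requirements to the surrounding service: backoff on judge-call rate limits and caching of deterministic rule-based results. A minimal sketch of both, with hypothetical helper names (call_judge_with_backoff, run_rule_eval_cached) and a deliberately generic exception handler that you would narrow to your LLM client's rate-limit errors:

import asyncio
import random

# In-memory cache for deterministic rule-based evals, keyed on (trace_id, eval_id).
# A real service would back this with Redis or a database so results survive restarts.
_rule_cache: dict[tuple[str, str], tuple[bool, float]] = {}

def run_rule_eval_cached(trace: dict, eval_config: dict) -> tuple[bool, float]:
    """Return a cached (verdict, score) for a rule-based eval, computing it once per pair."""
    key = (trace["id"], eval_config["id"])
    if key not in _rule_cache:
        verdict = eval_config["fn"](trace)
        _rule_cache[key] = (verdict, 1.0 if verdict else 0.0)
    return _rule_cache[key]

async def call_judge_with_backoff(judge_fn, trace: dict, max_retries: int = 4) -> tuple[bool, float]:
    """Retry an LLM-judge call with exponential backoff plus jitter on transient errors."""
    for attempt in range(max_retries):
        try:
            return await judge_fn(trace)
        except Exception:  # narrow to your LLM client's rate-limit / timeout exceptions
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt + random.uniform(0, 1))

Eval authors never see this machinery; they write judge functions and rule functions, and the execution service wraps them.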
Layer 3: CI/CD Integration
The eval service should be callable from CI as a standard step. The CI step needs to:
Pull the latest eval dataset snapshot
Identify the candidate model version (from the current deployment)
Call the eval execution service
Compare results to the baseline (previous deployment's eval results)
Fail the build if blocking regressions are detected
Store results with the deployment metadata for historical comparison
# .github/workflows/eval-gate.yml
name: AI Eval Gate

on:
  push:
    paths:
      - 'ai/**'
      - 'prompts/**'

jobs:
  eval:
    runs-on: ubuntu-latest
    steps:
      - uses: actions/checkout@v3

      - name: Run eval suite
        env:
          LATITUDE_API_KEY: ${{ secrets.LATITUDE_API_KEY }}
          MODEL_VERSION: ${{ github.sha }}
        run: |
          python ci/run_evals.py \
            --model-version $MODEL_VERSION \
            --dataset-snapshot latest \
            --output eval-results.json

      - name: Check regression
        run: |
          python ci/check_regression.py \
            --results eval-results.json \
            --baseline-version $(cat .baseline-model-version) \
            --fail-on blocking

      - name: Store results
        if: always()
        run: |
          python ci/store_eval_results.py \
            --results eval-results.json \
            --model-version $MODEL_VERSION
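The workflow calls a ci/check_regression.py script that this guide does not spell out. A minimal sketch of what it might do, assuming the results file uses the to_ci_format() layout from Layer 2 and that baselines are stored per model version by ci/store_eval_results.py (the eval-baselines/ path and the 5-point threshold are assumptions):

# ci/check_regression.py (illustrative sketch, not a canonical implementation)
import argparse
import json
import sys

BLOCKING_DROP = 0.05  # fail if a blocking eval's pass rate drops by more than 5 points

def main() -> int:
    parser = argparse.ArgumentParser()
    parser.add_argument("--results", required=True)
    parser.add_argument("--baseline-version", required=True)
    parser.add_argument("--fail-on", default="blocking")
    args = parser.parse_args()

    with open(args.results) as f:
        current = json.load(f)
    # Assumed layout: one baseline file per model version, written by ci/store_eval_results.py
    with open(f"eval-baselines/{args.baseline_version}.json") as f:
        baseline = json.load(f)

    regressions = []
    for eval_id, rate in current["eval_pass_rates"].items():
        baseline_rate = baseline["eval_pass_rates"].get(eval_id)
        if baseline_rate is not None and baseline_rate - rate > BLOCKING_DROP:
            regressions.append((eval_id, baseline_rate, rate))

    if regressions and args.fail_on == "blocking":
        for eval_id, old, new in regressions:
            print(f"REGRESSION {eval_id}: {old:.2%} -> {new:.2%}")
        return 1
    print("No blocking regressions detected")
    return 0

if __name__ == "__main__":
    sys.exit(main())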
Frequently Asked Questions
How do platform engineering teams build AI evaluation infrastructure?
Platform engineering teams building AI evaluation infrastructure need to deliver three capabilities: (1) Eval data pipeline — a system that samples production traces, applies filtering and sampling logic, stores eval datasets versioned by time and model version, and provides a query interface for ML engineers. (2) Eval execution infrastructure — a service that accepts an eval dataset and model configuration, runs each evaluator, and returns structured results. This service needs to handle both rule-based and LLM-as-judge eval types and manage API rate limits. (3) CI/CD integration — a CI step that calls the eval execution service, compares results to baseline, and fails the build if blocking regressions are detected.
How do you version and manage eval datasets for AI systems?
Eval datasets for AI systems should be versioned by: (1) snapshot date — when were these traces collected? (2) model version — which model was running when these traces were generated? (3) failure mode coverage — which categories are represented? Versioning by these dimensions lets you reconstruct historical evaluations and compare results across model versions on a fixed dataset. Dataset mutations should be tracked with git or similar so the dataset's history is auditable. A dataset that changes without tracking makes it impossible to attribute changes in eval pass rates to model changes vs. dataset changes.
Latitude provides managed infrastructure for the eval pipeline described in this guide — including production trace collection, anomaly-prioritized annotation queues, GEPA eval generation, and CI integration — so platform teams don't have to build it from scratch. See the documentation or start for free.