HarnessAgent
Production-grade, multi-tenant agent harness for building, running, observing, and self-improving AI agents. HarnessAgent wraps any agent framework (LangGraph, CrewAI, AutoGen) or can run native SQL/Code agents with a full production lifecycle: paged context management, semantic memory, tool safety, distributed tracing, and automated self-improvement.
What it gives you
Run lifecycle
Create → execute → checkpoint → resume → emit SSE events → stream to client. Every run is tracked in Redis with full state including HITL pauses and budget enforcement.
Context engineering
Automatic page-based offloading, LLM/extractive compression, semantic cold-page retrieval, per-skill namespace isolation, action scoring, and sub-agent context bridging.
Full span tracing
Every run produces a hierarchical span tree (RUN → LLM/TOOL/GUARDRAIL) stored in Redis + JSONL, queryable via GET /runs/{id}/trace.
Self-improvement
Hermes loop: collect failures → LLM generates patch → eval on replay → auto-apply + regression rollback. Online monitor tracks per-version metrics.
Architecture
HarnessAgent is structured as a layered service. External requests enter through the FastAPI layer and pass through the orchestration layer to individual agents, which interact with memory, tools, and observability systems.
Quick Start
Install
```bash
pip install harnessagent[vector,observe,mcp]

# Or with all extras:
pip install harnessagent[all]
```
Run a single native agent
```python
from pathlib import Path

from harness.core.context import AgentContext
from harness.agents.sql_agent import SQLAgent
from harness.memory.manager import MemoryManager
from harness.llm.router import LLMRouter

# Build components
memory = await MemoryManager.create(config)
llm = LLMRouter()
llm.register(anthropic_provider, priority=0)

agent = SQLAgent(
    llm_router=llm,
    memory_manager=memory,
    tool_registry=tool_registry,
    safety_pipeline=None,
    step_tracer=None,
    mlflow_tracer=None,
    failure_tracker=failure_tracker,
    audit_logger=audit_logger,
    event_bus=event_bus,
    cost_tracker=cost_tracker,
    checkpoint_manager=checkpoint_manager,
    trace_recorder=trace_recorder,  # ← new: durable span tree
)

ctx = AgentContext.create(
    tenant_id="acme",
    agent_type="sql",
    task="List all tables and their row counts",
    memory=memory,
    workspace_path=Path("/workspaces/acme/run1"),
)

result = await agent.run(ctx)
print(result.output, result.cost_usd, result.steps)
```
Wrap an existing LangGraph agent
```python
import harness
from langgraph.graph import StateGraph

graph = StateGraph(...)  # your existing graph

adapter = harness.wrap(graph)
adapter.attach_harness(
    safety_pipeline=my_pipeline,
    cost_tracker=cost_tracker,
    audit_logger=audit_logger,
)

async for event in adapter.run_with_harness(ctx, {"input": "analyze sales data"}):
    print(event.event_type, event.payload)
```
Start the API server
```bash
uvicorn harness.api.main:create_app --factory --host 0.0.0.0 --port 8000

# Or via Makefile:
make api
```
Configuration
All settings are loaded from environment variables or a .env file via harness.core.config.Settings (Pydantic BaseSettings). Access the singleton via get_config().
| Variable | Default | Description |
|---|---|---|
| `ANTHROPIC_API_KEY` | — | Anthropic API key (required for Claude models) |
| `OPENAI_API_KEY` | — | OpenAI API key (optional) |
| `REDIS_URL` | `redis://localhost:6379` | Redis connection URL for all state |
| `VECTOR_BACKEND` | `chroma` | Vector store: `chroma` \| `qdrant` \| `weaviate` |
| `GRAPH_BACKEND` | `networkx` | Graph store: `networkx` \| `neo4j` |
| `EMBEDDING_MODEL` | `all-MiniLM-L6-v2` | SentenceTransformer model name |
| `MLFLOW_TRACKING_URI` | `sqlite:///mlflow.db` | MLflow tracking server URI |
| `OTEL_EXPORTER_ENDPOINT` | — | OTLP gRPC endpoint (optional) |
| `HERMES_AUTO_APPLY` | `false` | Auto-apply Hermes patches without human review |
| `HERMES_INTERVAL_SECONDS` | `3600` | How often the Hermes cycle runs (seconds) |
| `HERMES_MIN_ERRORS_TO_TRIGGER` | `5` | Min failures before Hermes generates a patch |
| `HERMES_PATCH_SCORE_THRESHOLD` | `0.7` | Min eval score to apply a patch (0–1) |
| `JWT_SECRET_KEY` | — | JWT signing secret for API auth |
| `ENVIRONMENT` | `dev` | `dev` \| `staging` \| `prod` |
| `WORKSPACE_BASE_PATH` | `/workspaces` | Base path for agent workspaces |
| `SQL_CONNECTION_STRING` | — | SQLAlchemy connection string for the SQL agent |
Core — Context & Events
Defined in harness/core/context.py. These are the fundamental data objects that flow through every part of the system.
AgentContext
Mutable per-run state shared across all components. Created once per run and passed everywhere.
| Field | Type | Description |
|---|---|---|
| `run_id` | `str` | Auto-generated UUID hex, unique per run |
| `tenant_id` | `str` | Tenant identifier for multi-tenancy |
| `agent_type` | `str` | `"sql"` \| `"code"` \| custom |
| `task` | `str` | Natural-language task description |
| `trace_id` | `str` | UUID hex for distributed trace correlation |
| `step_count` | `int` | Current step number (incremented by `tick()`) |
| `token_count` | `int` | Cumulative tokens used |
| `max_steps` | `int` | Step budget (default 50) |
| `max_tokens` | `int` | Token budget (default 100,000) |
| `timeout_seconds` | `float` | Wall-clock timeout (default 300 s) |
| `failed` | `bool` | Set to `True` when the run fails |
| `failure_class` | `str \| None` | `FailureClass` value when failed |
| `metadata` | `dict` | Arbitrary caller metadata; also used for HITL manager injection |
StepEvent
Emitted at each significant moment in an agent run. Published to EventBus (Redis Pub/Sub) for SSE streaming.
| event_type | Payload fields | When emitted |
|---|---|---|
| `started` | `task`, `agent_type` | Run begins |
| `llm_call` | `model`, `provider`, `input_tokens`, `output_tokens`, `cached`, `tool_calls` count | After each LLM response |
| `tool_call` | `tool_id`, `tool_name`, `args`, `is_error`, `error` | After each tool execution |
| `token_delta` | `delta` (string chunk) | During streaming LLM response |
| `completed` | `output`, `elapsed_seconds` | Successful completion |
| `failed` | `error`, `failure_class`, `elapsed_seconds` | Any fatal error |
| `budget_exceeded` | `failure_class` | Step/token/time budget hit |
Core — Protocols
Defined in harness/core/protocols.py. Structural Protocol ABCs that all pluggable components must satisfy. Use isinstance(x, LLMProvider) for runtime checking.
- `LLMProvider` — `provider_name: str`, `model: str`
- `VectorStore`
- `ToolExecutor` — `name`, `description`, `input_schema`, `timeout_seconds`
- `GraphStore`
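Because these are structural protocols, any class with the right members satisfies them without inheriting. A minimal sketch of what a runtime-checkable `LLMProvider` check looks like (the attribute list here is only the subset named above; the real protocol in `harness/core/protocols.py` defines more members):

```python
from typing import Protocol, runtime_checkable

# Hypothetical minimal shape of the LLMProvider protocol, for illustration.
@runtime_checkable
class LLMProvider(Protocol):
    provider_name: str
    model: str

class MyProvider:
    """Any class with the required attributes satisfies the protocol structurally."""
    provider_name = "anthropic"
    model = "claude-3-5-sonnet"

print(isinstance(MyProvider(), LLMProvider))  # True: structural, not nominal
print(isinstance(object(), LLMProvider))      # False: missing attributes
```

Note that `isinstance` on a `@runtime_checkable` protocol only checks member *presence*, not signatures or types.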
Core — Error Hierarchy
All harness errors extend HarnessError and carry a canonical FailureClass enum value. The failure class drives HTTP status codes, Prometheus labels, and Hermes sampling.
| FailureClass | Exception | HTTP Status |
|---|---|---|
| `LLM_ERROR` / `LLM_TIMEOUT` / `LLM_RATE_LIMIT` | `LLMError` | 502 / 504 / 429 |
| `TOOL_NOT_FOUND` / `TOOL_SCHEMA_ERROR` | `ToolError` | 400 |
| `TOOL_EXEC_ERROR` / `TOOL_TIMEOUT` | `ToolError` | 500 |
| `MCP_CONNECT_ERROR` / `MCP_TOOL_ERROR` | `HarnessError` | 502 |
| `SAFETY_INPUT` / `SAFETY_STEP` / `SAFETY_OUTPUT` | `SafetyViolation` | 400 |
| `BUDGET_STEPS` / `BUDGET_TOKENS` / `BUDGET_TIME` | `BudgetExceeded` | 429 |
| `MEMORY_REDIS` / `MEMORY_VECTOR` / `MEMORY_GRAPH` | `HarnessError` | 500 |
| `INTER_AGENT_TIMEOUT` / `INTER_AGENT_REJECT` | `HITLRejected` | 403 |
| `UNKNOWN` | `HarnessError` | 500 |
Using failure classes: StepFailure.from_exception(exc, run_id, step_number, agent_type) automatically captures the stack trace and classifies the exception into the correct FailureClass.
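The exception-to-class mapping can be pictured as below. This is a hedged sketch: the mapping table is illustrative only, and the real `StepFailure.from_exception` also records `run_id`, `step_number`, and `agent_type`.

```python
import traceback
from dataclasses import dataclass

# Illustrative exception → failure-class mapping; the real classifier
# covers the full FailureClass enum shown in the table above.
_CLASS_MAP = {
    TimeoutError: "TOOL_TIMEOUT",
    KeyError: "TOOL_NOT_FOUND",
    ValueError: "TOOL_SCHEMA_ERROR",
}

@dataclass
class StepFailure:
    failure_class: str
    message: str
    stack_trace: str

    @classmethod
    def from_exception(cls, exc: Exception) -> "StepFailure":
        failure_class = _CLASS_MAP.get(type(exc), "UNKNOWN")
        stack = "".join(traceback.format_exception(type(exc), exc, exc.__traceback__))
        return cls(failure_class, str(exc), stack)

failure = StepFailure.from_exception(ValueError("bad args"))
print(failure.failure_class)  # TOOL_SCHEMA_ERROR
```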
BaseAgent — Run Lifecycle
Defined in harness/agents/base.py. All concrete agents inherit from BaseAgent. The run(ctx) method orchestrates the full production lifecycle.
Constructor Parameters
| Parameter | Type | Required | Purpose |
|---|---|---|---|
| `llm_router` | `LLMRouter` | Yes | Health-aware, circuit-breaking LLM dispatch |
| `memory_manager` | `MemoryManager` | Yes | Conversation, vector, graph memory |
| `tool_registry` | `ToolRegistry` | Yes | Tool lookup, validation, safety, execution |
| `safety_pipeline` | `SafetyPipeline` | No | Input/output safety checking |
| `step_tracer` | `StepTracer` | No | OpenTelemetry span export |
| `trace_recorder` | `TraceRecorder` | No | Durable span tree in Redis + JSONL |
| `failure_tracker` | `FailureTracker` | No | Failure classification + Hermes signal |
| `event_bus` | `EventBus` | No | Redis Pub/Sub for SSE streaming |
| `cost_tracker` | `CostTracker` | No | Per-call USD cost recording |
| `checkpoint_manager` | `CheckpointManager` | No | Atomic save/restore for crash recovery |
Run loop steps
1. Open the `RUN` trace span, emit a `started` event, begin the MLflow run
2. Resume from checkpoint if one exists (`_maybe_resume_checkpoint`)
3. Loop while `ctx.is_budget_ok()`:
   - Fit history to the context window (compress if > 40 messages)
   - GraphRAG/vector retrieve — build the retrieval context string
   - Build LLM messages + system prompt
   - Open an `LLM` trace span → call the LLM → register token usage → close the span
   - Open a `GUARDRAIL` span → run the safety pipeline → close the span
   - If no tool calls → extract the final answer → break
   - HITL approval check (sequential)
   - Execute all tool calls in parallel, each in a `TOOL` span
   - Checkpoint every 10 steps
4. Close the `RUN` span (OK or ERROR), emit a `completed`/`failed` event
5. Return an `AgentResult`
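The budget gate that guards each loop iteration can be sketched as a simple conjunction over the defaults from the AgentContext table (a sketch only; the real `is_budget_ok` lives on `AgentContext` and distinguishes `BUDGET_STEPS` / `BUDGET_TOKENS` / `BUDGET_TIME`):

```python
import time
from dataclasses import dataclass, field

# Sketch of the per-step budget check, using the AgentContext defaults
# (50 steps, 100,000 tokens, 300 s wall clock).
@dataclass
class Budget:
    max_steps: int = 50
    max_tokens: int = 100_000
    timeout_seconds: float = 300.0
    step_count: int = 0
    token_count: int = 0
    started_at: float = field(default_factory=time.monotonic)

    def is_budget_ok(self) -> bool:
        return (self.step_count < self.max_steps
                and self.token_count < self.max_tokens
                and time.monotonic() - self.started_at < self.timeout_seconds)

b = Budget()
b.step_count, b.token_count = 49, 99_999
print(b.is_budget_ok())  # True — still under every limit
b.step_count = 50
print(b.is_budget_ok())  # False — step budget hit (BUDGET_STEPS)
```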
Methods to override in subclasses
CodeAgent
harness/agents/code_agent.py — Specialized for Python code writing, debugging, and iterative improvement.
agent_type: "code"
Workflow: Understand → Write clean Python with type hints → Lint with ruff → Run code → Iterate on failures.
Default tools: run_python, lint_code, read_file, write_file, apply_patch, list_workspace
SQLAgent
harness/agents/sql_agent.py — Specialized for SQL queries, schema exploration, and data analysis.
agent_type: "sql"
Schema pre-population: Before the main loop, _populate_schema(ctx) introspects the database and stores table/column nodes in the knowledge graph. This enables GraphRAG to surface schema context automatically in subsequent queries.
Default tools: list_tables, describe_table, sample_rows, execute_sql
Framework Adapters
harness/adapters/ — Wraps external agent frameworks with the harness production lifecycle.
import harness
# Auto-detect from object type
adapter = harness.wrap(my_langgraph_graph) # → LangGraphAdapter
adapter = harness.wrap(my_crew) # → CrewAIAdapter
adapter = harness.wrap(my_autogen_agent) # → AutoGenAdapter
# Inject production components
adapter.attach_harness(safety_pipeline, cost_tracker, audit_logger)
adapter.attach_mcp(mcp_client, tool_names=["search", "execute"])python
| Adapter | Detects | Execution model |
|---|---|---|
| `LangGraphAdapter` | LangGraph `StateGraph` or `CompiledGraph` | `astream()` — yields events per node execution |
| `CrewAIAdapter` | CrewAI `Crew` object | `kickoff_async()` — yields events per agent turn |
| `AutoGenAdapter` | AutoGen `ConversableAgent` | `initiate_chat()` — yields events per message |
MemoryManager
harness/memory/manager.py — Unified interface to all memory tiers. Injected into every agent via AgentContext.memory.
Memory tiers
Short-term (Redis LIST)
Per-run conversation history. LPUSH newest-first. Configurable last_n window.
Context engine (Redis + Vector)
Paged context with semantic cold-page retrieval. Offloads when hot window exceeds 80% capacity.
Vector store (long-term)
PII-redacted text embeddings. Chroma / Qdrant / Weaviate backends. Hybrid search (alpha-blended).
Knowledge graph
RDF-style triples (subject, predicate, object). NetworkX (dev) or Neo4j (prod). Used by GraphRAG.
Factory
```python
memory = await MemoryManager.create(config, llm_provider=llm)
# Builds: ShortTermMemory + EmbeddingProvider + VectorStore
#       + GraphMemory + ContextWindowManager + ContextEngine
```
Key methods
ContextEngine
harness/memory/context_engine.py — Paged, skill-isolated context management with automatic offload and semantic retrieval.
Core idea: The hot window (Redis LIST) holds recent messages per `(run_id, skill_ns)`. When it exceeds 80% of `max_hot_tokens`, the oldest ~2,000 tokens are compressed, embedded, and evicted to the vector store as a `ContextPage`. Before each LLM call, relevant cold pages are semantically retrieved and re-injected as system messages.
Offload pipeline
1. Check the hot window size (O(1) Redis `LLEN`)
2. If count < `_HOT_MAX_MSGS` (200), skip
3. Compute hot token usage (scan all raw items)
4. If < 80% of `max_hot_tokens`, skip
5. Walk from oldest to newest until ~2,000 tokens are collected
6. Compress (LLM preferred, extractive fallback)
7. Embed the summary → store in the vector DB with metadata
8. Store the `ContextPage` in a Redis HASH + ZSET (scored by importance)
9. `LTRIM` the hot list to remove offloaded messages
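The two cheap pre-checks (steps 1–4) can be sketched as a single predicate; a message-count threshold short-circuits before the more expensive token scan. This is an illustrative sketch, not the engine's actual code:

```python
# Gate for the offload pipeline: message-count check first (so LLEN alone
# can short-circuit), then the 80%-of-capacity token check.
def should_offload(msg_count: int, hot_tokens: int,
                   max_hot_tokens: int, hot_max_msgs: int = 200) -> bool:
    if msg_count < hot_max_msgs:          # O(1) LLEN check, no scan needed
        return False
    return hot_tokens >= 0.8 * max_hot_tokens

print(should_offload(150, 9_000, 10_000))  # False — too few messages
print(should_offload(250, 7_000, 10_000))  # False — only 70% full
print(should_offload(250, 8_500, 10_000))  # True — both thresholds crossed
```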
Importance scoring
Cold pages with higher importance are kept longer. Score starts at 0.5 and bumps for:
- +0.10 tool messages
- +0.15 error/failed content
- +0.08 result/found/success content
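The bump rules above can be transcribed directly; substring matching on lowercased content is an assumption here — the real scorer may tokenize differently:

```python
# Sketch of cold-page importance scoring: base 0.5 plus the bumps listed above.
def importance(role: str, content: str) -> float:
    score = 0.5
    text = content.lower()
    if role == "tool":
        score += 0.10
    if "error" in text or "failed" in text:
        score += 0.15
    if any(k in text for k in ("result", "found", "success")):
        score += 0.08
    return round(min(score, 1.0), 2)

print(importance("tool", "query failed: relation missing"))    # 0.75
print(importance("assistant", "final result: 42 rows found"))  # 0.58
```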
Action scoring formula
```text
composite = 0.5 × goal_progress
          + 0.3 × tool_relevance
          + 0.2 × confidence

goal_progress  = min(1, keyword_overlap(response, goal) × 2)
tool_relevance = 0.9  (success, result > 20 chars)
               | 0.6  (success, no result)
               | 0.5  (no tool)
               | 0.1  (error)
confidence     = 0.6 − 0.08 per hedging phrase
                     + 0.08 per affirming phrase
```
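The composite formula transcribes directly to code. The `keyword_overlap` helper below is an assumption (a simple set-intersection ratio over whitespace tokens); the harness may compute overlap differently:

```python
# Sketch of the composite action score above; keyword_overlap is illustrative.
def keyword_overlap(response: str, goal: str) -> float:
    goal_words = set(goal.lower().split())
    if not goal_words:
        return 0.0
    hits = goal_words & set(response.lower().split())
    return len(hits) / len(goal_words)

def composite(response: str, goal: str, tool_relevance: float, confidence: float) -> float:
    goal_progress = min(1.0, keyword_overlap(response, goal) * 2)
    return 0.5 * goal_progress + 0.3 * tool_relevance + 0.2 * confidence

score = composite("listed all tables in the schema", "list tables",
                  tool_relevance=0.9, confidence=0.6)
print(round(score, 2))  # 0.89
```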
Key constants
| Constant | Value | Description |
|---|---|---|
| `_PAGE_TOKEN_TARGET` | 2,000 | Target tokens per offloaded page |
| `_PAGE_TTL` | 86,400 s | 24 h TTL for cold pages in Redis |
| `_HOT_MAX_MSGS` | 200 | Message count that triggers the offload check |
| `_ACTIONS_TTL` | 86,400 s | 24 h TTL for action records |
GraphRAG Engine
harness/memory/graph_rag.py — Weighted multi-hop graph retrieval with vector bridging. Used by SQLAgent (schema) and any knowledge-intensive workload.
Edge weights
| Edge type | Weight | Meaning |
|---|---|---|
| `joins` | 1.5 | SQL JOIN relationship between tables |
| `used_by_query` | 1.2 | Table used in a past successful query |
| `references` | 1.0 | FK reference between tables |
| `has_column` | 0.8 | Table → column relationship |
| `occurred_in` | 0.6 | Error occurred in query |
| (default) | 0.5 | Generic relationship |
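Path scoring over these weights is just a cumulative sum, which is why a short `joins` path can outrank a longer chain of generic edges. A minimal sketch using the table above:

```python
# Score a graph path by cumulative edge weight, per the table above.
EDGE_WEIGHTS = {
    "joins": 1.5,
    "used_by_query": 1.2,
    "references": 1.0,
    "has_column": 0.8,
    "occurred_in": 0.6,
}
_DEFAULT_WEIGHT = 0.5

def path_score(edge_types: list[str]) -> float:
    return sum(EDGE_WEIGHTS.get(e, _DEFAULT_WEIGHT) for e in edge_types)

# A join path outweighs a generic path of the same length:
print(path_score(["joins", "has_column"]))       # 2.3
print(path_score(["related_to", "related_to"]))  # 1.0
```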
Retrieval strategy (priority order)
1. Extract entities from the query (regex patterns for table/column names)
2. Find matching graph nodes (`find_nodes(entities)`)
3. If no anchors found → vector-to-graph bridging: embed the query, find vector hits, use them as graph seeds
4. BFS from the anchor nodes, scoring paths by cumulative edge weight
5. Render the top-N paths as compact context: `[SCHEMA]`, `[JOINS]`, `[PAST QUERIES]`, `[PAST ERRORS]`
6. Supplement with raw vector hits if graph coverage is thin
Semantic LLM Cache
harness/llm/cache.py — Cosine-similarity cache for LLM responses, keyed by message embedding.
| Parameter | Default | Description |
|---|---|---|
| `threshold` | 0.97 | Cosine similarity threshold for a cache hit |
| `ttl` | 3600 s | Cache entry lifetime |
| Redis key | `harness:llm_cache:{tenant}:{hash}` | HASH per entry |
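The lookup logic reduces to "best cosine match above 0.97, else miss". A self-contained sketch (the real cache stores embeddings in Redis, not a Python list):

```python
import math

# Minimal sketch of a cosine-similarity cache lookup with the 0.97 threshold.
def cosine(a: list[float], b: list[float]) -> float:
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm if norm else 0.0

def cache_lookup(query_vec, entries, threshold=0.97):
    """entries: iterable of (embedding, cached_response) pairs."""
    best_sim, best_resp = 0.0, None
    for vec, resp in entries:
        sim = cosine(query_vec, vec)
        if sim > best_sim:
            best_sim, best_resp = sim, resp
    return best_resp if best_sim >= threshold else None

entries = [([1.0, 0.0], "cached answer"), ([0.0, 1.0], "other")]
print(cache_lookup([0.999, 0.01], entries))  # cached answer — near-identical query
print(cache_lookup([0.7, 0.7], entries))     # None — below the 0.97 threshold
```

The high threshold matters: at 0.97 only near-paraphrases hit the cache, which trades hit rate for a low risk of returning a stale answer to a genuinely different question.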
Trace System — Overview
HarnessAgent produces a hierarchical span tree for every agent run. Spans are stored in Redis (48 h TTL) and appended to logs/runs/{run_id}/trace.jsonl.
Span hierarchy
The span stack is tracked automatically per run_id inside TraceRecorder._stacks. Callers never need to pass parent_span_id — it is always the top of the stack.
TraceSpan Schema
harness/observability/trace_schema.py
| Field | Type | Description |
|---|---|---|
| `trace_id` | `str` | Matches `AgentContext.trace_id` — shared across all spans in one run tree |
| `span_id` | `str` | 16 hex chars, unique per span |
| `parent_span_id` | `str \| None` | `None` for the root span; set automatically by the recorder stack |
| `run_id` | `str` | `AgentContext.run_id` |
| `kind` | `SpanKind` | `RUN` \| `AGENT` \| `LLM` \| `TOOL` \| `GUARDRAIL` \| `MEMORY` \| `HANDOFF` \| `EVAL` |
| `name` | `str` | e.g. `"llm:claude-3-5-sonnet"`, `"tool:execute_sql"` |
| `status` | `SpanStatus` | `RUNNING` \| `OK` \| `ERROR` |
| `start_time` | `datetime` | UTC timestamp |
| `end_time` | `datetime \| None` | Null while `RUNNING` |
| `duration_ms` | `float \| None` | Wall-clock milliseconds |
| `input_preview` | `str` | First 500 chars of input |
| `output_preview` | `str` | First 500 chars of output |
| `error` | `str \| None` | Error message on `ERROR` status |
| `input_tokens` | `int` | LLM input tokens (LLM spans only) |
| `output_tokens` | `int` | LLM output tokens (LLM spans only) |
| `cost_usd` | `float` | USD cost (LLM spans only) |
| `cached` | `bool` | `True` if the response came from cache |
| `agent_type` | `str` | Copied from `AgentContext` |
| `step` | `int` | `AgentContext.step_count` at span-open time |
TraceRecorder
harness/observability/trace_recorder.py
Persistence sinks
| Sink | Key | TTL | Purpose |
|---|---|---|---|
| Redis HASH | harness:span:{span_id} | 48 h | Live query by span_id |
| Redis ZSET | harness:trace:{run_id} | 48 h | Span index ordered by start_time |
| JSONL file | logs/runs/{run_id}/trace.jsonl | Forever | Durable audit trail |
Usage
```python
recorder = TraceRecorder.create(redis_url=cfg.redis_url, log_dir="logs")

# Context manager (recommended)
async with recorder.span(run_id, SpanKind.TOOL, "tool:execute_sql", ctx,
                         input_preview=str(args)) as span_id:
    result = await registry.execute(ctx, call)

# Manual control
span_id = await recorder.start_span(run_id, SpanKind.LLM, "llm:call", ctx)
recorder.set_llm_usage(span_id, input_tokens=200, output_tokens=100, cost_usd=0.003)
await recorder.end_span(run_id, span_id)

# Query
trace = await recorder.get_trace(run_id)  # → TraceView | None
span = await recorder.get_span(span_id)   # → TraceSpan | None
```
`set_llm_usage` must be called before the span context manager exits, or before `end_span`. It registers pending token/cost data that `end_span` picks up. If it is not called, token counts default to 0.
Metrics & Audit
Prometheus metrics (harness/observability/metrics.py)
| Metric | Type | Labels |
|---|---|---|
| `harness_agent_steps_total` | Counter | `agent_type`, `tenant_id`, `status` |
| `harness_agent_tokens_total` | Counter | `agent_type`, `provider`, `token_type` |
| `harness_tool_calls_total` | Counter | `tool_name`, `agent_type`, `status` |
| `harness_safety_blocks_total` | Counter | `guard`, `agent_type`, `stage` |
| `harness_active_runs` | Gauge | `agent_type` |
| `harness_llm_request_duration_seconds` | Histogram | `provider`, `model` |
| `harness_cost_usd_total` | Counter | `tenant_id`, `model` |
| `harness_hermes_patches_total` | Counter | `agent_type`, `status` |
AuditLogger (harness/observability/audit.py)
Append-only log for compliance. Writes to {workspace_base}/audit/{tenant_id}/{date}.jsonl and Redis stream harness:audit.
Actions tracked: tool_call, memory_write/read, llm_call, agent_start/complete, hitl_request/resolve, safety_block, patch_apply
PII safety: payloads are SHA-256 hashed before storage — content is never stored, only fingerprints.
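Hashing a *canonical* encoding is what makes the fingerprint stable: the same payload always yields the same digest regardless of dict key order. A sketch of the idea (the audit logger's exact serialization is an assumption):

```python
import hashlib
import json

# Fingerprint a payload so the audit log can attest "this exact payload
# was seen" without ever storing the content itself.
def fingerprint(payload: dict) -> str:
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

a = fingerprint({"tool": "execute_sql", "query": "SELECT 1"})
b = fingerprint({"query": "SELECT 1", "tool": "execute_sql"})  # key order differs
print(a == b)   # True — canonicalization makes the hash stable
print(len(a))   # 64 hex chars
```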
Event Bus (SSE)
harness/observability/event_bus.py — Redis Pub/Sub channels for real-time streaming via Server-Sent Events.
Channel: harness:events:{run_id}
```python
# BaseAgent emits
await event_bus.publish(StepEvent.llm_called(ctx, response))

# Client subscribes (used by the SSE route)
async for event in event_bus.subscribe(run_id):
    print(event.event_type, event.payload)
```

```js
// From the browser:
const source = new EventSource(`/runs/${runId}/stream`);
source.onmessage = (e) => console.log(JSON.parse(e.data));
```
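On the wire, each event the browser receives is a plain-text SSE frame: an `event:` line, a `data:` line, and a blank-line terminator that `EventSource` uses to delimit frames. A sketch of the serialization (the harness's exact frame fields are an assumption):

```python
import json

# Serialize a StepEvent-like pair into an SSE frame.
def sse_frame(event_type: str, payload: dict) -> str:
    return f"event: {event_type}\ndata: {json.dumps(payload)}\n\n"

frame = sse_frame("token_delta", {"delta": "SELECT"})
print(frame, end="")
# event: token_delta
# data: {"delta": "SELECT"}
```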
ToolRegistry
harness/tools/registry.py — Central tool management with validation, safety, timeout, and audit.
Execution pipeline
1. Lookup — find the tool by name; raise `TOOL_NOT_FOUND` if missing
2. Schema validation — validate args against `input_schema`; raise `TOOL_SCHEMA_ERROR` if invalid
3. Safety check — run through the safety pipeline; raise `SAFETY_STEP` if blocked
4. Timeout — wrap execution in `asyncio.timeout(tool.timeout_seconds)`
5. Execute — call `tool.execute(ctx, args)`
6. Audit log — record in AuditLogger
7. Metrics — increment `tool_calls_total`
Built-in tools
| Tool name | Module | Description |
|---|---|---|
| `run_python` | `code_tools` | Execute Python in DockerSandbox or RestrictedPython |
| `lint_code` | `code_tools` | Run the ruff linter on a code string |
| `read_file` | `file_tools` | Read a file from the workspace (path traversal blocked) |
| `write_file` | `file_tools` | Write a file to the workspace |
| `list_tables` | `sql_tools` | List all tables in the connected database |
| `describe_table` | `sql_tools` | Get column schema and types for a table |
| `execute_sql` | `sql_tools` | Execute a SELECT query (read-only mode enforced) |
| `sample_rows` | `sql_tools` | Get the first N rows from a table |
MCP Client
harness/tools/mcp_client.py — Connect to any MCP-compatible server (stdio or SSE) and expose its tools in the ToolRegistry.
```python
from harness.tools.mcp_client import (
    MCPServerConfig,
    MCPToolAdapter,
    load_mcp_servers_from_config,
)

# From YAML config
adapters = load_mcp_servers_from_config("mcp_servers.yaml")
for adapter in adapters:
    tools = await adapter.connect()
    for tool in tools:
        registry.register(tool)

# Manual
adapter = MCPToolAdapter(MCPServerConfig(
    name="search",
    transport="stdio",
    command=["npx", "-y", "@modelcontextprotocol/server-brave-search"],
    env={"BRAVE_API_KEY": "${BRAVE_API_KEY}"},
))
tools = await adapter.connect()
```
YAML config format
```yaml
servers:
  search:
    transport: stdio
    command: ["npx", "-y", "@modelcontextprotocol/server-brave-search"]
    env: {BRAVE_API_KEY: "${BRAVE_API_KEY}"}
  filesystem:
    transport: stdio
    command: ["npx", "-y", "@modelcontextprotocol/server-filesystem", "/data"]
```
Skills
harness/tools/skills.py — Named, versioned, composable agent capabilities. Skills bundle a system prompt fragment, required tools, and usage examples.
```python
from harness.tools.skills import Skill, SkillRegistry

registry = SkillRegistry()
registry.register(Skill(
    name="sql_analysis",
    version="1.0.0",
    description="Analyze SQL databases",
    system_prompt="You are an expert SQL analyst...",
    required_tools=["list_tables", "describe_table", "execute_sql"],
    tags=["sql", "analytics"],
))

# Compose multiple skills
combined = registry.compose("sql_analysis", "data_visualization")

# Filter by tag
analytics_skills = registry.list_for_tags(["analytics"])
```
Execution Sandbox
harness/filesystem/sandbox.py — Isolated code execution. Docker is the primary sandbox; RestrictedPython is the fallback.
DockerSandbox security controls
| Control | Setting | Effect |
|---|---|---|
| Memory limit | 256 MiB | OOM kill if exceeded |
| CPU limit | 1 core | No runaway CPU consumption |
| Network | Disabled by default | No outbound connections |
| User | nobody | Non-root execution |
| Privileges | --no-new-privileges | Cannot escalate |
| Timeout | 30 s | Terminated if exceeded |
| Filesystem | Workspace mount only | Cannot access host filesystem |
AgentRunner
harness/orchestrator/runner.py — Creates, executes, and tracks agent runs. The main component called by API routes.
RunRecord states
```text
pending → running → completed
                  ↘ failed • cancelled
```
Planner & Scheduler
Multi-agent DAG decomposition and parallel execution.
Planner (harness/orchestrator/planner.py)
Takes a complex task, calls an LLM to decompose it into a JSON array of SubTask objects with dependency declarations. Validates the DAG (cycle detection via Kahn's algorithm).
```python
planner = Planner(llm_provider=llm)
plan = await planner.plan(
    task="Build a sales dashboard: fetch data, analyze it, generate report",
    available_agents=["sql", "code", "research"],
)
# Returns TaskPlan with SubTasks:
#   {id: "fetch",   agent_type: "sql",  depends_on: []}
#   {id: "analyze", agent_type: "code", depends_on: ["fetch"]}
#   {id: "report",  agent_type: "code", depends_on: ["analyze"]}
```
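The cycle-detection step can be sketched with Kahn's algorithm over the `depends_on` declarations: if a topological order cannot cover every subtask, the declarations contain a cycle. A minimal sketch (the real validator operates on `SubTask` objects, not bare dicts):

```python
from collections import deque

# Kahn's algorithm: True ⇔ the dependency declarations form a valid DAG.
def validate_dag(depends_on: dict[str, list[str]]) -> bool:
    indegree = {task: len(deps) for task, deps in depends_on.items()}
    dependents: dict[str, list[str]] = {t: [] for t in depends_on}
    for task, deps in depends_on.items():
        for dep in deps:
            dependents[dep].append(task)
    queue = deque(t for t, d in indegree.items() if d == 0)
    visited = 0
    while queue:
        task = queue.popleft()
        visited += 1
        for nxt in dependents[task]:
            indegree[nxt] -= 1
            if indegree[nxt] == 0:
                queue.append(nxt)
    return visited == len(depends_on)  # unvisited tasks ⇒ cycle

plan = {"fetch": [], "analyze": ["fetch"], "report": ["analyze"]}
print(validate_dag(plan))                      # True — valid DAG
print(validate_dag({"a": ["b"], "b": ["a"]}))  # False — cycle detected
```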
Scheduler (harness/orchestrator/scheduler.py)
Executes the plan by repeatedly finding ready tasks (deps satisfied) and running them in parallel with asyncio.gather + semaphore back-pressure.
- Parallelism: `max_concurrent=10` semaphore
- Retry: `max_retries=1` per subtask, exponential backoff
- Handoff enrichment: predecessor outputs injected into the dependent task's context
- Failure propagation: tasks with failed deps are skipped and marked failed
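One scheduling wave reduces to two rules: a task is ready when all of its deps have completed, and a task with any failed dep is marked failed without running. A sketch of that selection step (the real scheduler tracks `SubTask` state, not plain sets):

```python
# Compute one scheduling wave from the dependency map and current state.
def next_wave(depends_on, completed, failed):
    ready, newly_failed = [], []
    for task, deps in depends_on.items():
        if task in completed or task in failed:
            continue
        if any(d in failed for d in deps):
            newly_failed.append(task)        # failure propagation
        elif all(d in completed for d in deps):
            ready.append(task)               # deps satisfied → run in parallel
    return ready, newly_failed

plan = {"fetch": [], "analyze": ["fetch"], "report": ["analyze"]}
print(next_wave(plan, completed=set(), failed=set()))      # (['fetch'], [])
print(next_wave(plan, completed={"fetch"}, failed=set()))  # (['analyze'], [])
print(next_wave(plan, completed=set(), failed={"fetch"}))  # ([], ['analyze'])
```

The scheduler would call this repeatedly, running each `ready` batch under `asyncio.gather` with the semaphore, until no tasks remain.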
Human-in-the-Loop (HITL)
harness/orchestrator/hitl.py — Pause agent execution and wait for human approval before executing sensitive tool calls.
```python
# In agent context metadata
ctx.metadata["hitl_manager"] = hitl_manager
ctx.metadata["policy"] = policy  # policy.requires_hitl(tool_name) → bool

# During tool execution, BaseAgent calls:
request = await hitl_manager.request_approval(ctx, tool_name, tool_args)
decision = await hitl_manager.await_decision(request.request_id, timeout=3600.0)
# "approved" → continue; "rejected"/"expired" → raise HITLRejected
```
Pending requests are stored in Redis sorted set harness:hitl:pending and exposed via GET /hitl/pending.
Inter-Agent Messaging
harness/messaging/bus.py — Redis Streams-based messaging between agents in multi-agent systems.
| Message type | Direction | Description |
|---|---|---|
| `task` | Parent → Child | Delegate work to a child agent |
| `result` | Child → Parent | Return completed work |
| `error` | Any | Error notification |
| `query` | Any | Request information |
| `status` | Any | Progress update |
| `heartbeat` | Any | Keep-alive signal |
LLM Router
harness/llm/router.py — Health-aware, context-window-aware routing across multiple LLM providers with per-provider circuit breaking.
Routing algorithm
1. Sort registered providers by priority (lower = higher priority)
2. Skip providers where `context_window < required_context`
3. Skip providers with open circuit breakers
4. Try each remaining provider in priority order
5. On a retryable error (`LLM_RATE_LIMIT`, `LLM_TIMEOUT`, `LLM_ERROR`), try the next provider
6. Return the response from the first successful provider
Circuit breaker
| State | Behavior | Transition |
|---|---|---|
| Closed | Normal operation | → Open after N failures |
| Open | All requests rejected | → Half-open after recovery_timeout |
| Half-open | Test requests allowed | → Closed after M successes |
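The three-state machine in the table can be sketched as follows. The thresholds and timeout here are illustrative defaults, not the router's actual configuration:

```python
import time

# Minimal closed → open → half-open → closed circuit breaker.
class CircuitBreaker:
    def __init__(self, failure_threshold=3, recovery_timeout=30.0, success_threshold=2):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.success_threshold = success_threshold
        self.state = "closed"
        self._failures = 0
        self._successes = 0
        self._opened_at = 0.0

    def allow_request(self) -> bool:
        if self.state == "open" and time.monotonic() - self._opened_at >= self.recovery_timeout:
            self.state = "half_open"      # start letting test requests through
            self._successes = 0
        return self.state != "open"

    def record_failure(self):
        self._failures += 1
        if self.state == "half_open" or self._failures >= self.failure_threshold:
            self.state = "open"           # trip: reject everything
            self._opened_at = time.monotonic()
            self._failures = 0

    def record_success(self):
        if self.state == "half_open":
            self._successes += 1
            if self._successes >= self.success_threshold:
                self.state = "closed"     # recovered after M successes
        else:
            self._failures = 0

cb = CircuitBreaker()
for _ in range(3):
    cb.record_failure()
print(cb.state)            # open — N consecutive failures tripped it
print(cb.allow_request())  # False — requests rejected while open
```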
Registering providers
```python
from harness.llm.router import LLMRouter
from harness.llm.anthropic import AnthropicProvider
from harness.llm.openai_provider import OpenAIProvider

router = LLMRouter()
router.register(AnthropicProvider(api_key=cfg.anthropic_api_key),
                priority=0, context_window=200_000)
router.register(OpenAIProvider(api_key=cfg.openai_api_key, model="gpt-4o-mini"),
                priority=10, context_window=128_000)
# Falls back to OpenAI if Anthropic is unavailable
```
Hermes — Self-Improvement Loop
Hermes is the self-healing system that automatically detects failure patterns, generates prompt patches, evaluates them, and optionally applies them — with regression detection and auto-rollback.
1. Collect classified failures recorded by the FailureTracker
2. Count failures in the rolling window → skip if < min_errors
3. Sample representative failures (semantic search)
4. LLM generates prompt patch proposal
5. Evaluator replays failing tasks with patched config → composite score
6. If score ≥ threshold AND auto_apply: apply patch via PromptStore
7. Schedule rollback check for next cycle
8. Log metrics to MLflow + Prometheus
Scoring formula
```text
score = success_rate
      − 0.01  × avg_steps_delta
      − 0.001 × avg_tokens_delta
```
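In code, the formula penalizes a patch that "fixes" failures by burning more steps or tokens. The example numbers below are illustrative:

```python
# Direct transcription of the patch-scoring formula.
def patch_score(success_rate: float, avg_steps_delta: float, avg_tokens_delta: float) -> float:
    return success_rate - 0.01 * avg_steps_delta - 0.001 * avg_tokens_delta

# A patch that fixes 90% of replayed failures while adding, on average,
# 5 steps and 100 tokens per run:
score = patch_score(0.9, avg_steps_delta=5, avg_tokens_delta=100)
print(round(score, 2))  # 0.75 — above the 0.7 apply threshold
```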
Regression detection (OnlineLearningMonitor)
After every run, metrics are recorded per prompt version. If the post-patch error count exceeds baseline × (1 + regression_threshold), the patch is automatically rolled back to the previous version.
| Config key | Default | Meaning |
|---|---|---|
| `HERMES_AUTO_APPLY` | `false` | Apply patches without human review |
| `HERMES_INTERVAL_SECONDS` | `3600` | Time between Hermes cycles |
| `HERMES_MIN_ERRORS_TO_TRIGGER` | `5` | Min failures to generate a patch |
| `HERMES_PATCH_SCORE_THRESHOLD` | `0.7` | Min score to apply a patch |
| `regression_threshold` | `0.30` | A 30% error-rate increase triggers rollback |
Evaluation Framework
harness/eval/runner.py and harness/eval/diagnostics.py
```python
from harness.eval.runner import EvalRunner
from harness.eval.datasets import EvalDataset, EvalCase

dataset = EvalDataset(
    name="sql_smoke",
    agent_type="sql",
    cases=[
        EvalCase(case_id="c1", task="List all tables", expected_output="users, orders, products"),
        EvalCase(case_id="c2", task="Count users", expected_output="42"),
    ],
)

runner = EvalRunner(agent_runner=agent_runner)
report = await runner.run(dataset, concurrency=3)
print(report.success_rate, report.avg_cost_usd)
print(report.to_markdown())
```
EvalDiagnostics
Each EvalReport includes rich per-case diagnostics:
- `failure_stage` — where the agent failed: `llm | tool | safety | timeout | budget | quality`
- Recommendations — auto-generated hints based on failure patterns
- By-agent aggregates — token usage, cost, tool errors, guardrail hits per agent type
REST API Reference
Runs
| Method | Path | Description |
|---|---|---|
| POST | /runs | Create and enqueue a run. Body: {agent_type, task, metadata} |
| GET | /runs/{run_id} | Retrieve run record by ID |
| GET | /runs | List runs for tenant. Query: limit, offset |
| DELETE | /runs/{run_id} | Cancel a pending or running run |
| GET | /runs/{run_id}/stream | SSE stream of StepEvents. Terminates on completed/failed |
Traces
| Method | Path | Description |
|---|---|---|
| GET | /runs/{run_id}/trace | Full span hierarchy. Returns TraceView with aggregated totals. 404 if expired (48 h TTL) |
| GET | /runs/spans/{span_id} | Single span by ID. 404 if not found or expired |
Evaluations
| Method | Path | Description |
|---|---|---|
| POST | /evals/smoke/run | Run single-agent smoke suite |
| POST | /evals/multi/run | Run multi-agent regression suite |
| POST | /evals/compare | Compare two prompt versions |
Improvement
| Method | Path | Description |
|---|---|---|
| GET | /improvement/patches | List patches by status (pending/applied/rejected) |
| POST | /improvement/patches/{id}/approve | Approve and apply a patch |
| POST | /improvement/patches/{id}/reject | Reject a patch |
| GET | /improvement/errors | List recent agent failures |
Authentication
Pass an API key in the X-API-Key header. In dev mode, anonymous requests default to tenant "default". JWT bearer tokens are also supported.
Run response schema
```json
{
  "run_id": "3f2a8c1e4d...",
  "tenant_id": "acme",
  "agent_type": "sql",
  "task": "List all tables",
  "status": "completed",
  "created_at": "2024-01-01T00:00:00Z",
  "completed_at": "2024-01-01T00:00:12Z",
  "result": {
    "output": "Found 7 tables: users, orders...",
    "steps": 4,
    "tokens": 1250,
    "cost_usd": 0.00031,
    "success": true,
    "elapsed_seconds": 11.8
  }
}
```
Trace response schema
```json
{
  "trace_id": "ddda858ebe8f...",
  "run_id": "3f2a8c1e4d...",
  "agent_type": "sql",
  "status": "ok",
  "duration_ms": 11840,
  "total_input_tokens": 980,
  "total_output_tokens": 270,
  "total_cost_usd": 0.00031,
  "span_count": 6,
  "spans": [
    {
      "span_id": "01f04e9413851d7f",
      "parent_span_id": null,
      "kind": "run",
      "name": "run:sql",
      "status": "ok",
      "duration_ms": 11840,
      "input_preview": "List all tables",
      "output_preview": "Found 7 tables..."
    },
    ...
  ]
}
```
Redis Schema
| Key pattern | Type | TTL | Purpose |
|---|---|---|---|
| `harness:run:{run_id}` | HASH | — | RunRecord (persistent) |
| `harness:conv:{run_id}` | LIST | — | Conversation history (ShortTermMemory, LPUSH) |
| `harness:scratch:{run_id}` | HASH | 3600 s | Scratch-pad values |
| `harness:hot:{run_id}:{skill_ns}` | LIST | — | Hot-window messages per skill (ContextEngine) |
| `harness:page:{page_id}` | HASH | 24 h | Compressed ContextPage |
| `harness:pages:{run_id}:{skill_ns}` | ZSET | 24 h | Page IDs scored by importance |
| `harness:actions:{run_id}` | LIST | 24 h | ActionRecord entries (ContextEngine) |
| `harness:span:{span_id}` | HASH | 48 h | TraceSpan data |
| `harness:trace:{run_id}` | ZSET | 48 h | Span IDs ordered by start_time |
| `harness:error:{record_id}` | HASH | — | ErrorRecord (Hermes input) |
| `harness:error_index` | ZSET | — | Error IDs by timestamp |
| `harness:hitl:{request_id}` | HASH | 3600 s | ApprovalRequest |
| `harness:hitl:pending` | ZSET | — | Pending approval request IDs |
| `harness:stream:{agent_id}` | STREAM | — | Per-agent message stream |
| `harness:stream:broadcast` | STREAM | — | Broadcast message stream |
| `harness:events:{run_id}` | Pub/Sub | — | SSE event channel (EventBus) |
| `harness:llm_cache:{tenant}:{hash}` | HASH | 3600 s | Cached LLM response |
| `harness:audit` | STREAM | — | Audit event stream |
| `harness:failures` | STREAM | — | StepFailure stream (FailureTracker) |
| `harness:online_metrics:{agent}:{version}` | HASH | — | VersionMetrics (OnlineLearningMonitor) |
Dependencies
Core (always installed)
Optional extras
| Extra | Packages | Enables |
|---|---|---|
| `[vector]` | chromadb, qdrant-client, weaviate-client, sentence-transformers | Vector memory backends + embeddings |
| `[graph]` | neo4j | Neo4j production graph backend |
| `[observe]` | mlflow, opentelemetry-sdk, opentelemetry-exporter-otlp, prometheus-client | MLflow tracking, OTel export, Prometheus metrics |
| `[mcp]` | mcp ≥ 1.2 | MCP server connections |
| `[sql]` | asyncpg, sqlglot | PostgreSQL async driver + SQL parsing |
| `[ingestion]` | pymupdf, trafilatura, python-docx | PDF/HTML/DOCX document ingestion |
| `[structured]` | instructor | Structured LLM output (Pydantic models) |
| `[all]` | All of the above | Full production stack |
Development setup: pip install harnessagent[vector,observe,mcp] covers the most common production use case. Use NetworkX (default graph backend) for local development; switch to Neo4j for production graph workloads.