HarnessAgent

Production-grade, multi-tenant agent harness for building, running, observing, and self-improving AI agents. HarnessAgent wraps any agent framework (LangGraph, CrewAI, AutoGen) or can run native SQL/Code agents with a full production lifecycle: paged context management, semantic memory, tool safety, distributed tracing, and automated self-improvement.

  • 45+ Python modules: Core, Memory, Observability, Tools, Orchestration
  • 5 memory tiers: hot window, pages, vector, graph, short-term
  • 8 span kinds: RUN, LLM, TOOL, GUARDRAIL, MEMORY, HANDOFF, EVAL, AGENT

What it gives you

Run lifecycle

Create → execute → checkpoint → resume → emit SSE events → stream to client. Every run is tracked in Redis with full state including HITL pauses and budget enforcement.

Context engineering

Automatic page-based offloading, LLM/extractive compression, semantic cold-page retrieval, per-skill namespace isolation, action scoring, and sub-agent context bridging.

Full span tracing

Every run produces a hierarchical span tree (RUN → LLM/TOOL/GUARDRAIL) stored in Redis + JSONL, queryable via GET /runs/{id}/trace.

Self-improvement

Hermes loop: collect failures → LLM generates patch → eval on replay → auto-apply + regression rollback. Online monitor tracks per-version metrics.

Architecture

HarnessAgent is structured as a layered service. External requests enter through the FastAPI layer and pass through the orchestration layer to individual agents, which interact with memory, tools, and observability systems.

  • Client / SDK: REST API, SSE streaming, Python SDK (wrap())
  • FastAPI Server (api/): /runs, /runs/{id}/trace, /evals, /improvement, /memory, /health
  • Orchestration (orchestrator/): AgentRunner, Planner (DAG), Scheduler (parallel), HITLManager
  • Agent Layer (agents/): BaseAgent (run loop), CodeAgent, SQLAgent, LangGraphAdapter, CrewAIAdapter, AutoGenAdapter
  • Memory (memory/): ContextEngine (pages), ShortTermMemory (Redis), VectorStore (Chroma/Qdrant), GraphRAGEngine, SemanticLLMCache
  • Tools (tools/): ToolRegistry, MCPToolAdapter, SkillRegistry, DockerSandbox
  • Observability: TraceRecorder (spans), StepTracer (OTel), MLflowAgentTracer, EventBus (SSE), FailureTracker, AuditLogger
  • Self-Improvement (improvement/): ErrorCollector, HermesLoop, Evaluator, OnlineLearningMonitor
  • Infrastructure: Redis (state + streams), Vector DB (Chroma/Qdrant/Weaviate), Graph DB (NetworkX/Neo4j), MLflow / OTel, Docker (sandbox)

Quick Start

Install

pip install "harnessagent[vector,observe,mcp]"
# Or with all extras:
pip install "harnessagent[all]"

Run a single native agent

from harness.core.context import AgentContext
from harness.agents.sql_agent import SQLAgent
from harness.memory.manager import MemoryManager
from harness.llm.router import LLMRouter
from pathlib import Path

# Build components (inside an async function; `config`, `anthropic_provider`,
# and the tracker/bus objects passed below are constructed at app startup)
memory = await MemoryManager.create(config)
llm    = LLMRouter()
llm.register(anthropic_provider, priority=0)

agent = SQLAgent(
    llm_router=llm,
    memory_manager=memory,
    tool_registry=tool_registry,
    safety_pipeline=None,
    step_tracer=None,
    mlflow_tracer=None,
    failure_tracker=failure_tracker,
    audit_logger=audit_logger,
    event_bus=event_bus,
    cost_tracker=cost_tracker,
    checkpoint_manager=checkpoint_manager,
    trace_recorder=trace_recorder,   # ← new: durable span tree
)

ctx = AgentContext.create(
    tenant_id="acme",
    agent_type="sql",
    task="List all tables and their row counts",
    memory=memory,
    workspace_path=Path("/workspaces/acme/run1"),
)

result = await agent.run(ctx)
print(result.output, result.cost_usd, result.steps)

Wrap an existing LangGraph agent

import harness
from langgraph.graph import StateGraph

graph = StateGraph(...)  # your existing graph
adapter = harness.wrap(graph)
adapter.attach_harness(
    safety_pipeline=my_pipeline,
    cost_tracker=cost_tracker,
    audit_logger=audit_logger,
)

async for event in adapter.run_with_harness(ctx, {"input": "analyze sales data"}):
    print(event.event_type, event.payload)

Start the API server

uvicorn harness.api.main:create_app --factory --host 0.0.0.0 --port 8000
# Or via Makefile:
make api

Configuration

All settings are loaded from environment variables or a .env file via harness.core.config.Settings (Pydantic BaseSettings). Access the singleton via get_config().

| Variable | Default | Description |
| --- | --- | --- |
| ANTHROPIC_API_KEY | (none) | Anthropic API key (required for Claude models) |
| OPENAI_API_KEY | (none) | OpenAI API key (optional) |
| REDIS_URL | redis://localhost:6379 | Redis connection URL for all state |
| VECTOR_BACKEND | chroma | Vector store: chroma \| qdrant \| weaviate |
| GRAPH_BACKEND | networkx | Graph store: networkx \| neo4j |
| EMBEDDING_MODEL | all-MiniLM-L6-v2 | SentenceTransformer model name |
| MLFLOW_TRACKING_URI | sqlite:///mlflow.db | MLflow tracking server URI |
| OTEL_EXPORTER_ENDPOINT | (none) | OTLP gRPC endpoint (optional) |
| HERMES_AUTO_APPLY | false | Auto-apply Hermes patches without human review |
| HERMES_INTERVAL_SECONDS | 3600 | How often the Hermes cycle runs (seconds) |
| HERMES_MIN_ERRORS_TO_TRIGGER | 5 | Min failures before Hermes generates a patch |
| HERMES_PATCH_SCORE_THRESHOLD | 0.7 | Min eval score to apply a patch (0–1) |
| JWT_SECRET_KEY | (none) | JWT signing secret for API auth |
| ENVIRONMENT | dev | dev \| staging \| prod |
| WORKSPACE_BASE_PATH | /workspaces | Base path for agent workspaces |
| SQL_CONNECTION_STRING | (none) | SQLAlchemy connection string for SQL agent |
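
A minimal sketch of reading settings at startup. It assumes Settings maps each variable to a lowercase attribute (the usual Pydantic BaseSettings convention); the exact attribute names are illustrative.

from harness.core.config import get_config

config = get_config()            # cached Settings singleton (env + .env)
print(config.redis_url)          # "redis://localhost:6379" unless overridden
print(config.vector_backend)     # "chroma" by default
if config.environment == "prod":
    assert config.jwt_secret_key, "JWT secret required outside dev"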

Core — Context & Events

Defined in harness/core/context.py. These are the fundamental data objects that flow through every part of the system.

AgentContext

Mutable per-run state shared across all components. Created once per run and passed everywhere.

| Field | Type | Description |
| --- | --- | --- |
| run_id | str | Auto-generated UUID hex, unique per run |
| tenant_id | str | Tenant identifier for multi-tenancy |
| agent_type | str | "sql" \| "code" \| custom |
| task | str | Natural-language task description |
| trace_id | str | UUID hex for distributed trace correlation |
| step_count | int | Current step number (incremented by tick()) |
| token_count | int | Cumulative tokens used |
| max_steps | int | Step budget (default 50) |
| max_tokens | int | Token budget (default 100,000) |
| timeout_seconds | float | Wall-clock timeout (default 300 s) |
| failed | bool | Set to True when the run fails |
| failure_class | str \| None | FailureClass value when failed |
| metadata | dict | Arbitrary caller metadata; also used for HITL manager injection |
AgentContext.create(tenant_id, agent_type, task, memory, workspace_path, **opts) → AgentContext
Factory method that generates run_id and trace_id automatically.
ctx.tick(tokens=0) → None
Increment step_count and accumulate tokens. Raises BudgetExceeded if any budget limit is exceeded.
ctx.is_budget_ok() → bool
Returns False if step_count ≥ max_steps, token_count ≥ max_tokens, or elapsed_seconds ≥ timeout_seconds.
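
A sketch of how an agent loop consumes the budget via tick() and is_budget_ok(); the call_llm helper and the BudgetExceeded import path are assumptions for illustration.

from harness.core.errors import BudgetExceeded  # import path assumed

async def loop(ctx):
    while ctx.is_budget_ok():
        response = await call_llm(ctx)           # illustrative helper
        try:
            # One step consumed; raises once any budget limit is crossed
            ctx.tick(tokens=response.input_tokens + response.output_tokens)
        except BudgetExceeded:
            break                                # surfaces as a budget_exceeded event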

StepEvent

Emitted at each significant moment in an agent run. Published to EventBus (Redis Pub/Sub) for SSE streaming.

| event_type | Payload fields | When emitted |
| --- | --- | --- |
| started | task, agent_type | Run begins |
| llm_call | model, provider, input_tokens, output_tokens, cached, tool_calls count | After each LLM response |
| tool_call | tool_id, tool_name, args, is_error, error | After each tool execution |
| token_delta | delta (string chunk) | During streaming LLM response |
| completed | output, elapsed_seconds | Successful completion |
| failed | error, failure_class, elapsed_seconds | Any fatal error |
| budget_exceeded | failure_class | Step/token/time budget hit |

Core — Protocols

Defined in harness/core/protocols.py. Structural typing.Protocol classes that every pluggable component must satisfy; they are runtime-checkable, so isinstance(x, LLMProvider) works for runtime checks.

LLMProvider

provider_name: str, model: str

async complete(messages, *, max_tokens, system, tools) → LLMResponse
async stream(messages, **kwargs) → AsyncIterator[str]
async health_check() → bool
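
A minimal provider that satisfies the protocol structurally — a sketch assuming LLMResponse takes content plus token counts (its exact constructor fields are not shown above):

from harness.core.protocols import LLMProvider, LLMResponse  # LLMResponse fields assumed

class EchoProvider:
    provider_name = "echo"
    model = "echo-1"

    async def complete(self, messages, *, max_tokens=1024, system=None, tools=None):
        # A real provider calls its API here; we simply echo the last message
        text = messages[-1]["content"] if messages else ""
        return LLMResponse(content=text, input_tokens=0, output_tokens=0)

    async def stream(self, messages, **kwargs):
        yield (messages[-1]["content"] if messages else "")

    async def health_check(self) -> bool:
        return True

assert isinstance(EchoProvider(), LLMProvider)  # structural runtime check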

VectorStore

async upsert(id, text, metadata, embedding) → None
async query(text, k=5, filter, hybrid_alpha) → list[VectorHit]
async delete(id) → None

ToolExecutor

name, description, input_schema, timeout_seconds

async execute(ctx, args) → ToolResult

GraphStore

async add_node(id, type, props) → None
async add_edge(src, tgt, type, props) → None
async traverse(start_ids, max_hops=2) → list[GraphPath]

Core — Error Hierarchy

All harness errors extend HarnessError and carry a canonical FailureClass enum value. The failure class drives HTTP status codes, Prometheus labels, and Hermes sampling.

| FailureClass | Exception | HTTP Status |
| --- | --- | --- |
| LLM_ERROR / LLM_TIMEOUT / LLM_RATE_LIMIT | LLMError | 502 / 504 / 429 |
| TOOL_NOT_FOUND / TOOL_SCHEMA_ERROR | ToolError | 400 |
| TOOL_EXEC_ERROR / TOOL_TIMEOUT | ToolError | 500 |
| MCP_CONNECT_ERROR / MCP_TOOL_ERROR | HarnessError | 502 |
| SAFETY_INPUT / SAFETY_STEP / SAFETY_OUTPUT | SafetyViolation | 400 |
| BUDGET_STEPS / BUDGET_TOKENS / BUDGET_TIME | BudgetExceeded | 429 |
| MEMORY_REDIS / MEMORY_VECTOR / MEMORY_GRAPH | HarnessError | 500 |
| INTER_AGENT_TIMEOUT / INTER_AGENT_REJECT | HITLRejected | 403 |
| UNKNOWN | HarnessError | 500 |

Using failure classes: StepFailure.from_exception(exc, run_id, step_number, agent_type) automatically captures the stack trace and classifies the exception into the correct FailureClass.
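
In practice the classification happens at the point where a step fails. A sketch, assuming StepFailure is importable from the errors module (the import path is not shown above):

from harness.core.errors import StepFailure  # import path assumed

try:
    result = await tool.execute(ctx, args)
except Exception as exc:
    failure = StepFailure.from_exception(exc, ctx.run_id, ctx.step_count, ctx.agent_type)
    # failure.failure_class now carries the canonical FailureClass,
    # which FailureTracker records and Hermes samples from
    raise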

BaseAgent — Run Lifecycle

Defined in harness/agents/base.py. All concrete agents inherit from BaseAgent. The run(ctx) method orchestrates the full production lifecycle.

Constructor Parameters

| Parameter | Type | Required | Purpose |
| --- | --- | --- | --- |
| llm_router | LLMRouter | Yes | Health-aware, circuit-breaking LLM dispatch |
| memory_manager | MemoryManager | Yes | Conversation, vector, graph memory |
| tool_registry | ToolRegistry | Yes | Tool lookup, validation, safety, execution |
| safety_pipeline | SafetyPipeline | No | Input/output safety checking |
| step_tracer | StepTracer | No | OpenTelemetry span export |
| trace_recorder | TraceRecorder | No | Durable span tree in Redis + JSONL |
| failure_tracker | FailureTracker | No | Failure classification + Hermes signal |
| event_bus | EventBus | No | Redis Pub/Sub for SSE streaming |
| cost_tracker | CostTracker | No | Per-call USD cost recording |
| checkpoint_manager | CheckpointManager | No | Atomic save/restore for crash recovery |

Run loop steps

  1. Open RUN trace span, emit started event, begin MLflow run
  2. Resume from checkpoint if one exists (_maybe_resume_checkpoint)
  3. Loop while ctx.is_budget_ok():
    1. Fit history to context window (compress if > 40 messages)
    2. GraphRAG/vector retrieve — build retrieval context string
    3. Build LLM messages + system prompt
    4. Open LLM trace span → call LLM → register token usage → close span
    5. Open GUARDRAIL span → run safety pipeline → close span
    6. If no tool calls → extract final answer → break
    7. HITL approval check (sequential)
    8. Execute all tool calls in parallel, each in a TOOL span
    9. Checkpoint every 10 steps
  4. Close RUN span (OK or ERROR), emit completed/failed event
  5. Return AgentResult

Methods to override in subclasses

build_system_prompt(ctx: AgentContext) → str
Return the system prompt. Default is a generic "complete the task" prompt.
build_messages(ctx, history, retrieval_context) → list[dict]
Assemble the messages list for the LLM call. Override to customize message construction.
extract_final_answer(history: list[dict]) → str
Extract final answer from conversation history. Default returns last assistant message.
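
A minimal subclass sketch exercising all three overrides (the class name, prompt text, and agent_type value are illustrative):

from harness.agents.base import BaseAgent
from harness.core.context import AgentContext

class CsvAgent(BaseAgent):
    agent_type = "csv"  # assumed to be declared like CodeAgent/SQLAgent

    def build_system_prompt(self, ctx: AgentContext) -> str:
        return "You analyze CSV files in the workspace. Prefer pandas; show your work."

    def build_messages(self, ctx, history, retrieval_context) -> list[dict]:
        messages = list(history)
        if retrieval_context:
            messages.insert(0, {"role": "system", "content": retrieval_context})
        return messages

    def extract_final_answer(self, history: list[dict]) -> str:
        # Mirrors the default: last assistant message wins
        return next((m["content"] for m in reversed(history)
                     if m["role"] == "assistant"), "")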

CodeAgent

harness/agents/code_agent.py — Specialized for Python code writing, debugging, and iterative improvement.

agent_type: "code"

Workflow: Understand → Write clean Python with type hints → Lint with ruff → Run code → Iterate on failures.

Default tools: run_python, lint_code, read_file, write_file, apply_patch, list_workspace

SQLAgent

harness/agents/sql_agent.py — Specialized for SQL queries, schema exploration, and data analysis.

agent_type: "sql"

Schema pre-population: Before the main loop, _populate_schema(ctx) introspects the database and stores table/column nodes in the knowledge graph. This enables GraphRAG to surface schema context automatically in subsequent queries.

async _populate_schema(ctx) → None
Calls list_tables and describe_table tools. Stores Table + Column nodes in graph. Caps at 50 tables. Runs once per session.

Default tools: list_tables, describe_table, sample_rows, execute_sql

Framework Adapters

harness/adapters/ — Wraps external agent frameworks with the harness production lifecycle.

import harness

# Auto-detect from object type
adapter = harness.wrap(my_langgraph_graph)   # → LangGraphAdapter
adapter = harness.wrap(my_crew)               # → CrewAIAdapter
adapter = harness.wrap(my_autogen_agent)      # → AutoGenAdapter

# Inject production components
adapter.attach_harness(safety_pipeline, cost_tracker, audit_logger)
adapter.attach_mcp(mcp_client, tool_names=["search", "execute"])
| Adapter | Detects | Execution model |
| --- | --- | --- |
| LangGraphAdapter | LangGraph StateGraph or CompiledGraph | astream() — yields events per node execution |
| CrewAIAdapter | CrewAI Crew object | kickoff_async() — yields events per agent turn |
| AutoGenAdapter | AutoGen ConversableAgent | initiate_chat() — yields events per message |

MemoryManager

harness/memory/manager.py — Unified interface to all memory tiers. Injected into every agent via AgentContext.memory.

Memory tiers

Short-term (Redis LIST)

Per-run conversation history. LPUSH newest-first. Configurable last_n window.

Context engine (Redis + Vector)

Paged context with semantic cold-page retrieval. Offloads when hot window exceeds 80% capacity.

Vector store (long-term)

PII-redacted text embeddings. Chroma / Qdrant / Weaviate backends. Hybrid search (alpha-blended).

Knowledge graph

RDF-style triples (subject, predicate, object). NetworkX (dev) or Neo4j (prod). Used by GraphRAG.

Factory

memory = await MemoryManager.create(config, llm_provider=llm)
# Builds: ShortTermMemory + EmbeddingProvider + VectorStore
#         + GraphMemory + ContextWindowManager + ContextEngine

Key methods

async push_message(run_id, role, content, tokens=0, skill_ns="default", step=0)
Feed to both ShortTermMemory and ContextEngine. skill_ns namespaces context by active skill.
async build_context(run_id, query, skill_ns, token_budget) → BuiltContext | ContextWindow
Assemble LLM-ready context. Uses ContextEngine (with offload + cold retrieval) when available.
async smart_retrieve(query, ctx) → RetrievalResult
GraphRAG-first retrieval with vector fallback. Returns graph_context + vector_context + strategy.
async remember(text, metadata, tenant_id) → str
PII-redact and store in vector DB. Returns document ID.
async evaluate_action(run_id, step, goal, llm_content, tool_name, ...) → ActionRecord | None
Score LLM+tool action (goal_progress, tool_relevance, confidence). Stored in Redis.
async slice_for_subagent(parent_run_id, child_run_id, task, token_budget) → SubAgentSlice | None
Extract task-relevant context slice from parent and inject into child's hot window.
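
A usage sketch chaining the key methods within one run (the query strings, namespace, and token counts are illustrative):

# Record a turn, then assemble context for the next LLM call
await memory.push_message(ctx.run_id, "user", "Which tables join to orders?",
                          tokens=12, skill_ns="sql_analysis", step=ctx.step_count)

built = await memory.build_context(ctx.run_id, query="orders joins",
                                   skill_ns="sql_analysis", token_budget=8_000)

# GraphRAG-first retrieval, then persist a fact for future runs
retrieval = await memory.smart_retrieve("orders joins", ctx)
doc_id = await memory.remember("orders.customer_id references customers.id",
                               metadata={"kind": "join"}, tenant_id=ctx.tenant_id)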

ContextEngine

harness/memory/context_engine.py — Paged, skill-isolated context management with automatic offload and semantic retrieval.

Core idea: The hot window (Redis LIST) holds recent messages per (run_id, skill_ns). When it exceeds 80% of max_hot_tokens, the oldest ~2 000 tokens are compressed, embedded, and evicted to the vector store as a ContextPage. Before each LLM call, relevant cold pages are semantically retrieved and re-injected as system messages.

Offload pipeline

  1. Check hot window size (O(1) Redis LLEN)
  2. If count < _HOT_MAX_MSGS (200), skip
  3. Compute hot token usage (scan all raw items)
  4. If < 80% of max_hot_tokens, skip
  5. Walk from oldest to newest until ~2 000 tokens collected
  6. Compress (LLM preferred, extractive fallback)
  7. Embed summary → store in vector DB with metadata
  8. Store ContextPage in Redis HASH + ZSET (scored by importance)
  9. LTRIM hot list to remove offloaded messages

Importance scoring

Cold pages with higher importance are kept longer. Score starts at 0.5 and bumps for:

  • +0.10 tool messages
  • +0.15 error/failed content
  • +0.08 result/found/success content

Action scoring formula

composite = 0.5 × goal_progress
          + 0.3 × tool_relevance
          + 0.2 × confidence

goal_progress  = min(1, keyword_overlap(response, goal) × 2)
tool_relevance = 0.9 (success + result > 20 chars)
               | 0.6 (success, no result)
               | 0.5 (no tool)
               | 0.1 (error)
confidence     = 0.6 − 0.08 per hedging phrase
               + 0.08 per affirming phrase
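
The same scoring as a Python sketch; keyword_overlap and the hedging/affirming phrase lists stand in for the engine's internals.

def composite(goal_progress: float, tool_relevance: float, confidence: float) -> float:
    # Weighted blend from the formula above
    return 0.5 * goal_progress + 0.3 * tool_relevance + 0.2 * confidence

def tool_relevance(success: bool, result: str, used_tool: bool) -> float:
    # Mirrors the case table above
    if not used_tool:
        return 0.5
    if not success:
        return 0.1
    return 0.9 if len(result) > 20 else 0.6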

Key constants

| Constant | Value | Description |
| --- | --- | --- |
| _PAGE_TOKEN_TARGET | 2,000 | Target tokens per offloaded page |
| _PAGE_TTL | 86,400 s | 24 h TTL for cold pages in Redis |
| _HOT_MAX_MSGS | 200 | Message count that triggers offload check |
| _ACTIONS_TTL | 86,400 s | 24 h TTL for action records |

GraphRAG Engine

harness/memory/graph_rag.py — Weighted multi-hop graph retrieval with vector bridging. Used by SQLAgent (schema) and any knowledge-intensive workload.

Edge weights

| Edge type | Weight | Meaning |
| --- | --- | --- |
| joins | 1.5 | SQL JOIN relationship between tables |
| used_by_query | 1.2 | Table used in a past successful query |
| references | 1.0 | FK reference between tables |
| has_column | 0.8 | Table → column relationship |
| occurred_in | 0.6 | Error occurred in query |
| (default) | 0.5 | Generic relationship |

Retrieval strategy (priority order)

  1. Extract entities from query (regex patterns for table/column names)
  2. Find matching graph nodes (find_nodes(entities))
  3. If no anchors found → vector-to-graph bridging: embed query, find vector hits, use as graph seeds
  4. BFS from anchor nodes, score paths by cumulative edge weight
  5. Render top-N paths as compact context: [SCHEMA], [JOINS], [PAST QUERIES], [PAST ERRORS]
  6. Supplement with raw vector hits if graph coverage is thin

Semantic LLM Cache

harness/llm/cache.py — Cosine-similarity cache for LLM responses, keyed by message embedding.

| Parameter | Default | Description |
| --- | --- | --- |
| threshold | 0.97 | Cosine similarity threshold for a cache hit |
| ttl | 3600 s | Cache entry lifetime |
| Redis key | harness:llm_cache:{tenant}:{hash} | One HASH per entry |
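
The hit test reduces to a cosine-similarity comparison of message embeddings, as in this sketch (the embedding source and stored-entry layout are assumptions):

import numpy as np

def is_cache_hit(query_vec: np.ndarray, cached_vec: np.ndarray,
                 threshold: float = 0.97) -> bool:
    # Cosine similarity between the new prompt embedding and a cached one
    sim = float(np.dot(query_vec, cached_vec)
                / (np.linalg.norm(query_vec) * np.linalg.norm(cached_vec)))
    return sim >= threshold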

Trace System — Overview

HarnessAgent produces a hierarchical span tree for every agent run. Spans are stored in Redis (48 h TTL) and appended to logs/runs/{run_id}/trace.jsonl.

Span hierarchy

run:sql_agent             1234 ms  • root span, entire run
├─ llm:call                450 ms  • 1200 tok • $0.002
├─ guardrail:output         12 ms  • passed
├─ tool:execute_sql        180 ms  • 42 rows
├─ llm:call                310 ms  • 800 tok • $0.001
└─ guardrail:output          8 ms  • passed

The span stack is tracked automatically per run_id inside TraceRecorder._stacks. Callers never need to pass parent_span_id — it is always the top of the stack.

TraceSpan Schema

harness/observability/trace_schema.py

| Field | Type | Description |
| --- | --- | --- |
| trace_id | str | Matches AgentContext.trace_id — shared across all spans in one run tree |
| span_id | str | 16 hex chars, unique per span |
| parent_span_id | str \| None | None for root span; set automatically by recorder stack |
| run_id | str | AgentContext.run_id |
| kind | SpanKind | RUN \| AGENT \| LLM \| TOOL \| GUARDRAIL \| MEMORY \| HANDOFF \| EVAL |
| name | str | e.g. "llm:claude-3-5-sonnet", "tool:execute_sql" |
| status | SpanStatus | RUNNING \| OK \| ERROR |
| start_time | datetime | UTC timestamp |
| end_time | datetime \| None | Null while RUNNING |
| duration_ms | float \| None | Wall-clock milliseconds |
| input_preview | str | First 500 chars of input |
| output_preview | str | First 500 chars of output |
| error | str \| None | Error message on ERROR status |
| input_tokens | int | LLM input tokens (LLM spans only) |
| output_tokens | int | LLM output tokens (LLM spans only) |
| cost_usd | float | USD cost (LLM spans only) |
| cached | bool | True if response came from cache |
| agent_type | str | Copied from AgentContext |
| step | int | AgentContext.step_count at span open time |

TraceRecorder

harness/observability/trace_recorder.py

Persistence sinks

| Sink | Key | TTL | Purpose |
| --- | --- | --- | --- |
| Redis HASH | harness:span:{span_id} | 48 h | Live query by span_id |
| Redis ZSET | harness:trace:{run_id} | 48 h | Span index ordered by start_time |
| JSONL file | logs/runs/{run_id}/trace.jsonl | Forever | Durable audit trail |

Usage

recorder = TraceRecorder.create(redis_url=cfg.redis_url, log_dir="logs")

# Context manager (recommended)
async with recorder.span(run_id, SpanKind.TOOL, "tool:execute_sql", ctx,
                          input_preview=str(args)) as span_id:
    result = await registry.execute(ctx, call)

# Manual control
span_id = await recorder.start_span(run_id, SpanKind.LLM, "llm:call", ctx)
recorder.set_llm_usage(span_id, input_tokens=200, output_tokens=100, cost_usd=0.003)
await recorder.end_span(run_id, span_id)

# Query
trace = await recorder.get_trace(run_id)   # → TraceView | None
span  = await recorder.get_span(span_id)   # → TraceSpan | None

set_llm_usage must be called before the span context manager exits, or before end_span. It registers pending token/cost data that end_span picks up. If not called, token counts default to 0.

Metrics & Audit

Prometheus metrics (harness/observability/metrics.py)

| Metric | Type | Labels |
| --- | --- | --- |
| harness_agent_steps_total | Counter | agent_type, tenant_id, status |
| harness_agent_tokens_total | Counter | agent_type, provider, token_type |
| harness_tool_calls_total | Counter | tool_name, agent_type, status |
| harness_safety_blocks_total | Counter | guard, agent_type, stage |
| harness_active_runs | Gauge | agent_type |
| harness_llm_request_duration_seconds | Histogram | provider, model |
| harness_cost_usd_total | Counter | tenant_id, model |
| harness_hermes_patches_total | Counter | agent_type, status |

AuditLogger (harness/observability/audit.py)

Append-only log for compliance. Writes to {workspace_base}/audit/{tenant_id}/{date}.jsonl and Redis stream harness:audit.

Actions tracked: tool_call, memory_write/read, llm_call, agent_start/complete, hitl_request/resolve, safety_block, patch_apply

PII safety: payloads are SHA-256 hashed before storage — content is never stored, only fingerprints.

Event Bus (SSE)

harness/observability/event_bus.py — Redis Pub/Sub channels for real-time streaming via Server-Sent Events.

Channel: harness:events:{run_id}

# BaseAgent emits
await event_bus.publish(StepEvent.llm_called(ctx, response))

# Client subscribes (used by SSE route)
async for event in event_bus.subscribe(run_id):
    print(event.event_type, event.payload)

# From the browser:
const source = new EventSource(`/runs/${runId}/stream`)
source.onmessage = (e) => console.log(JSON.parse(e.data))

ToolRegistry

harness/tools/registry.py — Central tool management with validation, safety, timeout, and audit.

Execution pipeline

  1. Lookup — find tool by name; raise TOOL_NOT_FOUND if missing
  2. Schema validation — validate args against input_schema; raise TOOL_SCHEMA_ERROR if invalid
  3. Safety check — run through safety_pipeline; raise SAFETY_STEP if blocked
  4. Timeout — wrap execution in asyncio.timeout(tool.timeout_seconds)
  5. Execute — call tool.execute(ctx, args)
  6. Audit log — record in AuditLogger
  7. Metrics — increment tool_calls_total
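
A sketch of a custom tool flowing through this pipeline; it assumes ToolResult accepts an output string and an is_error flag (the field names are not confirmed above):

from harness.core.protocols import ToolResult  # field names assumed

class WordCountTool:
    name = "word_count"
    description = "Count words in a text string"
    input_schema = {"type": "object",
                    "properties": {"text": {"type": "string"}},
                    "required": ["text"]}
    timeout_seconds = 5.0

    async def execute(self, ctx, args) -> ToolResult:
        return ToolResult(output=str(len(args["text"].split())), is_error=False)

registry.register(WordCountTool())  # steps 1–7 above now apply to every call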

Built-in tools

| Tool name | Module | Description |
| --- | --- | --- |
| run_python | code_tools | Execute Python in DockerSandbox or RestrictedPython |
| lint_code | code_tools | Run the ruff linter on a code string |
| read_file | file_tools | Read a file from the workspace (path traversal blocked) |
| write_file | file_tools | Write a file to the workspace |
| list_tables | sql_tools | List all tables in the connected database |
| describe_table | sql_tools | Get column schema and types for a table |
| execute_sql | sql_tools | Execute a SELECT query (read-only mode enforced) |
| sample_rows | sql_tools | Get the first N rows from a table |

MCP Client

harness/tools/mcp_client.py — Connect to any MCP-compatible server (stdio or SSE) and expose its tools in the ToolRegistry.

from harness.tools.mcp_client import MCPToolAdapter, load_mcp_servers_from_config

# From YAML config
adapters = load_mcp_servers_from_config("mcp_servers.yaml")
for adapter in adapters:
    tools = await adapter.connect()
    for tool in tools:
        registry.register(tool)

# Manual
adapter = MCPToolAdapter(MCPServerConfig(
    name="search",
    transport="stdio",
    command=["npx", "-y", "@modelcontextprotocol/server-brave-search"],
    env={"BRAVE_API_KEY": "${BRAVE_API_KEY}"},
))
tools = await adapter.connect()

YAML config format

servers:
  search:
    transport: stdio
    command: ["npx", "-y", "@modelcontextprotocol/server-brave-search"]
    env: {BRAVE_API_KEY: "${BRAVE_API_KEY}"}
  filesystem:
    transport: stdio
    command: ["npx", "-y", "@modelcontextprotocol/server-filesystem", "/data"]yaml

Skills

harness/tools/skills.py — Named, versioned, composable agent capabilities. Skills bundle a system prompt fragment, required tools, and usage examples.

from harness.tools.skills import Skill, SkillRegistry

registry = SkillRegistry()
registry.register(Skill(
    name="sql_analysis",
    version="1.0.0",
    description="Analyze SQL databases",
    system_prompt="You are an expert SQL analyst...",
    required_tools=["list_tables", "describe_table", "execute_sql"],
    tags=["sql", "analytics"],
))

# Compose multiple skills
combined = registry.compose("sql_analysis", "data_visualization")

# Filter by tag
analytics_skills = registry.list_for_tags(["analytics"])

Execution Sandbox

harness/filesystem/sandbox.py — Isolated code execution. Docker is the primary sandbox; RestrictedPython is the fallback.

DockerSandbox security controls

| Control | Setting | Effect |
| --- | --- | --- |
| Memory limit | 256 MiB | OOM kill if exceeded |
| CPU limit | 1 core | No runaway CPU consumption |
| Network | Disabled by default | No outbound connections |
| User | nobody | Non-root execution |
| Privileges | --no-new-privileges | Cannot escalate |
| Timeout | 30 s | Terminated if exceeded |
| Filesystem | Workspace mount only | Cannot access host filesystem |

AgentRunner

harness/orchestrator/runner.py — Creates, executes, and tracks agent runs. The main component called by API routes.

async create_run(tenant_id, agent_type, task, metadata) → RunRecord
Creates a RunRecord with status="pending", stores in Redis, returns it.
async execute_run(run_id, timeout=300.0) → AgentResult
Loads run, builds agent, runs it, updates RunRecord to completed/failed.
async cancel_run(run_id)
Sets cancellation flag; running agent loop detects and stops.
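
The typical sequence as driven by the API routes, as a sketch (runner construction elided):

record = await runner.create_run(tenant_id="acme", agent_type="sql",
                                 task="List all tables", metadata={})
result = await runner.execute_run(record.run_id, timeout=300.0)
# The RunRecord transitions pending → running → completed (or failed / cancelled)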

RunRecord states

pending → running → completed | failed | cancelled

Planner & Scheduler

Multi-agent DAG decomposition and parallel execution.

Planner (harness/orchestrator/planner.py)

Takes a complex task, calls an LLM to decompose it into a JSON array of SubTask objects with dependency declarations. Validates the DAG (cycle detection via Kahn's algorithm).

planner = Planner(llm_provider=llm)
plan = await planner.plan(
    task="Build a sales dashboard: fetch data, analyze it, generate report",
    available_agents=["sql", "code", "research"],
)
# Returns TaskPlan with SubTasks:
# {id: "fetch", agent_type: "sql",  depends_on: []}
# {id: "analyze", agent_type: "code", depends_on: ["fetch"]}
# {id: "report",  agent_type: "code", depends_on: ["analyze"]}python

Scheduler (harness/orchestrator/scheduler.py)

Executes the plan by repeatedly finding ready tasks (deps satisfied) and running them in parallel with asyncio.gather + semaphore back-pressure.

  • Parallelism: max_concurrent=10 semaphore
  • Retry: max_retries=1 per subtask, exponential backoff
  • Handoff enrichment: predecessor outputs injected into dependent task context
  • Failure propagation: tasks with failed deps are skipped and marked failed

Human-in-the-Loop (HITL)

harness/orchestrator/hitl.py — Pause agent execution and wait for human approval before executing sensitive tool calls.

# In agent context metadata
ctx.metadata["hitl_manager"] = hitl_manager
ctx.metadata["policy"] = policy  # policy.requires_hitl(tool_name) → bool

# During tool execution, BaseAgent calls:
request = await hitl_manager.request_approval(ctx, tool_name, tool_args)
decision = await hitl_manager.await_decision(request.request_id, timeout=3600.0)
# "approved" → continue; "rejected"/"expired" → raise HITLRejectedpython

Pending requests are stored in Redis sorted set harness:hitl:pending and exposed via GET /hitl/pending.

Inter-Agent Messaging

harness/messaging/bus.py — Redis Streams-based messaging between agents in multi-agent systems.

| Message type | Direction | Description |
| --- | --- | --- |
| task | Parent → Child | Delegate work to a child agent |
| result | Child → Parent | Return completed work |
| error | Any | Error notification |
| query | Any | Request information |
| status | Any | Progress update |
| heartbeat | Any | Keep-alive signal |
async request(recipient_id, message_type, payload, timeout=30.0) → AgentMessage | None
Send a message and wait for a correlated reply. Uses correlation_id for matching.
async fan_out(recipient_ids, message_type, payload, timeout=30.0) → list[AgentMessage]
Send to N agents in parallel, gather all replies within timeout.
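
Delegation over the bus, as a sketch (the bus construction and recipient IDs are illustrative):

# Ask one agent and wait for the correlated reply (None on timeout)
reply = await bus.request("sql-worker-1", "task",
                          {"task": "count users"}, timeout=30.0)

# Broadcast to several workers and gather whatever replies arrive in time
replies = await bus.fan_out(["sql-worker-1", "sql-worker-2"], "task",
                            {"task": "sample 10 rows"}, timeout=30.0)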

LLM Router

harness/llm/router.py — Health-aware, context-window-aware routing across multiple LLM providers with per-provider circuit breaking.

Routing algorithm

  1. Sort registered providers by priority (lower = higher priority)
  2. Skip providers where context_window < required_context
  3. Skip providers with open circuit breakers
  4. Try each provider in priority order
  5. On retryable error (LLM_RATE_LIMIT, LLM_TIMEOUT, LLM_ERROR): try next provider
  6. Return response from first successful provider

Circuit breaker

| State | Behavior | Transition |
| --- | --- | --- |
| Closed | Normal operation | → Open after N failures |
| Open | All requests rejected | → Half-open after recovery_timeout |
| Half-open | Test requests allowed | → Closed after M successes |

Registering providers

from harness.llm.router import LLMRouter
from harness.llm.anthropic import AnthropicProvider
from harness.llm.openai_provider import OpenAIProvider

router = LLMRouter()
router.register(AnthropicProvider(api_key=cfg.anthropic_api_key),
                priority=0, context_window=200_000)
router.register(OpenAIProvider(api_key=cfg.openai_api_key, model="gpt-4o-mini"),
                priority=10, context_window=128_000)
# Falls back to OpenAI if Anthropic is unavailable

Hermes — Self-Improvement Loop

Hermes is the self-healing system that automatically detects failure patterns, generates prompt patches, evaluates them, and optionally applies them — with regression detection and auto-rollback.

Hermes cycle (runs every hermes_interval_seconds)
1. Check pending rollback from last auto-apply
2. Count failures in rolling window → skip if < min_errors
3. Sample representative failures (semantic search)
4. LLM generates prompt patch proposal
5. Evaluator replays failing tasks with patched config → composite score
6. If score ≥ threshold AND auto_apply: apply patch via PromptStore
7. Schedule rollback check for next cycle
8. Log metrics to MLflow + Prometheus

Scoring formula

score = success_rate
      − 0.01 × avg_steps_delta
      − 0.001 × avg_tokens_delta
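
The gate as a sketch in Python, where each delta is patched-minus-baseline:

def hermes_score(success_rate: float, avg_steps_delta: float,
                 avg_tokens_delta: float) -> float:
    # Success dominates; extra steps/tokens versus baseline erode the score
    return success_rate - 0.01 * avg_steps_delta - 0.001 * avg_tokens_delta

def should_apply(score: float, threshold: float = 0.7) -> bool:
    return score >= threshold  # combined with HERMES_AUTO_APPLY, per step 6 above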

Regression detection (OnlineLearningMonitor)

After every run, metrics are recorded per prompt version. If the post-patch error count exceeds baseline × (1 + regression_threshold), the patch is automatically rolled back to the previous version.

| Config key | Default | Meaning |
| --- | --- | --- |
| HERMES_AUTO_APPLY | false | Apply patches without human review |
| HERMES_INTERVAL_SECONDS | 3600 | Time between Hermes cycles |
| HERMES_MIN_ERRORS_TO_TRIGGER | 5 | Min failures to generate a patch |
| HERMES_PATCH_SCORE_THRESHOLD | 0.7 | Min score to apply patch |
| regression_threshold | 0.30 | 30% error-rate increase triggers rollback |

Evaluation Framework

harness/eval/runner.py and harness/eval/diagnostics.py

from harness.eval.runner import EvalRunner
from harness.eval.datasets import EvalDataset, EvalCase

dataset = EvalDataset(
    name="sql_smoke",
    agent_type="sql",
    cases=[
        EvalCase(case_id="c1", task="List all tables", expected_output="users, orders, products"),
        EvalCase(case_id="c2", task="Count users", expected_output="42"),
    ],
)

runner = EvalRunner(agent_runner=agent_runner)
report = await runner.run(dataset, concurrency=3)
print(report.success_rate, report.avg_cost_usd)
print(report.to_markdown())

EvalDiagnostics

Each EvalReport includes rich per-case diagnostics:

  • failure_stage — where the agent failed: llm | tool | safety | timeout | budget | quality
  • Recommendations — auto-generated hints based on failure patterns
  • By-agent aggregates — token usage, cost, tool errors, guardrail hits per agent type

REST API Reference

Runs

| Method | Path | Description |
| --- | --- | --- |
| POST | /runs | Create and enqueue a run. Body: {agent_type, task, metadata} |
| GET | /runs/{run_id} | Retrieve run record by ID |
| GET | /runs | List runs for tenant. Query: limit, offset |
| DELETE | /runs/{run_id} | Cancel a pending or running run |
| GET | /runs/{run_id}/stream | SSE stream of StepEvents. Terminates on completed/failed |

Traces

| Method | Path | Description |
| --- | --- | --- |
| GET | /runs/{run_id}/trace | Full span hierarchy. Returns TraceView with aggregated totals. 404 if expired (48 h TTL) |
| GET | /runs/spans/{span_id} | Single span by ID. 404 if not found or expired |

Evaluations

| Method | Path | Description |
| --- | --- | --- |
| POST | /evals/smoke/run | Run single-agent smoke suite |
| POST | /evals/multi/run | Run multi-agent regression suite |
| POST | /evals/compare | Compare two prompt versions |

Improvement

| Method | Path | Description |
| --- | --- | --- |
| GET | /improvement/patches | List patches by status (pending/applied/rejected) |
| POST | /improvement/patches/{id}/approve | Approve and apply a patch |
| POST | /improvement/patches/{id}/reject | Reject a patch |
| GET | /improvement/errors | List recent agent failures |

Authentication

Pass an API key in the X-API-Key header. In dev mode, anonymous requests default to tenant "default". JWT bearer tokens are also supported.
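
Creating a run over HTTP, as a sketch using httpx (the host, key, and payload values are placeholders):

import httpx

async def create_run() -> dict:
    async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
        resp = await client.post(
            "/runs",
            headers={"X-API-Key": "my-key"},   # or Authorization: Bearer <jwt>
            json={"agent_type": "sql", "task": "List all tables", "metadata": {}},
        )
        resp.raise_for_status()
        return resp.json()                     # matches the run response schema below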

Run response schema

{
  "run_id":       "3f2a8c1e4d...",
  "tenant_id":    "acme",
  "agent_type":   "sql",
  "task":         "List all tables",
  "status":       "completed",
  "created_at":   "2024-01-01T00:00:00Z",
  "completed_at": "2024-01-01T00:00:12Z",
  "result": {
    "output":          "Found 7 tables: users, orders...",
    "steps":           4,
    "tokens":          1250,
    "cost_usd":        0.00031,
    "success":         true,
    "elapsed_seconds": 11.8
  }
}

Trace response schema

{
  "trace_id":             "ddda858ebe8f...",
  "run_id":               "3f2a8c1e4d...",
  "agent_type":           "sql",
  "status":               "ok",
  "duration_ms":          11840,
  "total_input_tokens":   980,
  "total_output_tokens":  270,
  "total_cost_usd":       0.00031,
  "span_count":           6,
  "spans": [
    {
      "span_id":        "01f04e9413851d7f",
      "parent_span_id": null,
      "kind":           "run",
      "name":           "run:sql",
      "status":         "ok",
      "duration_ms":    11840,
      "input_preview":  "List all tables",
      "output_preview": "Found 7 tables..."
    },
    ...
  ]
}

Redis Schema

| Key pattern | Type | TTL | Purpose |
| --- | --- | --- | --- |
| harness:run:{run_id} | HASH | (none) | RunRecord (persistent) |
| harness:conv:{run_id} | LIST | (none) | Conversation history (ShortTermMemory, LPUSH) |
| harness:scratch:{run_id} | HASH | 3600 s | Scratch-pad values |
| harness:hot:{run_id}:{skill_ns} | LIST | (none) | Hot window messages per skill (ContextEngine) |
| harness:page:{page_id} | HASH | 24 h | Compressed ContextPage |
| harness:pages:{run_id}:{skill_ns} | ZSET | 24 h | Page IDs scored by importance |
| harness:actions:{run_id} | LIST | 24 h | ActionRecord entries (ContextEngine) |
| harness:span:{span_id} | HASH | 48 h | TraceSpan data |
| harness:trace:{run_id} | ZSET | 48 h | Span IDs ordered by start_time |
| harness:error:{record_id} | HASH | (none) | ErrorRecord (Hermes input) |
| harness:error_index | ZSET | (none) | Error IDs by timestamp |
| harness:hitl:{request_id} | HASH | 3600 s | ApprovalRequest |
| harness:hitl:pending | ZSET | (none) | Pending approval request IDs |
| harness:stream:{agent_id} | STREAM | (none) | Per-agent message stream |
| harness:stream:broadcast | STREAM | (none) | Broadcast message stream |
| harness:events:{run_id} | Pub/Sub | (none) | SSE event channel (EventBus) |
| harness:llm_cache:{tenant}:{hash} | HASH | 3600 s | Cached LLM response |
| harness:audit | STREAM | (none) | Audit event stream |
| harness:failures | STREAM | (none) | StepFailure stream (FailureTracker) |
| harness:online_metrics:{agent}:{version} | HASH | (none) | VersionMetrics (OnlineLearningMonitor) |

Dependencies

Core (always installed)

fastapi ≥0.115, pydantic ≥2.9, redis ≥5.2, anthropic ≥0.40, openai ≥1.50, httpx ≥0.28, networkx ≥3.3, sqlalchemy ≥2.0, uvicorn, rq ≥2.0, apscheduler ≥3.10, sse-starlette ≥2.1

Optional extras

| Extra | Packages | Enables |
| --- | --- | --- |
| [vector] | chromadb, qdrant-client, weaviate-client, sentence-transformers | Vector memory backends + embeddings |
| [graph] | neo4j | Neo4j production graph backend |
| [observe] | mlflow, opentelemetry-sdk, opentelemetry-exporter-otlp, prometheus-client | MLflow tracking, OTel export, Prometheus metrics |
| [mcp] | mcp ≥1.2 | MCP server connections |
| [sql] | asyncpg, sqlglot | PostgreSQL async driver + SQL parsing |
| [ingestion] | pymupdf, trafilatura, python-docx | PDF/HTML/DOCX document ingestion |
| [structured] | instructor | Structured LLM output (Pydantic models) |
| [all] | All of the above | Full production stack |

Development setup: pip install harnessagent[vector,observe,mcp] covers the most common production use case. Use NetworkX (default graph backend) for local development; switch to Neo4j for production graph workloads.

HarnessAgent Technical Documentation • v0.1.0 • Built 2026