Skip to content

JarvisCore User Guide

Practical guide to building agent systems with JarvisCore.


Table of Contents

  1. Quick Start
  2. Basic Concepts
  3. AutoAgent Tutorial
  4. Custom Profile Tutorial
  5. CustomAgent Tutorial
  6. Multi-Agent Workflows
  7. Infrastructure & Memory (v0.4.0) — blob, mailbox, memory, auth, telemetry
  8. Internet Search
  9. Remote Sandbox
  10. Result Storage
  11. Code Registry
  12. FastAPI Integration (v0.3.0)
  13. Cloud Deployment (v0.3.0)
  14. Cognitive Discovery (v0.3.0)
  15. Session Context (v0.3.2)
  16. Async Requests (v0.3.2)
  17. Load Balancing (v0.3.2)
  18. Mesh Diagnostics (v0.3.2)
  19. Testing with MockMesh (v0.3.2)
  20. Best Practices
  21. Common Patterns
  22. Troubleshooting

Quick Start

Step 1: Installation (1 minute)

pip install jarviscore-framework

Step 2: Configuration (2 minutes)

JarvisCore needs an LLM provider to generate code for AutoAgent. Initialize your project:

# Initialize project (creates .env.example and examples)
python -m jarviscore.cli.scaffold --examples

# Copy and configure your environment
cp .env.example .env

Edit .env and add one of these API keys:

# Option 1: Claude (Recommended)
CLAUDE_API_KEY=sk-ant-your-key-here

# Option 2: Azure OpenAI
AZURE_API_KEY=your-key-here
AZURE_ENDPOINT=https://your-resource.openai.azure.com
AZURE_DEPLOYMENT=gpt-4o

# Option 3: Google Gemini
GEMINI_API_KEY=your-key-here

# Option 4: Local vLLM
LLM_ENDPOINT=http://localhost:8000

Step 3: Validate Setup (30 seconds)

Run the health check to ensure everything works:

# Basic check
python -m jarviscore.cli.check

# Test LLM connectivity
python -m jarviscore.cli.check --validate-llm

Run the smoke test to validate end-to-end:

python -m jarviscore.cli.smoketest

Step 4: Your First Agent (30 seconds)

import asyncio
from jarviscore import Mesh
from jarviscore.profiles import AutoAgent

class CalculatorAgent(AutoAgent):
    role = "calculator"
    capabilities = ["math", "calculation"]
    system_prompt = "You are a math expert. Store results in 'result' variable."

async def main():
    mesh = Mesh(mode="autonomous")
    mesh.add(CalculatorAgent)
    await mesh.start()

    results = await mesh.workflow("calc", [
        {"agent": "calculator", "task": "Calculate the factorial of 10"}
    ])

    print(results[0]['output'])  # 3628800
    await mesh.stop()

asyncio.run(main())

That's it! Three steps: 1. Define agent class 2. Add to mesh 3. Run workflow


Basic Concepts

The Mesh

The Mesh is the runtime that wires everything together. You create one Mesh per process, configure its mode, register agent classes with mesh.add(), then call await mesh.start() to initialise them. The Mesh routes each workflow step to the right agent, injects infrastructure (Redis store, blob storage, mailbox) before setup() runs, and manages the full agent lifecycle — from startup through teardown.

# Autonomous mode (single machine, workflow engine only)
mesh = Mesh(mode="autonomous")

# P2P mode (agent-to-agent communication via SWIM/ZMQ)
mesh = Mesh(mode="p2p", config={'bind_port': 7950})

# Distributed mode (both workflow engine AND P2P networking)
mesh = Mesh(mode="distributed", config={'bind_port': 7950})

Mode Selection: | Mode | Use Case | Components | |------|----------|------------| | autonomous | Single machine, simple pipelines | Workflow Engine | | p2p | Agent swarms, real-time coordination | P2P Coordinator | | distributed | Multi-node production systems | Both |

Agents

Agents are workers that execute tasks. Each agent has a role (a unique string identifier), a list of capabilities, and belongs to a Profile that determines how it runs.

A Profile is the execution contract an agent fulfills. You subclass a Profile, set its class attributes, and pass the class to mesh.add() — the Mesh instantiates and manages it. JarvisCore offers two profiles:

Profile Best For How It Works
AutoAgent Rapid prototyping LLM generates + executes code from prompts
CustomAgent Your own code Implement on_peer_request() for P2P or execute_task() for workflows

See AutoAgent Guide and CustomAgent Guide for details.

Workflows

A Workflow is a list of steps — each step names an agent by role or capability, plus a task description. Steps that declare depends_on wait for those dependencies to complete before starting; independent steps run in parallel. Each completed step's output is forwarded automatically to dependent steps via task['context']['previous_step_results'], so agents can build on each other's work without manual wiring.

await mesh.workflow("pipeline-id", [
    {"agent": "scraper", "task": "Scrape data"},
    {"agent": "processor", "task": "Clean data", "depends_on": [0]},
    {"agent": "storage", "task": "Save data", "depends_on": [1]}
])

AutoAgent Tutorial

AutoAgent handles the "prompt → code → result" workflow automatically. See AutoAgent Guide for distributed mode.

Example 1: Simple Calculator

import asyncio
from jarviscore import Mesh
from jarviscore.profiles import AutoAgent

class CalculatorAgent(AutoAgent):
    role = "calculator"
    capabilities = ["math", "calculation"]
    system_prompt = "You are a mathematical calculation expert. Store results in 'result'."

async def calculator_demo():
    mesh = Mesh(mode="autonomous")
    mesh.add(CalculatorAgent)
    await mesh.start()

    result = await mesh.workflow("calc", [
        {"agent": "calculator", "task": "Calculate 15!"}
    ])

    print(f"15! = {result[0]['output']}")
    await mesh.stop()

asyncio.run(calculator_demo())

Example 2: Data Analyst

class AnalystAgent(AutoAgent):
    role = "analyst"
    capabilities = ["data_analysis", "statistics"]
    system_prompt = "You are a data analyst expert. Store results in 'result'."

async def data_analyst_demo():
    mesh = Mesh(mode="autonomous")
    mesh.add(AnalystAgent)
    await mesh.start()

    result = await mesh.workflow("analysis", [{
        "agent": "analyst",
        "task": """
        Given this data: [23, 45, 12, 67, 89, 34, 56, 78, 90, 11]
        Calculate: mean, median, mode, standard deviation, and min/max
        """
    }])

    print(result[0]['output'])
    await mesh.stop()

asyncio.run(data_analyst_demo())

Custom Profile Tutorial

Use CustomAgent profile.

See CustomAgent Guide for: - Converting standalone agents to JarvisCore - P2P mode for agent-to-agent communication - Distributed mode for multi-node systems


CustomAgent Tutorial

CustomAgent gives you full control over execution logic. See CustomAgent Guide for P2P and distributed modes.

Quick Example

from jarviscore import Mesh
from jarviscore.profiles import CustomAgent

class MyAgent(CustomAgent):
    role = "processor"
    capabilities = ["data_processing"]

    async def setup(self):
        """Initialize resources (DB connections, API clients, etc.)."""
        await super().setup()
        self.data = []

    async def execute_task(self, task):
        """Called by workflow engine for each task."""
        task_desc = task.get("task", "")
        context = task.get("context", {})  # From depends_on steps

        # Your logic here
        result = {"processed": task_desc.upper()}

        return {
            "status": "success",
            "output": result,
            "agent_id": self.agent_id
        }

async def main():
    mesh = Mesh(mode="autonomous")
    mesh.add(MyAgent)
    await mesh.start()

    results = await mesh.workflow("demo", [
        {"agent": "processor", "task": "hello world"}
    ])

    print(results[0]["output"])  # {"processed": "HELLO WORLD"}
    await mesh.stop()

Key Methods

Method Purpose Mode
setup() Initialize resources All
execute_task(task) Handle workflow steps Autonomous/Distributed
run() Continuous loop P2P
teardown() Cleanup resources All

Multi-Agent Workflows

Example: Data Pipeline

from jarviscore import Mesh
from jarviscore.profiles import AutoAgent

class ScraperAgent(AutoAgent):
    role = "scraper"
    capabilities = ["web_scraping", "data_collection"]
    system_prompt = "You are a web scraping expert. Store results in 'result'."

class ProcessorAgent(AutoAgent):
    role = "processor"
    capabilities = ["data_processing", "cleaning"]
    system_prompt = "You are a data cleaning expert. Store results in 'result'."

class AnalyzerAgent(AutoAgent):
    role = "analyzer"
    capabilities = ["analysis", "statistics"]
    system_prompt = "You are a data analysis expert. Store results in 'result'."

async def data_pipeline():
    mesh = Mesh(mode="autonomous")
    mesh.add(ScraperAgent)
    mesh.add(ProcessorAgent)
    mesh.add(AnalyzerAgent)
    await mesh.start()

    results = await mesh.workflow("pipeline", [
        {
            "id": "scrape",
            "agent": "scraper",
            "task": "Generate sample e-commerce data (10 products with prices)"
        },
        {
            "id": "clean",
            "agent": "processor",
            "task": "Clean and normalize the product data",
            "depends_on": ["scrape"]
        },
        {
            "id": "analyze",
            "agent": "analyzer",
            "task": "Calculate price statistics (mean, median, range)",
            "depends_on": ["clean"]
        }
    ])

    print("Scrape:", results[0]['output'])
    print("Clean:", results[1]['output'])
    print("Analysis:", results[2]['output'])
    await mesh.stop()

asyncio.run(data_pipeline())

Infrastructure & Memory

JarvisCore v1.0.0 ships a full production infrastructure stack. All features degrade gracefully when not configured.

Auto-Injection Quick Reference

Before every agent's setup(), the Mesh injects:

Attribute What it is Requires
self._redis_store RedisContextStore — step outputs, workflow graph, mailbox, checkpoints REDIS_URL
self._blob_storage LocalBlobStorage | AzureBlobStorage — artifact I/O always present
self.mailbox MailboxManager — async inter-agent messaging via Redis Streams REDIS_URL
class MyAgent(CustomAgent):
    async def setup(self):
        await super().setup()
        # All injected — use directly, no constructor wiring
        print(self._redis_store, self._blob_storage, self.mailbox)

Blob Storage

Save and load any artifact — string, bytes, or JSON:

STORAGE_BACKEND=local          # default: ./blob_storage/
STORAGE_BASE_PATH=./blob_storage
# or: STORAGE_BACKEND=azure with AZURE_STORAGE_CONNECTION_STRING
# Save
await self._blob_storage.save("reports/daily-001.md", markdown_text)
await self._blob_storage.save("data/result.json", json.dumps(data))

# Load
content = await self._blob_storage.load("reports/daily-001.md")
data = json.loads(content) if content else {}

Path convention: {type}/{workflow_id}/{filename}.{ext}


Mailbox Messaging

Fire-and-forget inter-agent messages via Redis Streams. Messages survive process restarts.

# Send (fire-and-forget)
self.mailbox.send(other_agent_id, {"event": "done", "workflow": "wf-001"})

# Drain inbox
messages = self.mailbox.read(max_messages=10)
for msg in messages:
    print(msg["event"])

Prometheus Telemetry

Prometheus support is optional. If prometheus-client is not installed, all metric calls are silent no-ops — nothing fails and no configuration is required. Install it only when you want to scrape metrics:

pip install "jarviscore[prometheus]"

Enable and configure via .env:

PROMETHEUS_ENABLED=true   # default: false
PROMETHEUS_PORT=9090      # default: 9090

When PROMETHEUS_ENABLED=true, the Mesh automatically starts an HTTP metrics server on the configured port at mesh.start(). No code changes are needed — all built-in metrics are collected automatically.

Metrics exposed

Metric Type Labels Description
jarviscore_llm_tokens_input_total Counter provider, model Total input tokens consumed
jarviscore_llm_tokens_output_total Counter provider, model Total output tokens generated
jarviscore_llm_cost_dollars_total Counter provider, model Total LLM cost in USD
jarviscore_llm_request_duration_seconds Histogram provider, model LLM API call duration
jarviscore_llm_requests_total Counter provider, model, status Total LLM requests (success/error)
jarviscore_workflow_steps_total Counter status Steps completed, labelled by outcome
jarviscore_step_execution_duration_seconds Histogram status Per-step execution time
jarviscore_active_workflows Gauge Workflows currently running
jarviscore_active_steps Gauge Steps currently executing
jarviscore_events_emitted_total Counter event_type Trace events emitted

LLM metrics (tokens, cost, duration) are recorded automatically by the AutoAgent code-generation pipeline. Workflow metrics (steps_total, active_workflows) are recorded automatically by the WorkflowEngine.

Recording step metrics in CustomAgent

AutoAgent records step metrics automatically. For CustomAgent, call record_step_execution manually:

import time
from jarviscore.telemetry.metrics import record_step_execution

class MyAgent(CustomAgent):
    async def execute_task(self, task):
        start = time.time()
        result = self._do_work(task)
        record_step_execution(time.time() - start, "completed")
        return {"status": "success", "output": result}

Viewing metrics

# Raw metrics endpoint
curl -s http://localhost:9090/metrics | grep jarviscore

# Check active workflows during a run
curl -s http://localhost:9090/metrics | grep active_workflows

# Token usage by provider
curl -s http://localhost:9090/metrics | grep llm_tokens

Metrics are in the standard Prometheus text format and can be scraped by any Prometheus-compatible system (Prometheus server, Grafana, Datadog Agent, etc.).


Distributed WorkflowEngine

mesh = Mesh(mode="distributed", config={"redis_url": "redis://localhost:6379/0"})
results = await mesh.workflow("wf-001", [
    {"id": "fetch",   "agent": "fetcher",  "task": "Fetch data"},
    {"id": "analyse", "agent": "analyst",  "task": "Analyse", "depends_on": ["fetch"]},
])
  • DAG persisted to Redis hash workflow_graph:{wf_id} — crash recovery on restart
  • Distributed nodes autonomously claim matching steps via atomic SETNX
  • _wait_remote_step() polls Redis every 2s until a remote step is "completed"

Auth Injection (Nexus OSS)

NEXUS_GATEWAY_URL=https://your-dromos-gateway.example.com
AUTH_MODE=production    # or "mock" for local dev
class SecureAgent(CustomAgent):
    requires_auth = True   # → self._auth_manager injected before setup()

    async def execute_task(self, task):
        if self._auth_manager:
            result = await self._auth_manager.make_authenticated_request(
                provider="github", method="GET",
                url="https://api.github.com/user",
            )

Full Nexus OSS flow: request_connection → browser OAuth → poll ACTIVE → resolve_strategy → apply header. _auth_manager is None when NEXUS_GATEWAY_URL is not set (graceful degradation).


UnifiedMemory

from jarviscore.memory import UnifiedMemory, RedisMemoryAccessor

# In setup()
self.memory = UnifiedMemory(
    workflow_id="wf-001", step_id="analyst", agent_id=self.role,
    redis_store=self._redis_store, blob_storage=self._blob_storage,
)

# EpisodicLedger
await self.memory.episodic.append({"event": "started", "ts": time.time()})
recent = await self.memory.episodic.tail(5)

# LongTermMemory
await self.memory.ltm.save_summary("Key findings: ...")
summary = await self.memory.ltm.load_summary()

# Cross-step accessor — read prior step output without explicit passing
accessor = RedisMemoryAccessor(self._redis_store, workflow_id="wf-001")
raw = accessor.get("fetch")
data = raw.get("output", raw) if isinstance(raw, dict) else {}

Production Examples

All examples require Redis: docker compose -f docker-compose.infra.yml up -d

Example Mode Profile
Ex1 — Financial Pipeline autonomous AutoAgent
Ex2 — Research Network (4 nodes) distributed AutoAgent
Ex3 — Support Swarm p2p CustomAgent
Ex4 — Content Pipeline distributed CustomAgent
python examples/ex1_financial_pipeline.py
python examples/ex2_synthesizer.py &  # then start nodes 1-3
python examples/ex3_support_swarm.py
python examples/ex4_content_pipeline.py

Full walkthroughs: AUTOAGENT_GUIDE.mdCUSTOMAGENT_GUIDE.md


Enable web search for research tasks:

from jarviscore import Mesh
from jarviscore.profiles import AutoAgent

class ResearcherAgent(AutoAgent):
    role = "researcher"
    capabilities = ["research", "web_search"]
    system_prompt = "You are an expert researcher. Store results in 'result'."
    enable_search = True  # ← Enable internet search

async def search_demo():
    mesh = Mesh(mode="autonomous")
    mesh.add(ResearcherAgent)
    await mesh.start()

    result = await mesh.workflow("search", [{
        "agent": "researcher",
        "task": "Search for 'Python asyncio best practices' and summarize top 3 results"
    }])

    print(result[0]['output'])
    await mesh.stop()

asyncio.run(search_demo())

Search Capabilities: - DuckDuckGo web search - Content extraction from URLs - Automatic summarization - No API keys required


Remote Sandbox

Use remote code execution for better security:

Enable Remote Sandbox

# .env file
SANDBOX_MODE=remote
SANDBOX_SERVICE_URL=https://browser-task-executor.bravesea-3f5f7e75.eastus.azurecontainerapps.io

Test Remote Execution

from jarviscore.execution import create_sandbox_executor

async def test_remote():
    executor = create_sandbox_executor(
        timeout=30,
        config={
            'sandbox_mode': 'remote',
            'sandbox_service_url': 'https://...'
        }
    )

    code = "result = 2 + 2"
    result = await executor.execute(code)

    print(f"Mode: {result['mode']}")  # "remote"
    print(f"Output: {result['output']}")  # 4
    print(f"Time: {result['execution_time']}s")

asyncio.run(test_remote())

Benefits: - Full process isolation - Better security - Azure Container Apps hosting - Automatic fallback to local

When to use: - Production deployments - Untrusted code execution - Multi-tenant systems - High security requirements


Result Storage

All execution results are automatically stored:

Access Result Storage

from jarviscore.execution import create_result_handler

handler = create_result_handler()

# Get specific result
result = handler.get_result("calculator-abc123_2026-01-12T12-00-00_123456")

# Get agent's recent results
recent = handler.get_agent_results("calculator-abc123", limit=10)

for r in recent:
    print(f"{r['task']}: {r['output']}")

Storage Location

logs/
├── calculator-abc123/
│   ├── calculator-abc123_2026-01-12T12-00-00_123456.json
│   └── calculator-abc123_2026-01-12T12-05-30_789012.json
├── analyzer-def456/
│   └── analyzer-def456_2026-01-12T12-10-00_345678.json
└── code_registry/
    ├── index.json
    └── functions/
        ├── calculator-abc123_3a5b2f76.py
        └── analyzer-def456_8b2c4d91.py

What's Stored: - Task description - Generated code - Execution output - Status (success/failure) - Execution time - Token usage - Cost (if tracked) - Repair attempts - Timestamp


Code Registry

Reuse successful code across agents:

Search Registry

from jarviscore.execution import create_code_registry

registry = create_code_registry()

# Search for math functions
matches = registry.search(
    query="factorial calculation",
    capabilities=["math"],
    limit=3
)

for match in matches:
    print(f"Function: {match['function_id']}")
    print(f"Task: {match['task']}")
    print(f"Output sample: {match['output_sample']}")

Get Function Code

# Get specific function
func = registry.get("calculator-abc123_3a5b2f76")

print("Code:")
print(func['code'])

print("\nMetadata:")
print(f"Agent: {func['agent_id']}")
print(f"Capabilities: {func['capabilities']}")
print(f"Registered: {func['registered_at']}")

Use Cases: - Share functions between agents - Build function library - Audit generated code - Performance analysis


FastAPI Integration (v0.3.0)

Deploy agents as FastAPI services with minimal boilerplate:

JarvisLifespan

from fastapi import FastAPI, Request
from jarviscore.profiles import CustomAgent
from jarviscore.integrations.fastapi import JarvisLifespan

class ProcessorAgent(CustomAgent):
    role = "processor"
    capabilities = ["processing"]

    async def on_peer_request(self, msg):
        return {"result": msg.data.get("task", "").upper()}

# 3 lines to integrate
agent = ProcessorAgent()
app = FastAPI(lifespan=JarvisLifespan(agent, mode="p2p", bind_port=7950))

@app.get("/peers")
async def list_peers(request: Request):
    agent = request.app.state.jarvis_agents["processor"]
    return {"peers": agent.peers.list_peers()}

What JarvisLifespan handles: - Mesh startup/shutdown - Background task management for agent run() loops - Graceful shutdown with timeouts - State injection into FastAPI app


Cloud Deployment (v0.3.0)

Deploy agents to containers without a central orchestrator:

Self-Registration Pattern

# In your container entrypoint
import asyncio
from jarviscore.profiles import CustomAgent

class MyAgent(CustomAgent):
    role = "worker"
    capabilities = ["processing"]

    async def on_peer_request(self, msg):
        return {"processed": msg.data}

async def main():
    agent = MyAgent()

    # Join existing mesh (uses JARVISCORE_SEED_NODES env var)
    await agent.join_mesh()

    print(f"Joined as {agent.role}, discovered: {agent.peers.list_peers()}")

    # Run until shutdown, auto-leaves mesh on exit
    await agent.run_standalone()

asyncio.run(main())

Docker/Kubernetes

FROM python:3.11-slim
WORKDIR /app
COPY . .
RUN pip install jarviscore-framework

# Point to existing mesh
ENV JARVISCORE_SEED_NODES=mesh-service:7946

CMD ["python", "-m", "myapp.agent"]

Environment Variables: - JARVISCORE_SEED_NODES - Comma-separated list of seed nodes - JARVISCORE_MESH_ENDPOINT - Single endpoint to join


Cognitive Discovery (v0.3.0)

Let LLMs dynamically discover mesh peers instead of hardcoding agent names:

The Problem

# Before: Hardcoded peer names in prompts
system_prompt = """
You can delegate to:
- analyst for data analysis
- scout for research
"""
# Breaks when mesh composition changes!

The Solution

# After: Dynamic discovery
system_prompt = self.peers.build_system_prompt(
    "You are a coordinator agent."
)
# Automatically includes all available peers!

get_cognitive_context()

# Get prompt-ready peer descriptions
context = self.peers.get_cognitive_context(format="markdown")

Output:

## AVAILABLE MESH PEERS

You are part of a multi-agent mesh. The following peers are available:

- **analyst** (`agent-analyst-abc123`)
  - Capabilities: analysis, charting, reporting
  - Description: Analyzes data and generates insights

- **scout** (`agent-scout-def456`)
  - Capabilities: research, reconnaissance
  - Description: Gathers information

Use the `ask_peer` tool to delegate tasks to these specialists.

Formats: markdown, json, text


Session Context (v0.3.2)

Pass metadata through your message flows for tracing, priority, and session tracking:

Basic Usage

# Send request with context
response = await self.peers.request(
    "analyst",
    {"query": "analyze sales data"},
    context={"mission_id": "m-123", "priority": "high", "user_id": "u-456"}
)

# Context is available in the handler
async def on_peer_request(self, msg):
    mission_id = msg.context.get("mission_id")  # "m-123"
    print(f"Processing request for mission: {mission_id}")
    return {"result": "analysis complete"}

Auto-Propagation

When you respond(), context automatically propagates from the request:

async def on_peer_request(self, msg):
    # msg.context = {"mission_id": "m-123", ...}
    result = process(msg.data)

    # Context auto-propagates - no need to pass it!
    await self.peers.respond(msg, {"result": result})

    # Or override with custom context
    await self.peers.respond(msg, {"result": result},
                            context={"status": "completed"})

All Methods Support Context

# notify
await self.peers.notify("logger", {"event": "started"}, context={"trace_id": "t-1"})

# request
response = await self.peers.request("worker", {"task": "..."}, context={"priority": "low"})

# broadcast
await self.peers.broadcast({"alert": "system ready"}, context={"source": "coordinator"})

# ask_async
req_id = await self.peers.ask_async("analyst", {"q": "..."}, context={"batch_id": "b-1"})

Async Requests (v0.3.2)

Fire-and-collect pattern for parallel requests without blocking:

Basic Pattern

# Fire off multiple requests (non-blocking)
request_ids = []
for analyst in self.peers.discover(role="analyst"):
    req_id = await self.peers.ask_async(analyst.agent_id, {"task": "analyze"})
    request_ids.append(req_id)

# Do other work while analysts process...
await self.do_other_work()

# Collect responses
results = []
for req_id in request_ids:
    response = await self.peers.check_inbox(req_id, timeout=10)
    if response:
        results.append(response)

API Reference

# Send async request - returns immediately
req_id = await self.peers.ask_async(target, message, timeout=120, context=None)

# Check for response (non-blocking if timeout=0)
response = await self.peers.check_inbox(req_id, timeout=0)

# Check with wait
response = await self.peers.check_inbox(req_id, timeout=5)

# List pending requests
pending = self.peers.get_pending_async_requests()
# [{"request_id": "...", "target": "analyst", "sent_at": 1234567890.0}]

# Clear inbox
self.peers.clear_inbox(req_id)  # Specific
self.peers.clear_inbox()        # All

Load Balancing (v0.3.2)

Distribute requests across multiple peers with discovery strategies:

Strategies

# Default: first in discovery order
peers = self.peers.discover(role="worker", strategy="first")

# Random: shuffle for basic load distribution
peers = self.peers.discover(role="worker", strategy="random")

# Round-robin: rotate through peers on each call
peers = self.peers.discover(role="worker", strategy="round_robin")

# Least-recent: prefer peers not used recently
peers = self.peers.discover(role="worker", strategy="least_recent")

Convenience Method

# Get single peer with strategy
worker = self.peers.discover_one(role="worker", strategy="round_robin")
if worker:
    await self.peers.request(worker.agent_id, {"task": "..."})

Track Usage for least_recent

peer = self.peers.discover_one(role="worker", strategy="least_recent")
response = await self.peers.request(peer.agent_id, {"task": "..."})

# Update usage timestamp after successful communication
self.peers.record_peer_usage(peer.agent_id)

Example: Round-Robin Work Distribution

async def distribute_tasks(self, tasks):
    results = []
    for task in tasks:
        # Each call rotates to next worker
        worker = self.peers.discover_one(role="worker", strategy="round_robin")
        if worker:
            response = await self.peers.request(worker.agent_id, {"task": task})
            results.append(response)
    return results

Mesh Diagnostics (v0.3.2)

Monitor mesh health and debug connectivity issues:

Get Diagnostics

diag = mesh.get_diagnostics()

print(f"Mode: {diag['local_node']['mode']}")
print(f"Status: {diag['connectivity_status']}")
print(f"Agents: {diag['local_node']['agent_count']}")

for agent in diag['local_agents']:
    print(f"  - {agent['role']}: {agent['capabilities']}")

for peer in diag['known_peers']:
    print(f"  - {peer['role']} @ {peer['node_id']}: {peer['status']}")

Connectivity Status Values

Status Meaning
healthy P2P active with connected peers
isolated P2P active but no peers found
degraded Some connectivity issues
not_started Mesh not yet started
local_only Autonomous mode (no P2P)

FastAPI Health Endpoint

@app.get("/health")
async def health(request: Request):
    mesh = request.app.state.jarvis_mesh
    diag = mesh.get_diagnostics()
    return {
        "status": diag["connectivity_status"],
        "agents": diag["local_node"]["agent_count"],
        "peers": len(diag["known_peers"])
    }

Testing with MockMesh (v0.3.2)

Unit test your agents without real P2P infrastructure:

Basic Setup

import pytest
from jarviscore.testing import MockMesh
from jarviscore.profiles import CustomAgent

class MyAgent(CustomAgent):
    role = "processor"
    capabilities = ["processing"]

    async def on_peer_request(self, msg):
        # Delegate to analyst
        analysis = await self.peers.request("analyst", {"data": msg.data})
        return {"processed": True, "analysis": analysis}

@pytest.mark.asyncio
async def test_processor_delegates():
    mesh = MockMesh()
    mesh.add(MyAgent)
    await mesh.start()

    agent = mesh.get_agent("processor")

    # Configure mock response for analyst
    agent.peers.set_mock_response("analyst", {"result": "analyzed"})

    # Test the agent
    response = await agent.peers.request("analyst", {"test": "data"})

    # Verify
    assert response["result"] == "analyzed"
    agent.peers.assert_requested("analyst")

    await mesh.stop()

MockPeerClient Features

# Configure responses
agent.peers.set_mock_response("analyst", {"result": "..."})
agent.peers.set_default_response({"status": "ok"})

# Custom handler for dynamic responses
async def handler(target, message, context):
    return {"echo": message, "target": target}
agent.peers.set_request_handler(handler)

# Inject messages for handler testing
from jarviscore.p2p.messages import MessageType
agent.peers.inject_message("sender", MessageType.REQUEST, {"data": "test"})

# Assertions
agent.peers.assert_notified("target")
agent.peers.assert_requested("analyst", message_contains={"query": "test"})
agent.peers.assert_broadcasted()

# Track what was sent
notifications = agent.peers.get_sent_notifications()
requests = agent.peers.get_sent_requests()

# Reset between tests
agent.peers.reset()

Best Practices

1. Always Use Context Managers

# Good
async with Mesh() as mesh:
    mesh.add_agent(AutoAgent, ...)
    await mesh.start()
    results = await mesh.run_workflow([...])
    # Automatic cleanup

# Manual (also works)
mesh = Mesh()
try:
    await mesh.start()
    results = await mesh.run_workflow([...])
finally:
    await mesh.stop()

2. Handle Errors Gracefully

try:
    results = await mesh.run_workflow([...])

    for i, result in enumerate(results):
        if result['status'] == 'failure':
            print(f"Step {i} failed: {result['error']}")
        else:
            print(f"Step {i} succeeded: {result['output']}")

except TimeoutError:
    print("Workflow timed out")
except RuntimeError as e:
    print(f"Runtime error: {e}")

3. Use Clear System Prompts

# Good
system_prompt = """
You are a financial data analyst expert.
Your task is to analyze stock data and provide insights.
Always return results as structured JSON.
"""

# Bad
system_prompt = "You are helpful"

4. Set Appropriate Timeouts

# Short tasks
mesh.add_agent(AutoAgent, ..., max_repair_attempts=1)

# Long-running tasks
config = {'execution_timeout': 600}  # 10 minutes
mesh = Mesh(config=config)

5. Monitor Costs

class MyAgent(CustomAgent):
    async def execute_task(self, task):
        # ... do work ...

        # Track costs
        self.track_cost(
            input_tokens=500,
            output_tokens=200,
            cost_usd=0.015
        )

        return result

Common Patterns

Pattern 1: Fan-Out, Fan-In

# Process multiple items in parallel, then aggregate
results = await mesh.run_workflow([
    # Fan-out: Process items
    {"id": "item1", "agent": "processor", "task": "Process item 1"},
    {"id": "item2", "agent": "processor", "task": "Process item 2"},
    {"id": "item3", "agent": "processor", "task": "Process item 3"},

    # Fan-in: Aggregate results
    {
        "id": "aggregate",
        "agent": "aggregator",
        "task": "Combine all processed items",
        "depends_on": ["item1", "item2", "item3"]
    }
])

Pattern 2: Conditional Execution

# Execute step 1
results = await mesh.run_workflow([
    {"id": "check", "agent": "validator", "task": "Validate input data"}
])

# Decide next step based on result
if results[0]['output']['valid']:
    results = await mesh.run_workflow([
        {"agent": "processor", "task": "Process valid data"}
    ])
else:
    print("Validation failed, skipping processing")

Pattern 3: Retry with Different Agent

try:
    # Try primary agent
    result = await mesh.run_workflow([
        {"agent": "primary_scraper", "task": "Scrape website"}
    ])
except Exception:
    # Fallback to backup agent
    result = await mesh.run_workflow([
        {"agent": "backup_scraper", "task": "Scrape website"}
    ])

Troubleshooting

Issue: Agent not found

# Error: No agent found for step
# Solution: Check role/capability spelling
mesh.add_agent(AutoAgent, role="calculator", capabilities=["math"])

# This will fail
await mesh.run_workflow([
    {"agent": "calcul", "task": "..."}  # Typo!
])

# This works
await mesh.run_workflow([
    {"agent": "calculator", "task": "..."}  # Correct role
])

# This also works
await mesh.run_workflow([
    {"agent": "math", "task": "..."}  # Uses capability
])

Issue: Mesh not started

# Error: RuntimeError: Workflow engine not started
# Solution: Call mesh.start() before run_workflow()

mesh = Mesh()
mesh.add_agent(...)
await mesh.start()  # ← Don't forget this!
await mesh.run_workflow([...])

Issue: Timeout

# Error: TimeoutError: Execution exceeded 300 seconds
# Solution: Increase timeout

config = {'execution_timeout': 600}  # 10 minutes
mesh = Mesh(config=config)

Issue: No LLM provider configured

# Error: RuntimeError: No LLM provider configured
# Solution: Set environment variables

# .env file
CLAUDE_API_KEY=your-key
# or
AZURE_API_KEY=your-key
AZURE_ENDPOINT=https://...

Issue: Code execution fails

# Check logs for details
import logging
logging.basicConfig(level=logging.DEBUG)

# Enable verbose output
config = {'log_level': 'DEBUG'}
mesh = Mesh(config=config)

Next Steps

  1. AutoAgent Guide - Multi-node distributed mode
  2. CustomAgent Guide - P2P and distributed with your code
  3. API Reference - Detailed component documentation
  4. Configuration Guide - Environment setup
  5. Explore examples/ directory for more code samples

Version

User Guide for JarvisCore v1.0.2

Last Updated: 2026-03-04