Financial Intelligence Pipeline¶
| Profile | AutoAgent |
| --- | --- |
| Infra required | Redis (auto-detected from `REDIS_URL`) |
| Workflow | `fetch → analyse → report` |
| Run | `python examples/financial_pipeline.py` |
What it does¶
Three AutoAgent specialists form a sequential workflow. The WorkflowEngine dispatches steps in dependency order, passing outputs from each step into the next agent's execution context automatically.
```text
MarketDataAgent  →  AnalysisAgent   →  ReportAgent
     fetch             analyse            report
 (OHLCV data)     (signals, alerts)  (Markdown briefing)
```
The final Markdown briefing is saved to `blob_storage/reports/financial-daily-001.md`.
Key pattern: workflow DAG¶
```python
import os

from jarviscore import Mesh
from jarviscore.profiles import AutoAgent

REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379/0")

mesh = Mesh(config={"redis_url": REDIS_URL})  # (1)!
mesh.add(MarketDataAgent)
mesh.add(AnalysisAgent)
mesh.add(ReportAgent)
await mesh.start()

results = await mesh.workflow("financial-daily-001", [
    {
        "id": "fetch",
        "agent": "market_data",
        "task": "Generate synthetic daily market data for AAPL, MSFT, NVDA ...",
    },
    {
        "id": "analyse",
        "agent": "analyst",
        "task": "Analyse the market data from the fetch step ...",
        "depends_on": ["fetch"],  # (2)!
    },
    {
        "id": "report",
        "agent": "reporter",
        "task": "Write an executive Markdown market briefing ...",
        "depends_on": ["analyse"],  # (3)!
    },
])
```
1. Pass `redis_url` in the config dict. The Mesh detects the Redis connection at `start()` time and activates the workflow engine with persistence automatically; no `mode=` argument is needed.
2. `depends_on` tells the `WorkflowEngine` to wait until `fetch` succeeds. The `fetch` output is injected into the analyst's execution context automatically.
3. Chained dependency: `report` waits for `analyse`. Its context includes both prior step outputs.
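The dependency-ordered dispatch described above can be pictured as a plain topological walk over the step list. The sketch below is illustrative only, not the jarviscore `WorkflowEngine`; the step dicts mirror the ones in the example:

```python
# Illustrative sketch of dependency-ordered dispatch: a step runs
# only once every step in its depends_on list has produced output,
# and those outputs become the step's execution context.
def run_workflow(steps, run_step):
    outputs = {}
    pending = {s["id"]: s for s in steps}
    while pending:
        ready = [s for s in pending.values()
                 if all(dep in outputs for dep in s.get("depends_on", []))]
        if not ready:
            raise ValueError("cyclic or unsatisfiable dependencies")
        for step in ready:
            # Prior step outputs form this step's execution context.
            context = {dep: outputs[dep] for dep in step.get("depends_on", [])}
            outputs[step["id"]] = run_step(step, context)
            del pending[step["id"]]
    return outputs

steps = [
    {"id": "fetch", "agent": "market_data", "task": "..."},
    {"id": "analyse", "agent": "analyst", "task": "...", "depends_on": ["fetch"]},
    {"id": "report", "agent": "reporter", "task": "...", "depends_on": ["analyse"]},
]
results = run_workflow(steps, lambda step, ctx: f"{step['id']} done")
# fetch runs first, then analyse (seeing fetch's output), then report
```

With the chain above, `fetch` is the only step ready in the first pass, so execution necessarily proceeds fetch → analyse → report.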
Key pattern: auto-injected infrastructure¶
```python
class MarketDataAgent(AutoAgent):
    role = "market_data"
    capabilities = ["market_data", "data_collection", "finance"]
    system_prompt = "..."  # LLM prompt — store result in variable named 'result'

    async def setup(self):
        await super().setup()
        # self._redis_store and self._blob_storage are injected
        # by Mesh.start() BEFORE setup() is called.
        self.memory = UnifiedMemory(
            workflow_id=WORKFLOW_ID,
            step_id="fetch",
            agent_id=self.role,
            redis_store=self._redis_store,    # (1)!
            blob_storage=self._blob_storage,  # (2)!
        )
```
1. `self._redis_store`: a live `RedisMemoryStore`, ready to use; `None` if Redis is unavailable (the example degrades gracefully).
2. `self._blob_storage`: a `LocalBlobStorage` instance. On cloud deployments, swap in `S3BlobStorage` via config.
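The graceful degradation when `self._redis_store` is `None` can be sketched with a small fallback wrapper. The class and method names below are hypothetical, not part of jarviscore:

```python
# Hypothetical sketch: prefer the injected Redis store when present,
# otherwise fall back to an in-process dict (lost on process exit).
class FallbackStore:
    def __init__(self, redis_store=None):
        self._redis = redis_store   # live store, or None when unavailable
        self._local = {}            # in-memory fallback

    def set(self, key, value):
        if self._redis is not None:
            self._redis.set(key, value)  # persisted across restarts
        else:
            self._local[key] = value     # best-effort, non-persistent

    def get(self, key):
        if self._redis is not None:
            return self._redis.get(key)
        return self._local.get(key)

store = FallbackStore(redis_store=None)  # simulate Redis being down
store.set("step_output:financial-daily-001:fetch", "{...}")
```

The agent keeps working either way; only persistence (and therefore crash recovery) is lost in the fallback path.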
Key pattern: crash recovery¶
The workflow uses a deterministic `workflow_id`. Re-running skips already-completed steps:
```bash
python examples/financial_pipeline.py   # crashes after "fetch"
python examples/financial_pipeline.py   # re-run: skips "fetch", continues from "analyse"
```
Verify in Redis:
```bash
redis-cli keys "*financial-daily-001*"
# → step_output:financial-daily-001:fetch   ← already set, will be skipped
```
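The skip logic amounts to checking for a persisted output key derived from the deterministic `workflow_id` before running a step. A minimal sketch (names hypothetical; a dict stands in for Redis):

```python
# Minimal sketch of crash recovery: a step executes only if its
# output key is absent from the persistent store; otherwise the
# previously persisted output is reused.
def run_step_idempotent(store, workflow_id, step_id, work):
    key = f"step_output:{workflow_id}:{step_id}"
    if key in store:                  # completed on a previous run
        return store[key], True      # (output, skipped=True)
    store[key] = work()               # first run: execute and persist
    return store[key], False

store = {}
out1, skipped1 = run_step_idempotent(store, "financial-daily-001", "fetch",
                                     lambda: "ohlcv-data")
out2, skipped2 = run_step_idempotent(store, "financial-daily-001", "fetch",
                                     lambda: "ohlcv-data")
# the second call reuses the persisted output instead of re-executing
```

Because the `workflow_id` is deterministic, a re-run computes the same keys and lands on the persisted outputs from the crashed run.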
Success criteria¶
- [ ] All 3 steps complete: `fetch → analyse → report`
- [ ] Report saved to `blob_storage/reports/financial-daily-001.md`
- [ ] Console prints `Pipeline complete: 3/3 steps successful`
- [ ] Re-running skips completed steps (crash recovery)