Python SDK
Define agents in code. Run them durably. Stream everything.
pip install infraweaveThe SDK is a thin, typed layer over the REST API — everything it does, you can do with plain HTTP.
Define and start
from infraweave import Workflow, agent
@agent("coder", model="tier-1-reasoning")
async def coder(ctx, task):
return await ctx.llm.complete(task.prompt)
@agent("reviewer", model="fast-cheap")
async def reviewer(ctx, draft):
verdict = await ctx.llm.complete(f"Review:\n{draft}")
return {"approved": "LGTM" in verdict, "notes": verdict}
wf = Workflow("checkout-flow")
run = await wf.start(input={"task": "ship it"})
print(run.id, run.url) # run_8f31a2, the live console traceInside an agent, ctx is the governed capability surface:
ctx.llm— routed model calls (alias-resolved, budgeted, provenance-stamped).ctx.tools— allowlisted tool invocations through the gateway.ctx.secrets— named references; values are injected at call time, never exposed.ctx.memory— RAG / knowledge-graph retrieval.
The run handle
run = await wf.start(input=..., idempotency_key="idem_9f2c41")
await run.status() # queued | running | retrying | succeeded | failed | cancelled
result = await run.result() # blocks until terminal; raises on failure
await run.cancel() # cooperative
await run.signal("approval", {"request_id": "apr_114", "payload": {"approved": True}})Stream events
async for event in run.events(): # resumes automatically on disconnect
match event.type:
case "node_completed":
print(f"{event.node}: {event.latency_ms}ms · {event.tokens} tok")
case "input_requested":
await run.signal(event.signal_name, {
"request_id": event.request_id,
"payload": {"approved": True},
})
case "status_changed" if event.terminal:
breakTesting agents
Agents are functions, so unit-test them with a stub context; test workflows end-to-end with record-replay — a captured run's history replays deterministically against your code, no live providers required:
from infraweave.testing import StubContext, replay
async def test_coder():
ctx = StubContext(llm_responses=["def checkout(): ..."])
out = await coder(ctx, Task(prompt="write checkout"))
assert "def checkout" in out
async def test_checkout_flow_replay():
await replay("checkout-flow", history="fixtures/run_8f3199.json")