InfraWeaveDocs

Python SDK

Define agents in code. Run them durably. Stream everything.

pip install infraweave

The SDK is a thin, typed layer over the REST API — everything it does, you can do with plain HTTP.

Define and start

from infraweave import Workflow, agent

@agent("coder", model="tier-1-reasoning")
async def coder(ctx, task):
    return await ctx.llm.complete(task.prompt)

@agent("reviewer", model="fast-cheap")
async def reviewer(ctx, draft):
    verdict = await ctx.llm.complete(f"Review:\n{draft}")
    return {"approved": "LGTM" in verdict, "notes": verdict}

wf = Workflow("checkout-flow")
run = await wf.start(input={"task": "ship it"})
print(run.id, run.url)   # run_8f31a2, the live console trace

Inside an agent, ctx is the governed capability surface:

  • ctx.llm — routed model calls (alias-resolved, budgeted, provenance-stamped).
  • ctx.tools — allowlisted tool invocations through the gateway.
  • ctx.secrets — named references; values are injected at call time, never exposed.
  • ctx.memory — RAG / knowledge-graph retrieval.

The run handle

run = await wf.start(input=..., idempotency_key="idem_9f2c41")

await run.status()          # queued | running | retrying | succeeded | failed | cancelled
result = await run.result() # blocks until terminal; raises on failure
await run.cancel()          # cooperative
await run.signal("approval", {"request_id": "apr_114", "payload": {"approved": True}})

Stream events

async for event in run.events():          # resumes automatically on disconnect
    match event.type:
        case "node_completed":
            print(f"{event.node}: {event.latency_ms}ms · {event.tokens} tok")
        case "input_requested":
            await run.signal(event.signal_name, {
                "request_id": event.request_id,
                "payload": {"approved": True},
            })
        case "status_changed" if event.terminal:
            break

Testing agents

Agents are functions, so unit-test them with a stub context; test workflows end-to-end with record-replay — a captured run's history replays deterministically against your code, no live providers required:

from infraweave.testing import StubContext, replay

async def test_coder():
    ctx = StubContext(llm_responses=["def checkout(): ..."])
    out = await coder(ctx, Task(prompt="write checkout"))
    assert "def checkout" in out

async def test_checkout_flow_replay():
    await replay("checkout-flow", history="fixtures/run_8f3199.json")

On this page