Series · Claude Agent SDK in Production · Part 2 of 14
Claude Agent SDK in Production, Part 2: The FastAPI Bridge and the Event Vocabulary
The agent goes behind a URL, and its whole run streams as six kinds of labeled events. This table of six words is the most load-bearing design decision in the series.
By the end of this page you'll watch your analyst investigate live over HTTP, tool call by tool call, with nothing fancier than curl. But the demo isn't the real product of this part. The real product is a table with six rows in it. Twelve of the fourteen parts in this series ship their features as new rows in that table, and the client code that reads it will never change. Getting that table right, today, is the difference between a series that compounds and a series that rewrites itself every third part. So this part moves fast through the plumbing and slow through the design.
Why a server at all
Part 1's agent runs in your terminal, which means it runs as you, with your Claude login, on your machine. Nobody else can use it, and no browser can either: a web page can't hold your credentials, spawn the SDK's subprocess, or touch your filesystem, and it should not be able to do any of those things. So we do what every AI product does: the agent lives behind an HTTP endpoint on a machine we control, and clients talk to the endpoint. FastAPI is the front door, and if FastAPI itself is new to you, LangGraph Part 2 teaches it from zero; this part assumes it and moves at review speed.
The blocking version, in one screen
Start with the obvious thing, so we can watch it fail in an interesting way. Move Part 1's imports into a package (mkdir app && touch app/__init__.py, next to workspace/), then a first app/main.py: request model in, agent run in the middle, reply out.
```python
from claude_agent_sdk import ClaudeAgentOptions, ResultMessage, query
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

MODEL = "claude-haiku-4-5"

OPTIONS = ClaudeAgentOptions(
    cwd="workspace",
    tools=["Read", "Glob", "Grep", "Bash", "Write"],
    permission_mode="bypassPermissions",
    model=MODEL,
)

app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:3000"],
    allow_methods=["*"],
    allow_headers=["*"],
)
```

Same four option decisions as Part 1, now module-level: the options don't change per request. The CORS block is the one piece of forward wiring: no browser talks to this server until Part 3, but when it does, it'll be a Next.js dev server on localhost:3000, and this is the permission slip (LangGraph Part 2 has the from-zero story on CORS). Then the endpoint:
```python
class ChatRequest(BaseModel):
    message: str


class ChatResponse(BaseModel):
    reply: str
    total_cost_usd: float


@app.post("/chat")
async def chat(request: ChatRequest) -> ChatResponse:
    reply, cost = "", 0.0
    async for message in query(prompt=request.message, options=OPTIONS):
        if isinstance(message, ResultMessage):
            reply = message.result or ""
            cost = message.total_cost_usd or 0.0
    return ChatResponse(reply=reply, total_cost_usd=cost)
```

We drain the whole stream, keep only the receipt, and return it. Note total_cost_usd riding in the response: the cost ritual goes on the wire from the very first endpoint, so every client this series ever builds can show the bill. Run it from backend/:

```bash
uv run uvicorn app.main:app --reload
```

And in a second terminal, ask the March question:
You've sat through this silence before. Someone demos a prototype, the room asks it a question, and a spinner spins while the presenter narrates what the code is "probably doing right now" to twelve people watching a frozen screen. Every second of that silence converts one believer into a skeptic. Agents make it worse, because the honest answer to "what's it doing?" is genuinely interesting, and the blocking endpoint throws that answer away.
The answer came back correct, with the receipt, for $0.0208. And it took 22.3 seconds of absolute silence. No progress, no partial output, no sign of life; if that agent had needed forty tool calls instead of ten, your reverse proxy would have killed the connection before the answer existed. Name the problem precisely, because this part exists to fix it: the agent narrates its work as a stream, and this endpoint flattens the stream into one number and one string. Part 1's terminal showed you eight events' worth of drama; this JSON shows none of it.
The amnesia demo
One more failure to meet on purpose, while the server's up. The blocking endpoint answered "which store had the best March" with Downtown. Ask the natural follow-up:
"I don't have any context about which store you're referring to." That's not a bug in our endpoint; it's the honest shape of what we built. Every call to query() starts a fresh session with a fresh id and a blank history. The diary from Part 1 is still being written, one file per one-question conversation, but nobody's handing the agent its old diary back yet. Part 5 does exactly that. Until then, remember the working rule: the files in the workspace are the only memory. The agent re-reads the CSVs every request, which is also why it keeps working at all.
Designing the envelope
Now the design section, the one this part is named after. The client needs to see the agent's work as it happens. The agent's work arrives as SDK message objects. We are not going to ship SDK objects over HTTP: they're Python dataclasses, they change with SDK versions, and three-quarters of what they carry is bookkeeping no UI needs. Instead we define a wire vocabulary: every event is one JSON object with a type field, sent as a server-sent event. Labeled parcels, one conveyor belt. Six labels today:
| type | Payload | Meaning |
|---|---|---|
| session_start | session_id | The turn began; here's the diary key |
| text_delta | text | A piece of the answer, in order |
| tool_use_start | tool_id, tool_name, tool_input | The agent reached for a tool |
| tool_result | tool_id, content, is_error | The tool came back |
| complete | usage, total_cost_usd, duration_ms | The receipt; the turn is over |
| error | message | Something broke; the turn is over |
Look at what's not here, because that's where the design earns its keep. Nothing in this table says "agent", "SDK", or "Claude". A client that reads this vocabulary is coupled to our product, not to our engine. And the parser contract is one sentence: switch on type, ignore types you don't recognize. That second clause is the whole trick. In Part 4, artifact_update events start riding this belt; in Part 7, approval_request; in Part 10, plan_proposed and thinking_delta. Every one of those features will be additive: new label, same belt, and a Part 3 client that never heard of approvals keeps working, blind to them, instead of crashing. We're designing today for events we haven't invented yet.
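The parser contract is small enough to sketch. What follows is a minimal illustration, not the Part 3 client: handle_event and new_state are hypothetical names, and the shape of the state dict is an assumption about what a client might want to track. The branch to look at is the one that isn't there.

```python
def new_state() -> dict:
    """Hypothetical client-side state for one turn."""
    return {
        "session_id": None,
        "text": "",
        "tools": {},
        "cost": None,
        "error": None,
        "finished": False,
    }


def handle_event(event: dict, state: dict) -> None:
    """Apply one wire event to the state; unrecognized types are skipped."""
    kind = event.get("type")
    if kind == "session_start":
        state["session_id"] = event["session_id"]
    elif kind == "text_delta":
        state["text"] += event["text"]
    elif kind == "tool_use_start":
        state["tools"][event["tool_id"]] = {"name": event["tool_name"], "done": False}
    elif kind == "tool_result":
        tool = state["tools"].get(event["tool_id"])
        if tool is not None:
            tool["done"] = True
            tool["is_error"] = event["is_error"]
    elif kind == "complete":
        state["cost"] = event["total_cost_usd"]
        state["finished"] = True
    elif kind == "error":
        state["error"] = event["message"]
        state["finished"] = True
    # No else branch: a type this client has never heard of (a future
    # artifact_update, say) simply falls through and changes nothing.
```

Feed this a stream that contains an artifact_update between two text_delta events and the text still assembles in order; the unknown parcel passes by without a crash.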
The translator
New file, app/events.py. First the framing and one small mercy:
```python
import json
from collections.abc import AsyncIterator

# Imports for the whole file; the SDK types are used by the translator below.
from claude_agent_sdk.types import (
    AssistantMessage, Message, ResultMessage, StreamEvent,
    SystemMessage, ToolResultBlock, ToolUseBlock, UserMessage,
)

# A tool can return an enormous payload (Read on a 2,000-line file). The
# wire narrates the work; it doesn't haul the data. Clip what we forward.
MAX_TOOL_RESULT_CHARS = 2_000


def sse(event: dict) -> str:
    """Frame one event dict as a server-sent event."""
    return f"data: {json.dumps(event)}\n\n"


def clip(content: object) -> str:
    text = content if isinstance(content, str) else json.dumps(content, default=str)
    if len(text) <= MAX_TOOL_RESULT_CHARS:
        return text
    return text[:MAX_TOOL_RESULT_CHARS] + f"\n… clipped {len(text) - MAX_TOOL_RESULT_CHARS} chars"
```

sse() is the entire SSE wire format: the JSON, a data: prefix, a blank line as the delimiter. (LangGraph Part 5 covers why that blank line matters and the client-side bug it causes; we inherit all of it.) clip() encodes a rule you'll be glad of in Part 3: when the agent Reads a 2,000-line CSV, the UI needs to know the read happened and roughly what came back, not to receive the whole file inside a chat event. Narration on the wire, data on the disk.
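To see the framing from the receiving side, here is a rough sketch of the inverse of sse(): a parser that splits a text buffer on the blank-line delimiter and holds back any frame that hasn't finished arriving. parse_frames is a hypothetical helper for illustration, not part of this series' code. It assumes every frame is a single data: line, which holds for our sse() because json.dumps escapes newlines inside strings.

```python
import json


def sse(event: dict) -> str:
    """Same framing as the article's sse(): one data: line, then a blank line."""
    return f"data: {json.dumps(event)}\n\n"


def parse_frames(buffer: str) -> tuple[list[dict], str]:
    """Return the complete events in buffer, plus the unfinished remainder."""
    # Everything before the last "\n\n" is complete; the tail may be partial.
    *frames, rest = buffer.split("\n\n")
    events = []
    for frame in frames:
        for line in frame.split("\n"):
            if line.startswith("data: "):
                events.append(json.loads(line[len("data: "):]))
    return events, rest


# Simulate a network chunk that ends in the middle of the second frame.
wire = sse({"type": "text_delta", "text": "hi"}) + sse({"type": "complete"})
first_events, leftover = parse_frames(wire[:-5])
second_events, tail = parse_frames(leftover + wire[-5:])
```

The first call yields only the text_delta event and keeps the half-arrived frame as leftover; once the remaining characters are appended, the second call yields complete. A client that parses each chunk without carrying the remainder forward loses or mangles events whenever a chunk boundary lands inside a frame.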
Then the translator itself: Part 1's message anatomy, cashed in. One async for, and every message type maps to its parcel:
```python
async def translate(messages: AsyncIterator[Message]) -> AsyncIterator[str]:
    """Map SDK messages onto the wire vocabulary, one SSE frame at a time."""
    try:
        async for message in messages:
            if isinstance(message, SystemMessage) and message.subtype == "init":
                yield sse({"type": "session_start", "session_id": message.data["session_id"]})

            elif isinstance(message, StreamEvent):
                delta = message.event.get("delta", {})
                if delta.get("type") == "text_delta":
                    yield sse({"type": "text_delta", "text": delta["text"]})

            elif isinstance(message, AssistantMessage):
                for block in message.content:
                    if isinstance(block, ToolUseBlock):
                        yield sse({
                            "type": "tool_use_start",
                            "tool_id": block.id,
                            "tool_name": block.name,
                            "tool_input": block.input,
                        })
```

(Ignore the StreamEvent branch for a few minutes; it's dormant until we flip a switch below.) The rest of the ladder handles the world answering and the turn ending:

```python
            elif isinstance(message, UserMessage) and isinstance(message.content, list):
                for block in message.content:
                    if isinstance(block, ToolResultBlock):
                        yield sse({
                            "type": "tool_result",
                            "tool_id": block.tool_use_id,
                            "content": clip(block.content),
                            "is_error": bool(block.is_error),
                        })

            elif isinstance(message, ResultMessage):
                yield sse({
                    "type": "complete",
                    "usage": message.usage,
                    "total_cost_usd": message.total_cost_usd,
                    "duration_ms": message.duration_ms,
                })
    except Exception as exc:  # noqa: BLE001 - anything broken becomes a wire event
        yield sse({"type": "error", "message": str(exc)})
```

Two Part 1 facts are doing the work here. Tool results arrive inside UserMessage, so that's where tool_result parcels come from, with the tool_use_id passed through as tool_id so a client can match result to call. And ResultMessage is the receipt, so complete carries the cost to every client, always. The except at the bottom is the sixth event type earning its row: a translator that dies mid-stream takes the whole story with it, so failures become parcels too. You'll see that fire, for real, before this page ends.
Block-level streaming: the door becomes a window
Replace the blocking endpoint. This is the whole diff:
```python
# Two new imports at the top of app/main.py:
from fastapi.responses import StreamingResponse
from app.events import translate


@app.post("/chat")
async def chat(request: ChatRequest) -> StreamingResponse:
    stream = query(prompt=request.message, options=OPTIONS)
    return StreamingResponse(translate(stream), media_type="text/event-stream")
```

Read it as a pipeline, because it is one: query() produces SDK messages, translate() turns each into a framed parcel, and StreamingResponse pushes each parcel out the socket the moment it exists. No buffering, no waiting for the end. (ChatResponse and the ResultMessage import go in the bin; the receipt is an event now.) The server restarts, and this time watch with -N, which tells curl not to buffer:
Same server, same agent, transformed experience: every tool call and result appears the instant it happens, thirty seconds of investigation you can actually watch. This is the part's dessert and it's two lines of FastAPI. But notice what's still clunky: the text_delta parcels are huge. One whole paragraph at a time, because the translator is converting completed TextBlocks. The narration streams; the prose still arrives in slabs.
Token-level: flip the switch
The SDK yields whole messages by default: you hear nothing while the model writes a paragraph, then get the finished block. One option changes that. In main.py:
```python
OPTIONS = ClaudeAgentOptions(
    cwd="workspace",
    tools=["Read", "Glob", "Grep", "Bash", "Write"],
    permission_mode="bypassPermissions",
    model=MODEL,
    include_partial_messages=True,
)
```

With partial messages on, the stream gains StreamEvent objects carrying the model's raw token-by-token deltas, and the dormant branch in the translator wakes up: content_block_delta events whose payload is a text_delta become our text_delta parcels, word-fragment by word-fragment.
One subtlety, and it's the kind that bites silently: partial events add to the stream, they don't replace anything. The finished TextBlock still arrives afterward inside AssistantMessage. If the translator forwarded both, every sentence would go over the wire twice and every client would render the answer twice. That's why the AssistantMessage branch you wrote earlier handles only ToolUseBlock and deliberately ignores text: deltas carry the prose, blocks carry the actions, each said exactly once.
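The trap is easy to reproduce without the SDK. This is a toy sketch under loud assumptions: Delta and FinishedText are stand-in dataclasses invented for the illustration, not SDK types, and render is a hypothetical stand-in for what a client ends up displaying.

```python
from dataclasses import dataclass


@dataclass
class Delta:
    """Stand-in for a partial event carrying one text fragment."""
    text: str


@dataclass
class FinishedText:
    """Stand-in for the completed text block that arrives afterward."""
    text: str


def render(stream: list, forward_blocks: bool) -> str:
    """Concatenate what a client would display for this stream."""
    shown = []
    for item in stream:
        if isinstance(item, Delta):
            shown.append(item.text)
        elif isinstance(item, FinishedText) and forward_blocks:
            # Forwarding the finished block repeats text the deltas already sent.
            shown.append(item.text)
    return "".join(shown)


stream = [Delta("Down"), Delta("town"), FinishedText("Downtown")]
doubled = render(stream, forward_blocks=True)
once = render(stream, forward_blocks=False)
```

With forward_blocks=True the answer comes out twice, back to back; with it off, once. The translator makes the second choice by never looking at text inside AssistantMessage.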
Right now you have: an HTTP endpoint that runs the full agent loop and narrates it live, in a six-word vocabulary, at whatever granularity the client can render. The only consumer is curl. That's Part 3's opening problem.
Break it on purpose: the sixth event
The error row in the table hasn't done anything yet, and an untested error path is a rumor, not a feature. The cheapest real failure available: make the agent's desk vanish. Stop the server, rename the workspace folder, restart, and ask anything:
What comes back is {"type": "error", "message": "Working directory does not exist: workspace"}. Study what happened, because it's subtly important for every streaming API you'll ever build. The HTTP response already said 200 OK when the stream opened; status codes are spent before the first parcel ships. Past that point, errors have to be events, or they're silence. Our translator's except clause turned a Python exception into a parcel a client can render as a red banner. Rename the folder back, and this reflex comes with you for the rest of the series: whenever a turn can fail, the failure is a typed event on the belt, never a dropped connection.
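The same reflex can be exercised with no server and no SDK. A minimal sketch, assuming a made-up event source: failing_source, guarded, and frame are hypothetical names, and the RuntimeError merely borrows the message from the run above.

```python
import asyncio
import json
from collections.abc import AsyncIterator


def frame(event: dict) -> str:
    """SSE framing: a data: line followed by a blank line."""
    return f"data: {json.dumps(event)}\n\n"


async def failing_source() -> AsyncIterator[dict]:
    """Hypothetical source that emits one event, then breaks mid-stream."""
    yield {"type": "text_delta", "text": "Looking at March"}
    raise RuntimeError("Working directory does not exist: workspace")


async def guarded(source: AsyncIterator[dict]) -> AsyncIterator[str]:
    """Forward frames; convert any exception into a final error frame."""
    try:
        async for event in source:
            yield frame(event)
    except Exception as exc:  # the stream is already open, so report in-band
        yield frame({"type": "error", "message": str(exc)})


async def collect() -> list[str]:
    return [f async for f in guarded(failing_source())]


frames = asyncio.run(collect())
```

The consumer receives two frames and a clean end of stream: the text that made it out before the failure, then the error parcel. Without the except, it would receive one frame and a broken connection, with nothing to render.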
The pipeline, drawn
Everything this part built, in one picture:
Three pieces, one job each. The engine does the work and speaks SDK. The translator speaks both languages and is the only code that does; when the SDK changes shape someday, one file cares. The response object keeps the socket open and pushes parcels. And the belt's contract at the bottom is what the whole series builds on: clients switch on type and skip what they don't know, so the vocabulary can grow forever without breaking anyone. If you've read the LangGraph series, this is Part 5's conveyor belt grown up: same wire format, but carrying a whole agent's narration instead of one model's tokens.
The cost ritual
Today's ledger, all real runs against this part's endpoint:
| Run | Result | Cost |
|---|---|---|
| Blocking /chat, March question | right answer, 22s of silence | $0.0208 |
| Amnesia follow-up | "which store?", instantly | $0.0028 |
| Block-level stream, weekend question | right answer ($315,334.80, verified) | $0.0256 |
| Token-level stream, March question | right answer, fully narrated | $0.0216 |
The lesson this time is what didn't change: streaming is free. The blocking, block-level, and token-level runs all land within a cent of each other, and the two March-question runs differ by less than a tenth of a cent, because streaming changes when you see tokens, not how many exist. The complete event now delivers total_cost_usd to any client that connects, which means from Part 3 onward, showing the user the bill is a UI decision, not a plumbing project.
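The "within a cent" claim is quick to check against the ledger. A small sketch using the three full-answer runs from the table; the dictionary keys are just labels chosen for this example.

```python
# Costs in USD, copied from the ledger above.
costs = {
    "blocking, March question": 0.0208,
    "block-level stream, weekend question": 0.0256,
    "token-level stream, March question": 0.0216,
}

# Widest gap between any two runs, and the gap between the two runs
# that asked the same question with and without streaming.
spread = max(costs.values()) - min(costs.values())
march_gap = abs(
    costs["token-level stream, March question"] - costs["blocking, March question"]
)
print(f"widest gap: ${spread:.4f}, same-question gap: ${march_gap:.4f}")
```

The widest gap comes from the run that asked a different question. The like-for-like comparison, blocking versus token-level on the March question, is the one that isolates what streaming itself costs.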
What you built
- An HTTP front door for the analyst: POST /chat with CORS pre-wired for Part 3's frontend, built at review speed on LangGraph Part 2's foundations.
- The event vocabulary: six JSON parcel types on one SSE belt, with the parser contract (switch on type, ignore the unknown) that lets twelve future parts add features without breaking a client.
- A translator (app/events.py) that is the only code speaking both SDK and wire, including the clip() rule: narration on the wire, data on the disk.
- Streaming at two granularities: StreamingResponse made tool activity live, and include_partial_messages=True made the prose arrive token by token, with the double-render trap dodged deliberately.
- Errors as events: a real failure (Working directory does not exist) delivered as the sixth parcel type, because a 200-OK stream can't change its status code after the fact.
Test yourself
The Part 3 client will read this stream by switching on the type field and ignoring unknown types. Why does that second clause matter so much?
The workspace folder went missing and the agent couldn't start, yet /chat still returned HTTP 200. Why?
With include_partial_messages=True, why must the translator ignore TextBlock inside AssistantMessage?
One minute after the server named Downtown as March's best store, the follow-up 'how did that store do in April?' drew a blank. What's the root cause?
Why does the translator clip tool_result content at 2,000 characters instead of forwarding everything?
Commit it, from the project root, in a terminal that isn't hosting the server:
```bash
git add .
git commit -m "part 2: the agent behind HTTP, narrating in six event types"
```

Your analyst streams its whole investigation to anyone who can spell curl. Nobody who matters can spell curl. In Part 3 the real client arrives: a chat UI where prose types itself out, tool calls appear as live badges that resolve in place, and long turns show a working clock instead of a spinner of lies.
The complete, tested code for this part lives in part-02-fastapi-streaming in the companion repo.