8 min read

Streaming LLM API Responses in Python: A Complete Production Guide

Handle token-by-token output, implement back-pressure, manage rate limits, and build fault-tolerant wrappers around OpenAI-compatible APIs


Streaming is the difference between an AI product that feels fast and one that feels slow. Instead of waiting 10–30 seconds for a completed response, a streaming API delivers each token as it is generated. This post covers the full picture: HTTP server-sent events, Python async generators, rate limit handling, and building a robust wrapper you can drop into a production application.

The vast majority of LLM API tutorials show the simplest possible usage: send a prompt, await the response, print the result. In a demo, that works fine. In a product that ships to users, where a single query might take 20 seconds to complete, it is a bad experience you can measure in bounce rates.

Streaming APIs solve this. The model sends each token as it generates it, and your application forwards it to the client immediately. The user sees text appearing word by word, perceiving the system as fast even if total generation time is unchanged. This is how every serious consumer AI product works — and it requires a slightly different programming model than the blocking request/response cycle.

How Streaming Works at the Protocol Level

LLM streaming uses Server-Sent Events (SSE), a lightweight HTTP protocol for one-way server-to-client streaming. The response body is a stream of text lines, each event prefixed with `data: ` and terminated by a blank line:

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":"The"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":" weather"},"finish_reason":null}]}

data: {"id":"chatcmpl-abc","choices":[{"delta":{"content":" today"},"finish_reason":null}]}

data: [DONE]

Your client reads this line by line, strips data: , parses the JSON, and extracts the delta.content field. The [DONE] sentinel signals the end of the stream.

Understanding the raw protocol matters because it explains why naive requests usage doesn’t work for streaming — you need to use stream=True and iterate over response lines, or use a library that handles SSE natively.

[Diagram: SSE streaming flow. LLM API → HTTP chunked stream → FastAPI StreamingResponse → browser EventSource → token-by-token UI render, with a time-to-first-token timing bar.]

The Minimal Streaming Client

Using the official OpenAI Python library:

from openai import OpenAI

client = OpenAI()  # Reads OPENAI_API_KEY from environment

def stream_completion(prompt: str, model: str = "gpt-4o") -> None:
    with client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
    ) as stream:
        for chunk in stream:
            if chunk.choices[0].delta.content is not None:
                print(chunk.choices[0].delta.content, end="", flush=True)
    print()  # Final newline

stream_completion("Explain how HTTP server-sent events work in three paragraphs.")

The stream=True parameter switches the API call from returning a completed ChatCompletion object to returning a context manager that yields ChatCompletionChunk objects. Each chunk contains a delta — the difference from the previous chunk — rather than the full message.
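Because each chunk carries only a delta, reconstructing the full message is a simple join. A toy illustration with hard-coded delta payloads (illustrative data, mirroring the chunk shape above):

```python
# Simulated delta payloads; the final None mimics the finish_reason chunk
deltas = [{"content": "The"}, {"content": " weather"}, {"content": " today"}, {"content": None}]

# Skip None deltas and concatenate the rest to recover the full message
message = "".join(d["content"] for d in deltas if d["content"] is not None)
print(message)  # The weather today
```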

Async Streaming with httpx

For production applications you almost certainly want async I/O. The OpenAI client has an async version, or you can go lower-level with httpx for full control:

import httpx
import json
import asyncio
from typing import AsyncGenerator

async def stream_tokens(
    prompt: str,
    api_key: str,
    model: str = "gpt-4o",
    base_url: str = "https://api.openai.com/v1",
) -> AsyncGenerator[str, None]:
    """
    Async generator that yields individual token strings as they arrive.
    Compatible with any OpenAI-compatible API (Ollama, Together, Groq, etc.)
    """
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Content-Type": "application/json",
    }
    payload = {
        "model": model,
        "messages": [{"role": "user", "content": prompt}],
        "stream": True,
    }

    async with httpx.AsyncClient(timeout=60.0) as client:
        async with client.stream(
            "POST",
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
        ) as response:
            response.raise_for_status()

            async for line in response.aiter_lines():
                if not line or line == "data: [DONE]":
                    continue
                if line.startswith("data: "):
                    data = json.loads(line[6:])  # Strip "data: " prefix
                    delta = data["choices"][0]["delta"]
                    if content := delta.get("content"):
                        yield content


# Usage
async def main():
    async for token in stream_tokens("What is the Turing test?", api_key="sk-..."):
        print(token, end="", flush=True)
    print()

asyncio.run(main())

The key pattern here is AsyncGenerator[str, None] — a function that yields values asynchronously. Callers can consume it with async for, compose it with other async generators, or pass it directly to a FastAPI StreamingResponse.
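Composition is where async generators earn their keep. As a self-contained sketch (`fake_tokens` and `sentences` are illustrative, not library functions), one generator can buffer another's tokens into complete sentences:

```python
import asyncio
from typing import AsyncGenerator

async def fake_tokens() -> AsyncGenerator[str, None]:
    """Stand-in for a real token stream: yields a fixed token sequence."""
    for t in ["Hello", " world", ".", " Bye", "."]:
        yield t

async def sentences(tokens: AsyncGenerator[str, None]) -> AsyncGenerator[str, None]:
    """Compose: buffer incoming tokens and yield complete sentences."""
    buf = ""
    async for t in tokens:
        buf += t
        while "." in buf:
            head, _, buf = buf.partition(".")
            yield head.strip() + "."

async def main() -> list[str]:
    return [s async for s in sentences(fake_tokens())]

print(asyncio.run(main()))  # ['Hello world.', 'Bye.']
```

The same wrapper shape works for any per-token transformation: markdown rendering, profanity filtering, or chunking tokens into larger SSE frames.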

Wiring Streaming into a FastAPI Endpoint

This is the production pattern for serving streaming LLM output over HTTP:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

async def token_stream_sse(prompt: str, api_key: str):
    """
    Wraps the stream_tokens generator in SSE format for browser consumption.
    """
    async for token in stream_tokens(prompt, api_key):
        # SSE format: "data: <payload>\n\n"
        yield f"data: {json.dumps({'token': token})}\n\n"
    yield "data: [DONE]\n\n"


@app.get("/generate")  # GET rather than POST: the browser EventSource API can only issue GET requests
async def generate(prompt: str, api_key: str):
    return StreamingResponse(
        token_stream_sse(prompt, api_key),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Critical: disables Nginx response buffering
        }
    )

The X-Accel-Buffering: no header is non-obvious but important. Without it, Nginx (the most common reverse proxy) buffers the entire response before forwarding, defeating the purpose of streaming.

On the client side, the browser consumes this with the EventSource API (which only issues GET requests, so the endpoint must accept GET):

const source = new EventSource(`/generate?prompt=${encodeURIComponent(prompt)}`);

source.onmessage = (event) => {
    if (event.data === '[DONE]') {
        source.close();
        return;
    }
    const { token } = JSON.parse(event.data);
    appendToOutput(token);
};

Counting Tokens During Streaming

LLM APIs charge per token. Estimating cost before the response completes is useful for budget guards in production systems. The tiktoken library (the same tokeniser used internally by OpenAI) lets you count tokens locally without an API call:

import tiktoken

def get_encoder(model: str) -> tiktoken.Encoding:
    try:
        return tiktoken.encoding_for_model(model)
    except KeyError:
        return tiktoken.get_encoding("cl100k_base")  # Fallback

class TokenCounter:
    def __init__(self, model: str):
        self.enc = get_encoder(model)
        self.prompt_tokens = 0
        self.completion_tokens = 0

    def count_prompt(self, messages: list[dict]) -> int:
        total = 0
        for message in messages:
            total += 4  # Per-message overhead
            total += len(self.enc.encode(message.get("content", "")))
        self.prompt_tokens = total
        return total

    def record_token(self, token_str: str) -> None:
        """Call this each time a streaming token arrives."""
        self.completion_tokens += len(self.enc.encode(token_str))

    def estimated_cost_usd(
        self,
        prompt_cost_per_1m: float = 2.50,   # Example gpt-4o rates; check current pricing
        completion_cost_per_1m: float = 10.00,
    ) -> float:
        prompt_cost = (self.prompt_tokens / 1_000_000) * prompt_cost_per_1m
        completion_cost = (self.completion_tokens / 1_000_000) * completion_cost_per_1m
        return prompt_cost + completion_cost

Integrating with the streaming generator:

async def stream_with_cost(prompt: str, api_key: str, model: str = "gpt-4o"):
    counter = TokenCounter(model)
    counter.count_prompt([{"role": "user", "content": prompt}])

    full_response = []

    async for token in stream_tokens(prompt, api_key, model):
        counter.record_token(token)
        full_response.append(token)
        yield token

    total = "".join(full_response)
    cost = counter.estimated_cost_usd()
    print(f"\n[{counter.completion_tokens} tokens | ${cost:.5f}]")
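As a sanity check on the pricing arithmetic, using the example rates above, 1,000 prompt tokens and 500 completion tokens come out to:

```python
prompt_tokens, completion_tokens = 1_000, 500

# Same formula as estimated_cost_usd: per-million rates scaled by token counts
cost = (prompt_tokens / 1_000_000) * 2.50 + (completion_tokens / 1_000_000) * 10.00
print(f"${cost:.4f}")  # $0.0075
```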

Handling Rate Limits with Exponential Back-off

LLM API rate limits are a fact of production life — especially when you are batching many requests. The correct pattern is exponential back-off with jitter:

import asyncio
import random
import httpx

class RateLimitError(Exception):
    def __init__(self, retry_after: float | None = None):
        self.retry_after = retry_after

async def with_exponential_backoff(
    coro_fn,
    max_retries: int = 6,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    *args,
    **kwargs,
):
    """
    Retry an async function with exponential back-off on rate limit errors.
    """
    for attempt in range(max_retries):
        try:
            return await coro_fn(*args, **kwargs)

        except httpx.HTTPStatusError as e:
            if e.response.status_code == 429:
                retry_after = float(e.response.headers.get("retry-after", 0))
                delay = retry_after or min(
                    max_delay,
                    base_delay * (2 ** attempt) + random.uniform(0, 1)
                )
                print(f"Rate limited. Waiting {delay:.1f}s (attempt {attempt + 1}/{max_retries})")
                await asyncio.sleep(delay)
            else:
                raise  # Non-rate-limit errors propagate immediately

    raise RuntimeError(f"Failed after {max_retries} retries")
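Setting the jitter aside, the delay schedule this produces is easy to reason about. A quick sketch (`backoff_delay` is an illustrative helper mirroring the formula above):

```python
def backoff_delay(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Exponential back-off delay for a given attempt, capped at `cap` seconds."""
    return min(cap, base * (2 ** attempt))

print([backoff_delay(a) for a in range(7)])  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0]
```

The jitter term (`random.uniform(0, 1)`) matters in production: without it, many clients that were rate-limited at the same moment retry at the same moment, re-creating the spike that triggered the limit.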

A Production Document Summarisation Pipeline

Putting it together: a pipeline that summarises a list of documents concurrently, respects rate limits, streams output for real-time display, and tracks cost.

import asyncio
from asyncio import Semaphore

async def summarise_document(
    text: str,
    api_key: str,
    semaphore: Semaphore,
    model: str = "gpt-4o",
) -> str:
    """Summarise a single document, respecting the concurrency semaphore."""
    truncated = text[:8000]  # Truncate to stay within the context window
    prompt = f"""Summarise the following document in 3 bullet points. Be concise.

Document:
{truncated}

Summary:"""

    async with semaphore:  # Limit concurrent API calls
        chunks = []
        async for token in stream_tokens(prompt, api_key, model):
            print(token, end="", flush=True)
            chunks.append(token)
        print()
        return "".join(chunks)


async def batch_summarise(
    documents: list[str],
    api_key: str,
    max_concurrent: int = 5,
) -> list[str]:
    """
    Summarise documents concurrently with a concurrency cap.
    max_concurrent=5 keeps us safely under typical RPM limits.
    """
    semaphore = Semaphore(max_concurrent)
    tasks = [
        summarise_document(doc, api_key, semaphore)
        for doc in documents
    ]
    return await asyncio.gather(*tasks)


# Run the pipeline
if __name__ == "__main__":
    documents = [
        "Python is a high-level, interpreted programming language...",
        "Rust is a systems programming language focused on safety...",
        "WebAssembly is a binary instruction format for a stack-based virtual machine...",
    ]
    summaries = asyncio.run(batch_summarise(documents, api_key="sk-..."))
    for i, summary in enumerate(summaries, 1):
        print(f"\n=== Document {i} ===\n{summary}")

The Semaphore(5) is the practical rate limit lever. If the API allows 100 requests per minute (RPM), 5 concurrent requests with an average latency of 3 seconds each means roughly 100 RPM — right at the limit, without exceeding it.
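That back-of-envelope calculation can be written down directly (`sustained_rpm` is a hypothetical helper, not part of the pipeline):

```python
def sustained_rpm(max_concurrent: int, avg_latency_s: float) -> float:
    """Approximate steady-state requests per minute for a semaphore-capped pipeline."""
    # Each slot completes one request every avg_latency_s seconds
    return max_concurrent * 60 / avg_latency_s

print(sustained_rpm(5, 3.0))  # 100.0
```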

Comparing Streaming vs Blocking for UX

| Metric | Blocking | Streaming |
|---|---|---|
| Time to first token | Full generation time | ~200–500 ms |
| Perceived latency | High | Low |
| Implementation complexity | Low | Moderate |
| Cost | Identical | Identical |
| Suitable for long responses | Poor | Excellent |
| Suitable for structured JSON | Good | Requires accumulation |

One important trade-off: if you need to parse the entire response as structured JSON (e.g. using a JSON output mode), you cannot do anything useful with individual tokens — you must accumulate the full response first. In these cases, streaming is still worth enabling for the 200ms time-to-first-byte improvement, but the user-visible benefit is smaller.
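The accumulate-then-parse pattern is a one-liner (`accumulate_json` and the sample tokens are illustrative):

```python
import json

def accumulate_json(tokens: list[str]) -> dict:
    """Streamed structured output must be buffered in full before parsing."""
    return json.loads("".join(tokens))

# Token boundaries fall anywhere, even mid-key, so per-token parsing is impossible
tokens = ['{"sum', 'mary": "ok", ', '"score": 0.9}']
print(accumulate_json(tokens))  # {'summary': 'ok', 'score': 0.9}
```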

Further Reading