Installation

uv add streamstraight-server

Usage

Streamstraight requires you to tag every stream with a unique streamId; we recommend using an identifier that maps to the downstream client session or request.
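
For example, a request-scoped ID works well. The helper below is only an illustration (stream_id_for and session_id are not part of the SDK): reuse whatever identifier your app already tracks, and fall back to a UUID otherwise.
Python
import uuid


def stream_id_for(session_id: str | None = None) -> str:
  # Prefer an identifier that already maps to the client session or request;
  # otherwise fall back to a random, collision-safe UUID.
  return session_id or f"stream-{uuid.uuid4()}"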

Tee a stream to both Streamstraight and HTTP response

For production applications, you often want to send chunks to both your HTTP response and Streamstraight simultaneously, while ensuring the LLM stream completes even if the client disconnects. This pattern uses an asyncio.Queue to decouple the HTTP response from stream processing:
Python
import os
import asyncio
from collections.abc import AsyncIterator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

from streamstraight_server import streamstraight_server

router = APIRouter()


async def generate_response() -> AsyncIterator[str]:
  # Replace with your LLM / tool invocation
  yield "hello"
  yield " world"


@router.post("/create-stream")
async def create_stream() -> StreamingResponse:
  llm_stream = generate_response()

  ss_server = await streamstraight_server(
      {"api_key": os.getenv("STREAMSTRAIGHT_API_KEY")},
      {"stream_id": "your-stream-id"}
  )
  writer_context = ss_server.stream_writer()

  # Queue ensures LLM response finishes even if HTTP client disconnects
  stream_queue: asyncio.Queue[str | object] = asyncio.Queue()
  _STREAM_SENTINEL = object()

  async def fan_out_stream():
    """Consume LLM stream and write to both Streamstraight and queue"""
    writer_failed = False
    try:
      async with writer_context as writer:
        async for chunk in llm_stream:
          if not writer_failed:
            try:
              await writer.send(chunk)
            except Exception:  # pylint: disable=broad-except
              # Streamstraight failed, but continue streaming locally
              writer_failed = True
          await stream_queue.put(chunk)
    finally:
      await stream_queue.put(_STREAM_SENTINEL)

  # Start fan-out task
  fan_task = asyncio.create_task(fan_out_stream())

  async def response_stream():
    """Read from queue and yield to HTTP response"""
    try:
      while True:
        chunk = await stream_queue.get()
        if chunk is _STREAM_SENTINEL:
          break
        yield chunk
      await fan_task
    except asyncio.CancelledError:
      # Client disconnected - shield fan-out task so LLM stream completes
      await asyncio.shield(fan_task)
      raise

  return StreamingResponse(response_stream(), media_type="text/event-stream")
When a client/browser disconnects from an ASGI server (FastAPI, Starlette, etc.), the server raises asyncio.CancelledError to clean up the response generator. Without the Queue pattern, this cancellation propagates through the coroutines, immediately closes writer_context, and stops the llm_stream loop mid-generation, before the LLM stream has completed.
What happens without the Queue:
  1. Client disconnects or refreshes the page
  2. The ASGI server cancels your response generator coroutine
  3. The async with writer_context block receives cancellation and closes
  4. The generate_response() iterator stops yielding chunks
  5. Your LLM stream is interrupted, even though you’re still generating tokens
Why this is problematic:
  • You may have code at the end of the loop that persists the LLM stream to your database. Because the loop is interrupted, that code never runs.
  • Streamstraight won’t have the complete stream for users to resume later.
  • Your application loses data that has already been generated.
How the Queue pattern solves this:
The Queue decouples the HTTP response lifecycle from the stream processing lifecycle:
  • The fan_out_stream() task runs independently and consumes the entire LLM stream
  • The response_stream() generator reads from the queue and yields to the HTTP response
  • When the client disconnects, asyncio.shield() protects the fan-out task from cancellation
This ensures your application continues processing the LLM stream even if the client disconnects on ASGI servers.
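
If you want to see the shielding behaviour in isolation, here is a minimal, framework-free sketch (illustrative names only, nothing here is part of the SDK): cancelling the consumer mid-stream still lets the shielded producer drain to completion, mirroring response_stream() and fan_out_stream() above.
Python
import asyncio


async def producer(queue: asyncio.Queue) -> None:
  # Stands in for the LLM stream: keeps producing even if nobody is reading
  for i in range(5):
    await asyncio.sleep(0.1)
    await queue.put(i)
  print("producer finished all chunks")


async def consumer(queue: asyncio.Queue, producer_task: asyncio.Task) -> None:
  try:
    while True:
      print("consumed", await queue.get())
  except asyncio.CancelledError:
    # Same trick as response_stream(): wait for the producer under a shield
    await asyncio.shield(producer_task)
    raise


async def main() -> None:
  queue: asyncio.Queue = asyncio.Queue()
  producer_task = asyncio.create_task(producer(queue))
  consumer_task = asyncio.create_task(consumer(queue, producer_task))

  await asyncio.sleep(0.25)  # simulate the client disconnecting early
  consumer_task.cancel()
  try:
    await consumer_task
  except asyncio.CancelledError:
    pass

  await producer_task  # already complete: the shield let it finish


asyncio.run(main())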

Pipe an async iterator directly

Already have an async iterator or generator producing chunks? Hand it straight to the SDK and we’ll stream it for you.
Python
import os
from collections.abc import AsyncIterator

from fastapi import APIRouter
from fastapi.responses import StreamingResponse

from streamstraight_server import streamstraight_server

router = APIRouter()


async def generate_response() -> AsyncIterator[dict[str, str]]:
  # Replace with your LLM / tool invocation
  yield {"content": "hello"}
  yield {"content": " world"}


@router.post("/create-stream-iterator")
async def create_stream_iterator() -> StreamingResponse:
  server = await streamstraight_server(
      {"api_key": os.getenv("STREAMSTRAIGHT_API_KEY")},
      {"stream_id": "your-stream-id"},
  )  # connects and configures the stream

  # Streamstraight consumes the iterator directly
  await server.stream(generate_response())

  return StreamingResponse(("done\n",), media_type="text/plain")
Both approaches share the same acknowledgement and end-frame semantics; choose whichever fits your application flow.
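
In either example, generate_response() is where your real LLM call goes. As a sketch only, assuming the openai Python SDK (v1+) and the gpt-4o-mini model (swap in whatever client and model you actually use), a thin adapter generator that yields plain text chunks looks like this:
Python
import os
from collections.abc import AsyncIterator

from openai import AsyncOpenAI  # assumption: any async LLM client works here

client = AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))


async def generate_response(prompt: str) -> AsyncIterator[str]:
  # Adapt the provider's streamed chunks into plain text for Streamstraight
  stream = await client.chat.completions.create(
      model="gpt-4o-mini",
      messages=[{"role": "user", "content": prompt}],
      stream=True,
  )
  async for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
      yield chunk.choices[0].delta.content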

Fetching your JWT token

Streamstraight requires your frontend client to connect with a JWT token. Our SDK contains a helper function that generates a JWT token from your API key.
FastAPI
import os

from fastapi import FastAPI
from streamstraight_server import fetch_client_token

app = FastAPI()

@app.post("/api/streamstraight-token")
async def get_token() -> dict[str, str]:
  token = await fetch_client_token({"api_key": os.getenv("STREAMSTRAIGHT_API_KEY")})

  return {"jwt_token": token}