FastAPI + WebSocket/SSE 实现大模型流式输出:前后端落地指南
这篇文章不是“教科书式 API 索引”,而是从一线问题出发,给出能在生产落地的做法:为什么需要流式、SSE 和 WebSocket 的边界、网关/代理的坑、如何监控与回滚。你可以把它当成一个可复用的“流式输出蓝图”。
0. 场景与约束(Problem → Constraints)
目标很朴素:聊天窗口里“边生成边展示”。但上线前先把边界画清楚:
- 浏览器/企业网代理可能劫持/缓冲响应;SSE 受中间件影响更大。
- 鉴权:SSE 无自定义头,Cookie/QueryToken/签名 URL 三选一。
- 断线:移动端切网/锁屏会中断连接,需要可恢复(Last-Event-ID)。
- 成本:流式连接本质是长连接,要算清并发和超时(idle timeout)。
设计原则:文本推送优先 SSE;需要双向交互/多路/工具调用用 WebSocket;两者都要保留短路回退(普通 HTTP)。
1. 架构与选型(Trade-offs)
1 2
| Client ──(SSE/EventSource)──> API Gateway ──> FastAPI ──> LLM Provider/Local LLM (WS/WebSocket) (no buffer) (backpressure, cancel, metrics)
|
SSE 优点:浏览器原生、简单、易缓存控制;缺点:单向、某些代理会缓冲。WS 优点:全双工、业务扩展空间大;缺点:状态管理和伸缩更复杂。生产里两者并存最稳。
2. 后端实现(可复现、可回滚)
下面这个实现关注“正确性”和“可运维性”,而不是只会 yield 文本:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect from fastapi.responses import StreamingResponse import asyncio, json, time
app = FastAPI()
async def token_source(prompt: str): for ch in "今天天气不错,我们来聊聊系统设计。": await asyncio.sleep(0.03) yield ch
async def sse_iter(prompt: str, last_id: int | None): i = (last_id or 0) try: async for t in token_source(prompt): i += 1 yield f"id: {i}\n" f"event: token\n" f"data: {json.dumps({'t': t})}\n\n" yield "event: done\n" "data: {}\n\n" except asyncio.CancelledError: raise
@app.get("/chat/sse") async def chat_sse(q: str, request: Request): last = request.headers.get("last-event-id") last_id = int(last) if last and last.isdigit() else None return StreamingResponse( sse_iter(q, last_id), media_type="text/event-stream", headers={ "Cache-Control": "no-cache, no-transform", "X-Accel-Buffering": "no", }, )
@app.websocket("/chat/ws") async def chat_ws(ws: WebSocket): await ws.accept() try: while True: payload = await ws.receive_text() async for t in token_source(payload): await ws.send_json({"event": "token", "t": t}) await ws.send_json({"event": "done"}) except WebSocketDisconnect: pass
|
运行:
1
| uvicorn main:app --reload --port 8000
|
3. 前端实现(稳定优先)
原生 SSE(含断线续传与节流):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| <pre id="out"></pre> <script> const out = document.getElementById('out') let lastId = localStorage.getItem('last_id') function connect(){ const es = new EventSource('/chat/sse?q=你好', { withCredentials: true }) es.onmessage = (e) => { out.textContent += e.data } es.addEventListener('token', (e)=>{ const data = JSON.parse(e.data) out.textContent += data.t }) es.addEventListener('done', ()=> es.close()) es.addEventListener('error', ()=> es.close()) } connect() </script>
|
React Hook(WS 版,简化错误处理):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| import { useEffect, useRef, useState } from 'react'
export default function useChatWS(url){ const [text, setText] = useState('') const wsRef = useRef(null) useEffect(()=>{ const ws = new WebSocket(url) wsRef.current = ws ws.onmessage = (e)=>{ const msg = JSON.parse(e.data) if(msg.event === 'token') setText(t=> t + msg.t) } return ()=> ws.close() }, [url]) return { text, send: (q)=> wsRef.current?.send(q) } }
|
4. 反向代理与网关(最容易踩坑的部分)
Nginx:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| location /chat/sse { proxy_pass http://127.0.0.1:8000; proxy_http_version 1.1; proxy_buffering off; proxy_cache off; chunked_transfer_encoding off; proxy_set_header Connection ''; }
location /chat/ws { proxy_pass http://127.0.0.1:8000; proxy_http_version 1.1; proxy_set_header Upgrade $http_upgrade; proxy_set_header Connection "upgrade"; }
|
Cloudflare/其他 CDN:开启 Cache: Bypass 对 text/event-stream,并调整 Idle timeout,否则长连会被重置。
5. 监控、压测与回滚(Run → Observe → Rollback)
- 指标:连接数、存活时长、TTFB(首字节时间)、中断率、重连次数/小时。
- 压测:流式需要“长时间少量并发 + 间歇突增”两种模型。工具可选
k6/oha/autocannon,重点观察连接上限与代理超时。
- 回滚:保留
POST /chat 的非流式接口,前端收到异常自动切回普通请求。
Prometheus 计数器示例:
1 2 3 4 5 6 7 8
| from prometheus_client import Counter SSE_CONN = Counter('sse_conn_total', 'sse connections')
@app.get('/chat/sse') async def chat_sse(q: str, request: Request): SSE_CONN.inc() ...
|
6. 排障清单(Troubleshooting Checklist)
- 页面一直“转圈”但无输出:检查网关是否
proxy_buffering off。
- 连接 1 分钟固定断:CDN/负载均衡的 Idle Timeout;与上游保持一致或发送心跳。
- SSE 无法鉴权:不要自定义头,改为 Cookie / 签名 URL;或改走 WS。
- iOS/Safari 间歇丢字:退回
fetch + ReadableStream 实现。
- 返回被合并成一个块:上游/中间件进行了压缩或缓冲,确认
Cache-Control: no-transform。
7. 成功标准(Definition of Done)
- 首屏 TTFB 可视化(监控/前端埋点)稳定在目标阈值内。
- 意外断开能在 3 秒内自动重连并续传(Last-Event-ID)。
- 有清晰回退路径:SSE → WS → 普通 HTTP。
- 有稳定的观察面:连接数、错误分类、版本标识与回滚按钮。
这套做法的价值不是“能跑”,而是“能上线、能回滚、能排障、能量化”。把它收进你的基建库,以后所有 LLM 产品都能复用。