FastAPI + WebSocket/SSE 实现大模型流式输出:前后端落地指南

zhichao Lv3

这篇文章不是“教科书式 API 索引”,而是从一线问题出发,给出能在生产落地的做法:为什么需要流式、SSE 和 WebSocket 的边界、网关/代理的坑、如何监控与回滚。你可以把它当成一个可复用的“流式输出蓝图”。


0. 场景与约束(Problem → Constraints)

目标很朴素:聊天窗口里“边生成边展示”。但上线前先把边界画清楚:

  • 浏览器/企业网代理可能劫持/缓冲响应;SSE 受中间件影响更大。
  • 鉴权:SSE 无自定义头,Cookie/QueryToken/签名 URL 三选一。
  • 断线:移动端切网/锁屏会中断连接,需要可恢复(Last-Event-ID)。
  • 成本:流式连接本质是长连接,要算清并发和超时(idle timeout)。

设计原则:文本推送优先 SSE;需要双向交互/多路/工具调用用 WebSocket;两者都要保留短路回退(普通 HTTP)。


1. 架构与选型(Trade-offs)

1
2
Client ──(SSE/EventSource)──> API Gateway ──> FastAPI ──> LLM Provider/Local LLM
(WS/WebSocket) (no buffer) (backpressure, cancel, metrics)

SSE 优点:浏览器原生、简单、易缓存控制;缺点:单向、某些代理会缓冲。WS 优点:全双工、业务扩展空间大;缺点:状态管理和伸缩更复杂。生产里两者并存最稳。


2. 后端实现(可复现、可回滚)

下面这个实现关注“正确性”和“可运维性”,而不是只会 yield 文本:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
# pip install fastapi uvicorn sse-starlette anyio
from fastapi import FastAPI, Request, WebSocket, WebSocketDisconnect
from fastapi.responses import StreamingResponse
import asyncio, json, time

app = FastAPI()

async def token_source(prompt: str):
# 生产里替换为真实 LLM 流;这里模拟一个可取消的异步迭代器
for ch in "今天天气不错,我们来聊聊系统设计。":
await asyncio.sleep(0.03)
yield ch

async def sse_iter(prompt: str, last_id: int | None):
i = (last_id or 0)
try:
async for t in token_source(prompt):
i += 1
# id: 可用于断线续传;data: 业务负载
yield f"id: {i}\n" f"event: token\n" f"data: {json.dumps({'t': t})}\n\n"
yield "event: done\n" "data: {}\n\n"
except asyncio.CancelledError:
# 客户端断开一定会触发取消,正常处理即可
raise

@app.get("/chat/sse")
async def chat_sse(q: str, request: Request):
last = request.headers.get("last-event-id")
last_id = int(last) if last and last.isdigit() else None
return StreamingResponse(
sse_iter(q, last_id), media_type="text/event-stream",
headers={
# 禁止中间件缓冲/改写
"Cache-Control": "no-cache, no-transform",
"X-Accel-Buffering": "no",
},
)

# 可选:WebSocket 版本,适合工具调用/多路消息
@app.websocket("/chat/ws")
async def chat_ws(ws: WebSocket):
await ws.accept()
try:
while True:
payload = await ws.receive_text()
async for t in token_source(payload):
await ws.send_json({"event": "token", "t": t})
await ws.send_json({"event": "done"})
except WebSocketDisconnect:
pass

运行:

1
uvicorn main:app --reload --port 8000

3. 前端实现(稳定优先)

原生 SSE(含断线续传与节流):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<pre id="out"></pre>
<script>
const out = document.getElementById('out')
let lastId = localStorage.getItem('last_id')
function connect(){
const es = new EventSource('/chat/sse?q=你好', {
withCredentials: true
})
es.onmessage = (e) => {
// 只有默认 event 才会走这里;我们在服务端显式使用 event: token
out.textContent += e.data
}
es.addEventListener('token', (e)=>{
const data = JSON.parse(e.data)
out.textContent += data.t
})
es.addEventListener('done', ()=> es.close())
es.addEventListener('error', ()=> es.close())
}
connect()
// Safari 早期版本对 EventSource 支持不稳,可退回 fetch+ReadableStream
</script>

React Hook(WS 版,简化错误处理):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import { useEffect, useRef, useState } from 'react'

export default function useChatWS(url){
const [text, setText] = useState('')
const wsRef = useRef(null)
useEffect(()=>{
const ws = new WebSocket(url)
wsRef.current = ws
ws.onmessage = (e)=>{
const msg = JSON.parse(e.data)
if(msg.event === 'token') setText(t=> t + msg.t)
}
return ()=> ws.close()
}, [url])
return { text, send: (q)=> wsRef.current?.send(q) }
}

4. 反向代理与网关(最容易踩坑的部分)

Nginx:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# SSE:核心是禁缓冲 + 不改写
location /chat/sse {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1; # HTTP/2 也可,但很多人配置不当被缓存
proxy_buffering off;
proxy_cache off;
chunked_transfer_encoding off;
proxy_set_header Connection '';
}

# WebSocket
location /chat/ws {
proxy_pass http://127.0.0.1:8000;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}

Cloudflare/其他 CDN:开启 Cache: Bypasstext/event-stream,并调整 Idle timeout,否则长连会被重置。


5. 监控、压测与回滚(Run → Observe → Rollback)

  • 指标:连接数、存活时长、TTFB(首字节时间)、中断率、重连次数/小时。
  • 压测:流式需要“长时间少量并发 + 间歇突增”两种模型。工具可选 k6/oha/autocannon,重点观察连接上限与代理超时。
  • 回滚:保留 POST /chat 的非流式接口,前端收到异常自动切回普通请求。

Prometheus 计数器示例:

1
2
3
4
5
6
7
8
# pip install prometheus-client
from prometheus_client import Counter
SSE_CONN = Counter('sse_conn_total', 'sse connections')

@app.get('/chat/sse')
async def chat_sse(q: str, request: Request):
SSE_CONN.inc()
...

6. 排障清单(Troubleshooting Checklist)

  • 页面一直“转圈”但无输出:检查网关是否 proxy_buffering off
  • 连接 1 分钟固定断:CDN/负载均衡的 Idle Timeout;与上游保持一致或发送心跳。
  • SSE 无法鉴权:不要自定义头,改为 Cookie / 签名 URL;或改走 WS。
  • iOS/Safari 间歇丢字:退回 fetch + ReadableStream 实现。
  • 返回被合并成一个块:上游/中间件进行了压缩或缓冲,确认 Cache-Control: no-transform

7. 成功标准(Definition of Done)

  • 首屏 TTFB 可视化(监控/前端埋点)稳定在目标阈值内。
  • 意外断开能在 3 秒内自动重连并续传(Last-Event-ID)。
  • 有清晰回退路径:SSE → WS → 普通 HTTP。
  • 有稳定的观察面:连接数、错误分类、版本标识与回滚按钮。

这套做法的价值不是“能跑”,而是“能上线、能回滚、能排障、能量化”。把它收进你的基建库,以后所有 LLM 产品都能复用。

  • Title: FastAPI + WebSocket/SSE 实现大模型流式输出:前后端落地指南
  • Author: zhichao
  • Created at : 2024-04-20 20:00:00
  • Updated at : 2025-10-07 20:03:01
  • Link: https://chozzc.me/2024/04/20/2024-04-tech-fastapi-streaming/
  • License: This work is licensed under CC BY-NC-SA 4.0.
Comments