最近在对接 deepseek 的 API,请求路径为 前端 => 后端(springboot)=> 私有部署的 deepseek API。deepseek API 本身是支持 stream 流式输出的,但因为中间加了一层后端,来做一些上下文的处理工作,所以在 deepseek API 返回流式数据的时候,也需要流式返回给前端才行。
什么是Server-Sent Events(SSE)
关于 Server-Sent Events(SSE) 的介绍,网络上已经有很多,这里只是简单说一下。
SSE 是建立在 HTTP 协议上的,服务端 单向 向客户端传递信息的技术,就像下载一样。客户端向服务端请求后,服务端向客户端返回text/event-stream
类型的响应类型,说明接下来会有好多次数据分批发送,客户端不关闭连接,而是一直等待数据传递。
与 SSE 相似的技术是 websocket,但 websocket 是双向通信的,在只需要服务端向客户端推送的场景下,SSE 更加轻量,也无需额外的协议。
在SpringBoot中实现SSE
SpringMVC 和 Spring WebFlux 都可以实现 SSE,并且不需要引入spring-boot-starter-web
或 spring-boot-starter-webflux
之外的依赖。在 SpringMVC 中,提供了SseEmitter
来简化操作,而在Spring WebFlux 中,实现 SSE 更加自然。
SpringMVC
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter test(String msg){
//SSE 的接口就像其他接口一样,只不过创建一个SseEmitter对象,并返回就好了
SseEmitter emitter = new SseEmitter();
//SseEmitter 可以注册几个回调函数,用来感知变化
sseEmitter.onCompletion(() -> {
// 在sseEmitter结束其生命周期的时候会被调用,如
// 客户端断开连接、超时
// 或者服务端调用sseEmitter.complete()
//可以对所有请求产生的 Emitter 进行管理,避免内存泄露等问题
emitters.remove(sseEmitter);
});
sseEmitter.onError(error -> {
//发生错误时
emitters.remove(sseEmitter);
});
sseEmitter.onTimeout(() -> {
//超时时回调,SseEmitter的默认超时时间是 30s,如果超过了这个时间,会执行此回调
//在 AI 场景下,很容易超过 30s,可以在创建 SseEmitter 的时候指定超时时间
emitters.remove(sseEmitter);
});
//在单独的其他线程中,调用SseEmitter.send 方法向服务端发送数据
new Thread(() -> {
try {
for (int i = 0; i < 10; i++) {
SseEmitter.SseEventBuilder event = SseEmitter.event()
.data("Message " + i)
.id(String.valueOf(i))
.name("message-event");
emitter.send(event);
Thread.sleep(1000); // 模拟延迟
}
emitter.complete(); // 完成发送
} catch (IOException | InterruptedException e) {
emitter.completeWithError(e); // 处理错误
}
}).start()
return emitter;
}
ps:由于是第一次尝试使用SseEmitter
,从 start.spring.io 上选择了最新的 3.4.4 版本,生成了一个项目,结果发现onCompletion
不执行,不能正常回调,网上找不到相关信息,最后换了 3.3.10 版本才正常了。这也说明,新的版本,尤其是最后一位比较小的时候,很可能会有新版本的 bug,后面还是谨慎选择最新版本,尤其是最后一位比较小的版本。或者在遇到诡异的问题时,及时更换为旧的版本。
SpringWebFlux
在Spring WebFlux,可以更简洁地实现 SSE
@GetMapping(value = "/reactive-sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> handleReactiveSse() {
return Flux.interval(Duration.ofSeconds(1)) // 每秒生成一个事件
.map(sequence -> "Message " + sequence);
}
测试
可以通过 curl 、浏览器(IE 除外,因为 IE 不支持 SSE)中直接访问,或者写一段 js 代码来测试 SSE 的效果
const eventSource = new EventSource('/sse');
eventSource.onmessage = function(event) {
console.log(event)
};
eventSource.onerror = function(error) {
console.error('EventSource failed:', error);
eventSource.close();
};