Hello World

吞风吻雨葬落日 欺山赶海踏雪径

0%

Server-Sent Events (SSE)

最近对接AI服务,AI应答的内容是HTTP 流式返回的,应答的Content-Typetext/event-stream,查阅了些相关的资料,做下笔记。

HTTP流式返回

HTTP流式返回(HTTP Streaming) 是一种在客户端和服务器之间持续传输数据的技术,与传统的HTTP响应不同,后者通常在服务器处理完所有数据后一次性返回给客户端,流式返回允许服务器在处理请求时,不必等待所有数据准备完毕,就可以开始向客户端发送数据。这种方式对于处理大量数据或者长时间运行的任务非常有用,因为它可以提高用户体验,让客户端能够实时接收和处理数据,而不需要等待整个过程完成。

流式返回的实现方式主要有以下几种:

  1. 分块传输编码(Chunked Transfer Encoding):这是HTTP/1.1协议中的一种特性,允许服务器在不知道内容总长度的情况下,将数据分成多个块(chunks)发送给客户端。每个块包含数据和块长度信息,最后一个块的长度为0,表示传输结束。客户端可以边接收边处理这些数据块。
  2. Server-Sent Events (SSE) :SSE是一种允许服务器向客户端推送实时更新的技术。服务器通过创建一个HTTP连接,然后定期发送更新数据给客户端。客户端可以通过JavaScript的EventSource接口监听这些事件。SSE适用于单向通信,即服务器到客户端的数据流
  3. WebSockets:WebSocket提供了全双工的通信能力,允许服务器和客户端之间建立一个持久的连接,双方可以随时发送消息。虽然WebSocket不是HTTP协议的一部分,但它提供了更为灵活的实时通信能力,适用于需要双向通信的场景。

下面着重介绍一下Server-Sent Events (SSE)

SSE 详解

SSE是一种允许服务器向客户端推送实时更新的技术。它是HTML5规范的一部分,提供了一种轻量级的方法来实现服务器到客户端的单向通信。与传统的轮询和长轮询相比,SSE提供了更高效和实时的数据推送机制。

SSE 的特点

SSE 与 WebSocket 作用相似,都是建立浏览器与服务器之间的通信渠道,然后服务器向浏览器推送信息。总体来说,WebSocket 更强大和灵活。因为它是全双工通道,可以双向通信;SSE 是单向通道,只能服务器向浏览器发送,因为流信息本质上就是下载。如果浏览器向服务器发送信息,就变成了另一次 HTTP 请求。

20240309000001.jpg

但是,SSE 也有自己的优点:

  1. SSE E基于 HTTP 协议,现有的服务器软件都支持。WebSocket 是一个独立协议。
  2. SSE 属于轻量级,使用简单;WebSocket 协议相对复杂。
  3. SSE默认支持断线重连,如果连接意外关闭,客户端会自动尝试重新建立连接。WebSocket 需要自己实现。
  4. SSE 一般只用来传送文本,二进制数据需要编码后传送,WebSocket 默认支持传送二进制数据。
  5. SSE 支持自定义发送的消息类型。

SSE 数据格式

服务器向客户端发送的 SSE 数据,必须是 UTF-8 编码的文本,具有如下的 HTTP 头信息:

1
2
3
Content-Type: text/event-stream
Cache-Control: no-cache
Connection: keep-alive

Content-Type必须指定 MIME 类型为event-steam。每一次发送的信息,由若干个message组成,每个message之间用\n\n分隔。每个message内部由若干行组成,每一行都是如下格式:

1
[field]: value\n

上面的field可以取四个值:

  • data
  • id
  • event
  • retry

此外,还可以有冒号开头的行,表示注释。通常,服务器每隔一段时间就会向浏览器发送一个注释,保持连接不中断。

1
: This is a comment

data

数据内容用data字段表示。(规范上没有对data格式有任何限制,可以是纯文本或者是JSON、XML格式)

1
data:  message\n\n

如果数据很长,可以分成多行,最后一行用\n\n结尾,前面行都用\n结尾。

1
2
data: begin message\n
data: continue message\n\n

id

数据标识符用id字段表示,相当于每一条数据的编号。
浏览器用lastEventId属性读取这个值。一旦连接断线,浏览器会发送一个 HTTP 头,里面包含一个特殊的Last-Event-ID头信息,将这个值发送回来,用来帮助服务器端重建连接。因此,这个头信息可以被视为一种同步机制。

event

event字段表示自定义的事件类型,默认是message事件。浏览器可以用addEventListener()监听该事件。

1
2
3
4
5
6
7
event: foo\n
data: a foo event\n\n

data: an unnamed event\n\n

event: bar\n
data: a bar event\n\n

上面的代码创造了三条信息。第一条的名字是foo,触发浏览器的foo事件;第二条未取名,表示默认类型,触发浏览器的message事件;第三条是bar,触发浏览器的bar事件

retry

服务器可以用retry字段,指定浏览器重新发起连接的时间间隔。
两种情况会导致浏览器重新发起连接:一种是时间间隔到期,二是由于网络错误等原因,导致连接出错。

SSE完整例子

Java服务端(Spring Boot 3.x + JDK17)

StreamingController:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@GetMapping(value = "/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter streamSseMvc() {
SseEmitter emitter = new SseEmitter();

EXECUTE_SERVICE.execute(() -> {
try {
for (int i = 0; i < 15; i++) {
LocalDateTime now = LocalDateTime.now();
String data = now.format(DATE_FORMATTER_SS);
SseEmitter.SseEventBuilder event = SseEmitter.event()
.id(String.valueOf(i))
.data(data + "\nanother line.")
.name("test")
.reconnectTime(1000)
.comment("comment");
emitter.send(event);
Thread.sleep(1000);
}
emitter.complete();
} catch (Exception ex) {
emitter.completeWithError(ex);
}
});
return emitter;
}

报文格式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
id:0
data:20240309194825
data:another line.
event:test
retry:1000
:comment

id:1
data:20240309194827
data:another line.
event:test
retry:1000
:comment

id:2
data:20240309194828
data:another line.
event:test
retry:1000
:comment

......

id:10
data:20240309194836
data:another line.
event:test
retry:1000
:comment

Java客户端(HTTPClient)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
@Test
public void testSSE() {

RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(CONNECT_TIMEOUT)
.setSocketTimeout(CONNECT_TIMEOUT)
.setConnectionRequestTimeout(CONNECT_TIMEOUT)
.build();
// 创建HttpClient实例,并应用上面的超时设置
CloseableHttpClient httpClient = HttpClients.custom()
.setDefaultRequestConfig(requestConfig)
.build();

long start = System.currentTimeMillis();
try {

HttpGet httpGet = new HttpGet("http://localhost:8080/sse");
httpGet.addHeader("Content-Type", "application/json");
httpGet.addHeader("Accept", "*/*");
// 创建一个自定义的ResponseHandler来处理流式响应
ResponseHandler<String> responseHandler = response -> {
int status = response.getStatusLine().getStatusCode();
if (status != 200) {
throw new IllegalStateException("invalid http status code: " + status);
}

HttpEntity entity = response.getEntity();
if (entity == null) {
throw new IllegalStateException("invalid http response: null");
}

// 获取响应实体的内容流
try (InputStream inputStream = entity.getContent();
InputStreamReader inputStreamReader = new InputStreamReader(inputStream, StandardCharsets.UTF_8);
BufferedReader bufferedReader = new BufferedReader(inputStreamReader)) {

String result = "";
String line;
while ((line = bufferedReader.readLine()) != null) {
// 处理每一行数据
if (StringUtils.isNotBlank(line) && StringUtils.startsWith(line, "data:")) {
String data = StringUtils.trim(line.substring(5));
System.out.println(">>>" + data);
result = data;
}
}
return result;
}
};
// 执行HTTP POST请求,并通过ResponseHandler处理流式响应
String result = httpClient.execute(httpGet, responseHandler);
System.out.println("Last result is [" + result + "]");
} catch (Exception e) {
System.out.println("调用失败: " + e.getMessage());
} finally {
try {
httpClient.close();
} catch (Exception e) {
}
System.out.println("cost " + (System.currentTimeMillis() - start) / 1000 + " s");
}
}

参考

Chunked Transfer Coding RFC 9112 §7.1

Chunked transfer encoding Wiki

Server-sent_events Wiki

Server-Sent Events 教程

WebSocket

Server-Sent Events in Spring

How to stream data over HTTP using Node and Fetch API

Java Implementation for Streaming HTTP Response

基于HTTP流式传输的长时响应体验提升

HTTP Streaming (or Chunked vs Store & Forward)