SSE

本文最后更新于:2025年6月28日 晚上

Server-Sent Events (SSE)

Server-Sent Events (SSE) 是一种允许服务器向客户端实时推送数据的Web技术,它基于HTTP协议,提供了一种简单有效的服务器到客户端的单向通信机制。

注意:SSE更适配响应式编程

响应式编程(WebFlux)的优势

响应式栈(如WebFlux + Reactor)的SSE实现更高效:

(简单示例)

1
2
3
4
5
6
7
8
9
10
@RestController
public class ReactiveSseController {
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();

@GetMapping(path = "/events", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ServerSentEvent<String>> streamEvents() {
return sink.asFlux()
.map(data -> ServerSentEvent.builder(data).build());
}
}

优势

特性 阻塞式(MVC) 响应式(WebFlux)
线程占用 1连接=1线程 固定少量IO线程
内存消耗 每个连接独立缓冲 共享背压缓冲
广播复杂度 O(n)手动发送 O(1)自动多播
扩展性 千级连接 万级连接

适用场景选择

✅ 适合响应式SSE的场景:

  • 高并发(如实时仪表盘、股票行情)
  • 需要组合多个异步数据源(如数据库变更+消息队列)
  • 长周期事件流(如文件导入进度)

❌ 适合传统阻塞式的场景:

  • 简单低频通知(如单个用户订单状态更新)
  • 遗留系统无法升级响应式栈
  • 需要与阻塞式库(如JPA/Hibernate)深度集成

性能对比数据

在相同4核8G服务器上:

连接数 Spring MVC SSE WebFlux SSE
1,000 线程池耗尽 内存占用200MB
10,000 崩溃 内存占用1.2GB
延迟 15-30ms 2-8ms

基本概念

SSE 的主要特点包括:

  • 单向通信:仅服务器可以向客户端推送数据
  • 基于HTTP:使用标准HTTP协议,不需要特殊协议
  • 文本格式:数据以纯文本格式传输
  • 自动重连:内置连接断开后的自动重连机制
  • 简单API:浏览器端使用简单的EventSource API

工作原理

  1. 客户端通过EventSource对象发起连接

  2. 服务器保持连接打开,以text/event-stream格式发送数据

  3. 数据以流的形式持续发送,格式为:

    1
    2
    event: message
    data: 这是消息内容\n\n

与WebSocket的比较

特性 SSE WebSocket
通信方向 单向(服务器→客户端) 双向
协议 HTTP 独立的ws/wss协议
数据格式 文本 文本或二进制
自动重连 支持 需要手动实现
复杂度 简单 较复杂
浏览器支持 广泛(除IE) 广泛

适用场景

SSE 非常适合以下场景:

  • 实时通知系统
  • 新闻/股票行情推送
  • 社交媒体动态更新
  • 实时日志监控
  • 需要简单实时功能的场景

注意事项

  1. 浏览器兼容性:大多数现代浏览器支持,但IE不支持
  2. 连接限制:浏览器对每个源的SSE连接数有限制(通常6个)
  3. 代理问题:某些代理服务器可能缓冲SSE流
  4. 数据格式:仅支持文本,不支持二进制数据

SSE 提供了一种轻量级的实时通信解决方案,在不需要双向通信的场景下,比WebSocket更简单易用。

代码示例(阻塞式):

扩展:像集成微信扫码登录这种,可以通过SSE的方式替代前端轮询扫码登录状态的大量无效请求开销。大致流程就是:前端生成的唯一设备id,请求登录二维码,后端返回二维码并将设备id作为SSE连接id,然后启动SSE线程。前端启动SSE事件监听,前后端心跳测试正常。登录成功后,向前端发送登录成功事件通知,前端跳转到登录页。

服务端(后端)

Maven依赖

1
2
3
4
5
<!-- SpringBoot中已经有SseEmitter了,所以不需要额外引入其他包。 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

spring配置文件

application.yaml

1
2
3
4
5
6
7
server:
port: 8080
servlet:
encoding:
charset: UTF-8
force: true
enabled: true

跨源配置

Webconfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.springframework.context.annotation.Configuration;
import org.springframework.web.servlet.config.annotation.CorsRegistry;
import org.springframework.web.servlet.config.annotation.WebMvcConfigurer;


/**
* 跨源配置
* @author peter
*/
@Configuration
public class WebConfig implements WebMvcConfigurer {

@Override
public void addCorsMappings(CorsRegistry registry) {
registry.addMapping("/**")
.allowedOrigins("*")
.allowedMethods("GET", "HEAD", "POST", "PUT", "DELETE", "OPTIONS")
.allowCredentials(false)
.maxAge(3600)
.allowedHeaders("*");
}
}

异常类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* SSE异常信息
*/
public class SseException extends RuntimeException {
public SseException() {
}

public SseException(String message) {
super(message);
}

public SseException(String message, Throwable cause) {
super(message, cause);
}

public SseException(Throwable cause) {
super(cause);
}

public SseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) {
super(message, cause, enableSuppression, writableStackTrace);
}
}

存储连接

使用ConcurrentHashMap存储

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import com.qrlogin.exception.SseException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledFuture;

/**
* SSE Session
* @author peter
*/
public class SseSession {

private SseSession (){
throw new IllegalStateException("Utility class");
}

private static final Logger logger = LoggerFactory.getLogger(SseSession.class);

/**
* Session维护Map
*/
private static final Map<String, SseEmitter> SESSION = new ConcurrentHashMap<>();

/**
* 判断Session是否存在
*
* @param id 客户端ID
* @return
*/
public static boolean exist(String id) {
return SESSION.get(id) == null;
}

/**
* 增加Session
*
* @param id 客户端ID
* @param emitter SseEmitter
*/
public static void add(String id, SseEmitter emitter) {
final SseEmitter oldEmitter = SESSION.get(id);
if (oldEmitter != null) {
oldEmitter.completeWithError(new SseException("RepeatConnect(Id:" + id + ")"));
}
SESSION.put(id, emitter);
}


/**
* 删除Session
*
* @param id 客户端ID
* @return
*/
public static boolean del(String id) {
final SseEmitter emitter = SESSION.remove(id);
if (emitter != null) {
emitter.complete();
return true;
}
return false;
}

/**
* 发送消息
*
* @param id 客户端ID
* @param msg 发送的消息
* @return
*/
public static boolean send(String id,String name, Object msg) {
if (id == null){

for (Map.Entry<String, SseEmitter> entry : SESSION.entrySet()){
try {
SseEmitter emitter = entry.getValue();
emitter.send(
SseEmitter.event()
.name("broadcast")
.data(msg)
.reconnectTime(30_000L));
return true;
} catch (IOException e) {
logger.error("MSG: SendMessageError-IOException | ID: " + id + " | Date: " + new Date() + " |", e);
}
}
}else {
SseEmitter emitter = SESSION.get(id);
if (emitter != null) {
try {
emitter.send(
SseEmitter.event()
.name(name)
.data(msg).
reconnectTime(30_000L));
return true;
} catch (IOException e) {
logger.error("MSG: SendMessageError-IOException | ID: " + id + " | Date: " + new Date() + " |", e);
return false;
}
}
}
return false;
}

// 发送无事件名的简单消息
public static void sendSimpleMessage(String id, Object msg) {
SseEmitter emitter = SESSION.get(id);
if (emitter != null) {
try {
emitter.send(
SseEmitter.event()
.data(msg).
reconnectTime(30_000L));
} catch (IOException e) {
logger.error("MSG: SendMessageError-IOException | ID: " + id + " | Date: " + new Date() + " |", e);
}
}
}

/**
* SseEmitter onCompletion 后执行的逻辑
*
* @param id 客户端ID
* @param future
*/
public static void onCompletion(String id, ScheduledFuture<?> future) {
SESSION.remove(id);
if (future != null) {
// SseEmitter断开后需要中断心跳发送
future.cancel(true);
}
}

/**
* SseEmitter onTimeout 或 onError 后执行的逻辑
*
* @param id
* @param e
*/
public static void onError(String id, SseException e) {
final SseEmitter emitter = SESSION.get(id);
if (emitter != null) {
// 这里会触发错误监听的同时,执行完成监听方法终止心跳线程
emitter.completeWithError(e);
SESSION.remove(id);
}
}
}

心跳任务

用来检测对方是否还存在

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import com.qrlogin.session.SseSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Date;

/**
* 心跳任务
* @author peter
*/
public class HeartBeatTask implements Runnable {
private static final Logger logger = LoggerFactory.getLogger(HeartBeatTask.class);

private final String clientId;

public HeartBeatTask(String clientId) {
// 这里可以按照业务传入需要的数据
this.clientId = clientId;
}

@Override
public void run() {
logger.info("MSG: SseHeartbeat | ID: {} | Date: {}", clientId, new Date());

SseSession.send(clientId,"system_inform", "ping");
}
}

SSE业务类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import com.qrlogin.exception.SseException;
import com.qrlogin.service.SseService;
import com.qrlogin.session.SseSession;
import com.qrlogin.task.HeartBeatTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.util.Date;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;


/**
* SSE 相关业务实现
*/
@Service
public class SseServiceImpl implements SseService {
private static final Logger logger = LoggerFactory.getLogger(SseServiceImpl.class);

/**
* 发送心跳线程池
*/
private static ScheduledExecutorService heartbeatExecutors = Executors.newScheduledThreadPool(2);

/**
* 新建连接
*
* @param clientId 客户端ID
* @return
*/
@Override
public SseEmitter start(String clientId) {
// 默认30秒超时,设置为0L则永不超时
SseEmitter emitter = new SseEmitter(30_000L);
logger.info("MSG: SseConnect | EmitterHash: {} | ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
// 添加连接
SseSession.add(clientId, emitter);
// 添加心跳任务
final ScheduledFuture<?> future = heartbeatExecutors.scheduleAtFixedRate(new HeartBeatTask(clientId), 0, 10, TimeUnit.SECONDS);
// 添加完成事件
emitter.onCompletion(() -> {
logger.info("MSG: SseConnectCompletion | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
// 完成后,移除session连接
SseSession.onCompletion(clientId, future);
});
// 添加超时配置
emitter.onTimeout(() -> {
logger.error("MSG: SseConnectTimeout | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onError(clientId, new SseException("TimeOut(clientId: " + clientId + ")"));
});

// 添加错误配置
emitter.onError(t -> {
logger.error("MSG: SseConnectError | EmitterHash: {} |ID: {} | Date: {}", emitter.hashCode(), clientId, new Date());
SseSession.onError(clientId, new SseException("Error(clientId: " + clientId + ")"));
});
return emitter;
}

/**
* 广播发送
* @param msg
* @return
*/
@Override
public String broadcastSend(String msg) {
String name = "systemNotification";
SseSession.send(null,name, msg);
return "OK";
}

/**
* 定向发送数据
* @param clientId 客户端ID
* @return
*/
@Override
public String send(String clientId) {
if (SseSession.send(clientId,"broadcast", System.currentTimeMillis())) {
return "Succeed!";
}
return "error";
}

/**
* 关闭连接
*
* @param clientId 客户端ID
* @return
*/
@Override
public String close(String clientId) {
logger.info("MSG: SseConnectClose | ID: {} | Date: {}", clientId, new Date());
if (SseSession.del(clientId)) {
return "Succeed!";
}
return "Error!";
}
}

Controller

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import com.qrlogin.session.SseSession;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
* @author peter
* @date 2025/6/28
*/
@Controller
public class IndexController {

@GetMapping("/")
public String login()
{
return "login";
}

@GetMapping("/index")
public String index()
{
return "index";
}

@ResponseBody
@GetMapping("/test")
public String test(@RequestParam(value = "id",required = false) String id)
{
// 模拟获取登录二维码
return "/sse/start/?clientId="+id;
}

@ResponseBody
@GetMapping("/direct")
public void direct(String id) {
SseSession.send(id, "direct", "test");
}

@GetMapping("/simple")
public void simple(String id)
{
// 由前端eventSource.onmessage接收消息
SseSession.sendSimpleMessage(id, "simple");
}

@ResponseBody
@GetMapping("/inform")
public void inform(String id)
{
// 模拟访问扫码登录接口,发送登录成功消息给客户端
SseSession.send(id, "login_success", "OK");
}
}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import com.qrlogin.service.SseService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

/**
* SSE测试控制器
*/
@RestController
@RequestMapping("sse")
public class SseTestController {
private static final Logger logger = LoggerFactory.getLogger(SseTestController.class);

@Autowired
private SseService sseService;

@RequestMapping("start")
public SseEmitter start(@RequestParam String clientId) {
return sseService.start(clientId);
}

/**
* 发送定向消息
* @param clientId
* @return
*/
@GetMapping("send")
public String send(@RequestParam String clientId) {
return sseService.send(clientId);
}

/**
* 发送广播消息
* @param msg
* @return
*/
@GetMapping("sendBroadcast")
public String sendBroadcast(@RequestParam String msg) {
return sseService.broadcastSend(msg);
}

/**
* 将SseEmitter对象设置成完成
*
* @param clientId
* @return
*/
@RequestMapping("/end")
public String close(String clientId) {
return sseService.close(clientId);
}

}


客户端(前端)

login.html

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
<!DOCTYPE html>
<html>
<head>
<title>测试SSE消息推送</title>
</head>
<body>
<button id="login-btn">模拟获取登录二维码按钮</button>
<button id="reconnect-btn">手动重连按钮</button>
<script>
let retryCount = 0;
const maxRetries = 5;
let eventSource;
const clientId = 'user_' + Math.random().toString(36).substr(2, 9);
function connectSSE(url) {

eventSource = new EventSource(url);

eventSource.onopen = function(e) {
retryCount = 0;
console.log("连接已建立");
};

// 监听消息(无事件名称的消息)
eventSource.onmessage = function(e) {
console.log("收到消息:", e.data);
};
// 监听定向消息(指定发送到这个客户端的,这些都是根据自己需要定义)
eventSource.addEventListener('direct', function(e) {
console.log("定向消息:", e.data);
});
// 监听广播消息
eventSource.addEventListener('broadcast', function(e) {
console.log("广播消息:", e.data);
});
// 监听系统消息
eventSource.addEventListener('system_inform', function(e) {
console.log("心跳检测消息:", e.data);
});

// 监听登录成功消息
eventSource.addEventListener('login_success', function(e) {

if (e.data === 'OK') {
location.href = '/index';
}
});

eventSource.onerror = function(e) {
console.log("错误重试调用");
if (e.eventPhase === EventSource.CLOSED) {
console.log("连接已超时,正在重试!");
//eventSource.close();
} else {
console.log("连接出错");
}
if (retryCount++ < maxRetries) {
console.log(`尝试第${retryCount}次重连...`);
} else {
console.log("达到最大重试次数,停止重连");
retryCount=0;
eventSource.close();
}
};
}

// 初始连接
//connectSSE();

// 手动重连按钮
document.getElementById('reconnect-btn').addEventListener('click', function() {
if (eventSource) eventSource.close();
retryCount = 0;
connectSSE();
});

// 手动重连按钮
document.getElementById('login-btn').addEventListener('click', function() {
console.log("模拟获取登录二维码");
const xhr = new XMLHttpRequest();
xhr.open('GET', 'http://localhost:8080/test?id='+clientId, true);
// xhr.setRequestHeader('Content-Type', 'application/json'); // 设置请求头

xhr.onload = function() {
if (xhr.status === 200) {
console.log('收到监听地址:', xhr.responseText);

connectSSE("http://localhost:8080"+xhr.responseText);
}
};

xhr.setRequestHeader("Content-Type", "application/x-www-form-urlencoded");

xhr.send();

});
</script>
</body>
</html>

index.html

1
2
3
4
5
6
7
8
9
10
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>首页</title>
</head>
<body>
<h1>登录成功!</h1>
</body>
</html>

相关示例参考:https://github.com/zuster/my-demo-springboot-sse?tab=readme-ov-file

代码示例(响应式):

待本人以后学习完响应式编程后,再来完善。


SSE
https://superlovelace.top/2025/04/10/SSE/
作者
棱境
发布于
2025年4月10日
更新于
2025年6月28日
许可协议