Redis 发布与订阅

本文最后更新于:2025年2月18日 下午

Redis的发布与订阅是实时产生的,不会保存消息记录,若当前用户订阅了A组,当B组来消息后,再切换到B组,也不会收到B组之前的消息。

配置依赖

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
<!-- Redis 启动依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

<!-- Redis 专属连接池 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.20</version>
</dependency>

<!-- websocket 依赖包含 web依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-websocket</artifactId>
</dependency>

配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
server:
port: 80
spring:
# Redis配置
redis:
# 指定数据库(Redis一共有16个数据库0-15,这里指定用0号数据库)
database: 0
# 主机地址
host: 127.0.0.1
# Redis 运行端口
port: 6379
# Redis 密码 可不设置
password:
# jedis与lettuce是两种不同的Redis客户端实现,这俩都提供了操作Redis数据库的API,
# 但在spring整合包中都被Spring集成封装了统一的调用方法,两者具体差别请自行百度搜索。
jedis:
# 连接池
pool:
# 最大空闲连接
max-idle: 30
# 最大连接等待时间
max-wait: 100ms
# 最小空闲连接
min-idle: 5
# 最大活动连接
max-active: 50

配置类

RedisConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.listener.adapter.MessageListenerAdapter;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.GenericJackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Configuration
public class RedisConfig {

@Bean
public RedisMessageListenerContainer redisContainer(
RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, topic());
return container;
}

@Bean
public MessageListenerAdapter listenerAdapter(RedisMessageSubscriber subscriber) {
MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(subscriber, "onMessage");
messageListenerAdapter.setSerializer(new GenericJackson2JsonRedisSerializer());
return messageListenerAdapter;
}

@Bean
public RedisTemplate<String,Object> redisTemplate(RedisConnectionFactory connectionFactory) {
RedisTemplate<String,Object> template = new RedisTemplate<>();
template.setConnectionFactory(connectionFactory);
Jackson2JsonRedisSerializer<Object> ojjrs = new Jackson2JsonRedisSerializer<>(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.PROPERTY);
ojjrs.setObjectMapper(om);
StringRedisSerializer srs = new StringRedisSerializer();
// 设置key和HashKey的序列化
// 这里参数内等同于RedisSerializer.string()都是指向了StringRedisSerializer里的静态常量UTF_8
template.setKeySerializer(srs);
template.setHashKeySerializer(srs);
//设置Value和HashValue的序列化
template.setValueSerializer(ojjrs);
template.setHashValueSerializer(ojjrs);
return template;
}

@Bean
public ChannelTopic topic() {
return new ChannelTopic("/topic/group/1");
}
}

实体类

ChatMessage.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
import lombok.Data;

import java.io.Serializable;

/**
* 消息类
* @author peter
* @date 2024/9/25
* @description 这里必须实现序列化,因为redis存储时需要将此类序列化
*/
@Data
public class ChatMessage implements Serializable {

// 群组id
private String groupId;
// 用户id
private String userId;
// 用户名
private String username;
// 消息内容
private String content;
}

消息订阅类

RedisMessageSubscriber.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Service;

/**
* Redis消息订阅类
* @author peter
* @date 2024/9/26
* @description 接收发布的所有信息
*/
@Service
public class RedisMessageSubscriber{

// SimpMessagingTemplate 是 Spring Framework 中用于支持 WebSocket 消息传递的一个重要类。它提供了高层次的抽象,简化了消息的发送和接收过程。
@Autowired
private SimpMessagingTemplate messagingTemplate;

/**
* 获得订阅的消息
* @author peter
* @date 2024/9/26
* @param chatMessage 聊天信息
* @description Redis指定订阅的频道消息都会发到这里来进行处理
*/
public void onMessage(ChatMessage chatMessage) {
// 通过websocket将消息发布出去
messagingTemplate.convertAndSend("/topic/group/"+chatMessage.getGroupId(),chatMessage);
// 处理接收到的群聊消息
System.out.println("收到的消息: " +chatMessage);
}
}

视图控制类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.simp.SimpMessagingTemplate;
import org.springframework.stereotype.Controller;

@Controller
public class ChatController {

@Autowired
private RedisTemplate<String,Object> redisTemplate;

@MessageMapping("/chat.sendMessage")
public void sendMessage(ChatMessage chatMessage) {
// 群组消息发布到特定的群组话题
redisTemplate.convertAndSend("/topic/group/" + chatMessage.getGroupId(), chatMessage);
}
}

前端页面

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Redis订阅与转发</title>
</head>
<body>
<form>

<input type="hidden" name="userId" value="001">
<input type="hidden" name="username" value="admin">
<label for="chat">
群组
</label><br>
<input id="chat" type="text" name="groupId" placeholder="请输入要发送的群组id"><br>
<label for="content">
信息
</label><br>
<textarea id="content" name="content" placeholder="请输入内容"></textarea><br>
<button type="submit">提交</button><br>
</form>
<button onclick="changeGroup(1)">切换频道1</button>
<button onclick="changeGroup(2)">切换频道2</button>
<button onclick="changeGroup(3)">切换频道3</button>
<script src="/js/jquery.js"></script>
<script src="/js/sockjs.js"></script>
<script src="/js/stomp.js"></script>
<script>
var socket = new SockJS('/chat-websocket');
var stompClient = Stomp.over(socket);
var groupId = 1;

$('form').on('submit', function(event) {
event.preventDefault(); // 阻止默认提交行为

// 将表单数据序列化为对象
const formData = {
groupId: $('input[name="groupId"]').val(),
userId: $('input[name="userId"]').val(),
username: $('input[name="username"]').val(),
content: $('textarea[name="content"]').val(),
};
sendMessage(formData.userId,formData.username,formData.content,formData.groupId);
});

let currentSubscription;

stompClient.connect({}, function (frame) {
// 订阅不同的群组
currentSubscription = stompClient.subscribe('/topic/group/' + groupId, function (messageOutput) {
// 处理收到的消息
//console.log(messageOutput.body);
//showMessage(JSON.parse(messageOutput.body));
});
});

function sendMessage(userId,username,messageContent, groupId) {
stompClient.send("/app/chat.sendMessage", {}, JSON.stringify({
userId: userId,
username: username,
content: messageContent,
groupId: groupId
}));
}


function subscribeToGroup(groupId) {
// 如果已经订阅了其他频道,先取消之前的订阅
if (currentSubscription) {
currentSubscription.unsubscribe();
}

// 订阅新的频道
currentSubscription = stompClient.subscribe('/topic/group/' + groupId, function(message) {
// console.log('Received message for group ' + groupId + ': ' + message.body);
//displayMessage(JSON.parse(message.body));
});
}

// 切换群组时订阅新的群组
function changeGroup(newGroupId) {
subscribeToGroup(newGroupId);
}

</script>
</body>
</html>

Redis 发布与订阅
https://superlovelace.top/2024/09/26/Redis订阅与转发/
作者
棱境
发布于
2024年9月26日
更新于
2025年2月18日
许可协议