diff --git a/pom.xml b/pom.xml
index bba8d18cc..96e0e08ce 100644
--- a/pom.xml
+++ b/pom.xml
@@ -22,15 +22,15 @@
${java.version}
3.8.0
- 2.4.2
+ 2.4.4
3.0.2
1.5.22
5.1.46
1.2.4
- 3.4.1
- 3.14.1
+ 3.4.2
+ 3.15.1
1.7.0
@@ -42,7 +42,7 @@
1.16.14
1.4.1.Final
- 5.5.6
+ 5.6.1
2.2.7
2.2
1.0.5
@@ -249,27 +249,7 @@
cn.hutool
- hutool-core
- ${hutool.version}
-
-
- cn.hutool
- hutool-extra
- ${hutool.version}
-
-
- cn.hutool
- hutool-captcha
- ${hutool.version}
-
-
- cn.hutool
- hutool-http
- ${hutool.version}
-
-
- cn.hutool
- hutool-crypto
+ hutool-all
${hutool.version}
diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java b/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java
index 13cda9dbb..f069738e5 100644
--- a/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java
+++ b/src/main/java/cn/iocoder/dashboard/framework/redis/config/RedisConfig.java
@@ -1,21 +1,22 @@
package cn.iocoder.dashboard.framework.redis.config;
-import cn.hutool.core.net.NetUtil;
+import cn.hutool.system.SystemUtil;
import cn.iocoder.dashboard.framework.redis.core.pubsub.AbstractChannelMessageListener;
import cn.iocoder.dashboard.framework.redis.core.stream.AbstractStreamMessageListener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
-import org.springframework.data.redis.connection.stream.*;
+import org.springframework.data.redis.connection.stream.Consumer;
+import org.springframework.data.redis.connection.stream.ObjectRecord;
+import org.springframework.data.redis.connection.stream.ReadOffset;
+import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
-import org.springframework.util.ErrorHandler;
-import java.time.Duration;
import java.util.List;
/**
@@ -25,6 +26,9 @@ import java.util.List;
@Slf4j
public class RedisConfig {
+ /**
+ * 创建 RedisTemplate Bean,使用 JSON 序列化方式
+ */
@Bean
public RedisTemplate redisTemplate(RedisConnectionFactory factory) {
// 创建 RedisTemplate 对象
@@ -33,11 +37,16 @@ public class RedisConfig {
template.setConnectionFactory(factory);
// 使用 String 序列化方式,序列化 KEY 。
template.setKeySerializer(RedisSerializer.string());
+ template.setHashKeySerializer(RedisSerializer.string());
// 使用 JSON 序列化方式(库是 Jackson ),序列化 VALUE 。
template.setValueSerializer(RedisSerializer.json());
+ template.setHashValueSerializer(RedisSerializer.json());
return template;
}
+ /**
+ * 创建 Redis Pub/Sub 广播消费的容器
+ */
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer(RedisConnectionFactory factory,
List> listeners) {
@@ -54,52 +63,48 @@ public class RedisConfig {
return container;
}
+ /**
+ * 创建 Redis Stream 集群消费的容器
+ *
+ * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
+ */
@Bean(initMethod = "start", destroyMethod = "stop")
- public StreamMessageListenerContainer> redisStreamMessageListenerContainer(
- RedisConnectionFactory factory, List> listeners) {
- // 创建配置对象
- StreamMessageListenerContainer.StreamMessageListenerContainerOptions>
- streamMessageListenerContainerOptions = StreamMessageListenerContainer.StreamMessageListenerContainerOptions
- .builder()
- // 一次性最多拉取多少条消息
- .batchSize(10)
- // 执行消息轮询的执行器
- // .executor(this.threadPoolTaskExecutor)
- // 消息消费异常的handler
- .errorHandler(new ErrorHandler() {
- @Override
- public void handleError(Throwable t) {
- // throw new RuntimeException(t);
- t.printStackTrace();
- }
- })
- // 超时时间,设置为0,表示不超时(超时后会抛出异常)
- .pollTimeout(Duration.ZERO)
- // 序列化器
- .serializer(RedisSerializer.string())
- .targetType(String.class)
- .build();
+ public StreamMessageListenerContainer> redisStreamMessageListenerContainer(RedisTemplate redisTemplate,
+ List> listeners) {
+ // 第一步,创建 StreamMessageListenerContainer 容器
+ // 创建 options 配置
+ StreamMessageListenerContainer.StreamMessageListenerContainerOptions> containerOptions =
+ StreamMessageListenerContainer.StreamMessageListenerContainerOptions.builder()
+ .batchSize(10) // 一次性最多拉取多少条消息
+ .targetType(String.class) // 目标类型。统一使用 String,通过自己封装的 AbstractStreamMessageListener 去反序列化
+ .build();
+ // 创建 container 对象
+ StreamMessageListenerContainer> container = StreamMessageListenerContainer.create(
+ redisTemplate.getRequiredConnectionFactory(), containerOptions);
- // 根据配置对象创建监听容器对象
- StreamMessageListenerContainer> container = StreamMessageListenerContainer
- .create(factory, streamMessageListenerContainerOptions);
-
- RedisTemplate redisTemplate = redisTemplate(factory);
-
- // 使用监听容器对象开始监听消费(使用的是手动确认方式)
- String consumerName = NetUtil.getLocalHostName(); // TODO 需要优化下,晚点参考下 rocketmq consumer 的
- for (AbstractStreamMessageListener> listener : listeners) {
+ // 第二步,注册监听器,消费对应的 Stream 主题
+ String consumerName = buildConsumerName();
+ listeners.forEach(listener -> {
+ // 创建 listener 对应的消费者分组
try {
redisTemplate.opsForStream().createGroup(listener.getStreamKey(), listener.getGroup());
- } catch (Exception ignore) {
-// ignore.printStackTrace();
- }
-
- container.receive(Consumer.from(listener.getGroup(), consumerName),
- StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed()), listener);
- }
-
+ } catch (Exception ignore) {}
+ // 创建 Consumer 对象
+ Consumer consumer = Consumer.from(listener.getGroup(), consumerName);
+ // 设置 Consumer 消费进度,以最小消费进度为准
+ StreamOffset streamOffset = StreamOffset.create(listener.getStreamKey(), ReadOffset.lastConsumed());
+ // 设置 Consumer 监听
+ StreamMessageListenerContainer.StreamReadRequestBuilder builder = StreamMessageListenerContainer.StreamReadRequest
+ .builder(streamOffset).consumer(consumer)
+ .autoAcknowledge(false) // 不自动 ack
+ .cancelOnError(throwable -> false); // 默认配置,发生异常就取消消费,显然不符合预期;因此,我们设置为 false
+ container.register(builder.build(), listener);
+ });
return container;
}
+ private static String buildConsumerName() {
+ return String.format("%s@%d", SystemUtil.getHostInfo().getAddress(), SystemUtil.getCurrentPID());
+ }
+
}
diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java b/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java
index 6a029507c..7621d3638 100644
--- a/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java
+++ b/src/main/java/cn/iocoder/dashboard/framework/redis/core/stream/AbstractStreamMessageListener.java
@@ -46,6 +46,9 @@ public abstract class AbstractStreamMessageListener
@Override
public void onMessage(ObjectRecord message) {
System.out.println(message);
+ if (true) {
+// throw new IllegalStateException("测试下");
+ }
}
/**
diff --git a/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java b/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java
index 8d01d4cf0..9331606af 100644
--- a/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java
+++ b/src/main/java/cn/iocoder/dashboard/framework/redis/core/util/RedisMessageUtils.java
@@ -31,7 +31,7 @@ public class RedisMessageUtils {
* @param message 消息
* @return 消息记录的编号对象
*/
- public static RecordId sendStreamMessage(RedisTemplate redisTemplate, T message) {
+ public static RecordId sendStreamMessage(RedisTemplate redisTemplate, T message) {
return redisTemplate.opsForStream().add(StreamRecords.newRecord()
.ofObject(JsonUtils.toJsonString(message)) // 设置内容
.withStreamKey(message.getStreamKey())); // 设置 stream key
diff --git a/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java b/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java
index 022f45bfd..3d0d8a249 100644
--- a/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java
+++ b/src/test-integration/java/cn/iocoder/dashboard/framework/redis/core/stream/RedisStreamTest.java
@@ -9,6 +9,7 @@ import cn.iocoder.dashboard.modules.system.mq.message.mail.SysMailSendMessage;
import cn.iocoder.dashboard.modules.system.mq.message.sms.SysSmsSendMessage;
import org.junit.jupiter.api.Test;
import org.springframework.context.annotation.Import;
+import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import javax.annotation.Resource;
@@ -31,13 +32,18 @@ public class RedisStreamTest {
@Resource
private StringRedisTemplate stringRedisTemplate;
+ @Resource
+ private RedisTemplate redisTemplate;
+
@Test
public void testProducer01() {
- // 创建消息
- SysSmsSendMessage message = new SysSmsSendMessage();
- message.setMobile("15601691300").setTemplateCode("test");
- // 发送消息
- RedisMessageUtils.sendStreamMessage(stringRedisTemplate, message);
+ for (int i = 0; i < 100; i++) {
+ // 创建消息
+ SysSmsSendMessage message = new SysSmsSendMessage();
+ message.setMobile("15601691300").setTemplateCode("test:" + i);
+ // 发送消息
+ RedisMessageUtils.sendStreamMessage(stringRedisTemplate, message);
+ }
}
@Test