mq 重构:默认的 redis 实现:
1)默认 channel 和 stream key 2)移除 enabled 开关,通过 listener 是否存在来实现 3)调整包名,为接入 rocketmq 作为 mq 实现做准备
This commit is contained in:
parent
d048daf7d6
commit
c066ea46f9
@ -1,8 +1,8 @@
|
||||
package cn.iocoder.yudao.framework.tenant.core.mq;
|
||||
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder;
|
||||
|
||||
import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID;
|
||||
|
@ -1,4 +1,5 @@
|
||||
/**
|
||||
* TODO 芋艿:调整注释
|
||||
* 消息队列,基于 Redis 提供:
|
||||
* 1. 基于 Pub/Sub 实现广播消费
|
||||
* 2. 基于 Stream 实现集群消费
|
||||
|
@ -1,21 +1,20 @@
|
||||
package cn.iocoder.yudao.framework.mq.config;
|
||||
package cn.iocoder.yudao.framework.mq.redis.config;
|
||||
|
||||
import cn.hutool.core.map.MapUtil;
|
||||
import cn.hutool.core.util.StrUtil;
|
||||
import cn.hutool.system.SystemUtil;
|
||||
import cn.iocoder.yudao.framework.common.enums.DocumentEnum;
|
||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener;
|
||||
import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
|
||||
import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessageListener;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener;
|
||||
import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RedissonClient;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.boot.autoconfigure.AutoConfiguration;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
|
||||
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.data.redis.connection.RedisServerCommands;
|
||||
import org.springframework.data.redis.connection.stream.Consumer;
|
||||
@ -27,7 +26,6 @@ import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.data.redis.core.StringRedisTemplate;
|
||||
import org.springframework.data.redis.listener.ChannelTopic;
|
||||
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
|
||||
import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX;
|
||||
import org.springframework.data.redis.stream.StreamMessageListenerContainer;
|
||||
import org.springframework.scheduling.annotation.EnableScheduling;
|
||||
|
||||
@ -42,7 +40,7 @@ import java.util.Properties;
|
||||
@Slf4j
|
||||
@EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息
|
||||
@AutoConfiguration(after = YudaoRedisAutoConfiguration.class)
|
||||
public class YudaoMQAutoConfiguration {
|
||||
public class YudaoRedisMQAutoConfiguration {
|
||||
|
||||
@Bean
|
||||
public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate,
|
||||
@ -60,7 +58,6 @@ public class YudaoMQAutoConfiguration {
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
@ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
@ConditionalOnProperty(prefix = "yudao.mq.redis.pubsub", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.pubsub.enable=false 禁用多租户
|
||||
public RedisMessageListenerContainer redisMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractChannelMessageListener<?>> listeners) {
|
||||
// 创建 RedisMessageListenerContainer 对象
|
||||
@ -82,7 +79,6 @@ public class YudaoMQAutoConfiguration {
|
||||
*/
|
||||
@Bean
|
||||
@ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
@ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
|
||||
public RedisPendingMessageResendJob redisPendingMessageResendJob(List<AbstractStreamMessageListener<?>> listeners,
|
||||
RedisMQTemplate redisTemplate,
|
||||
@Value("${spring.application.name}") String groupName,
|
||||
@ -92,12 +88,11 @@ public class YudaoMQAutoConfiguration {
|
||||
|
||||
/**
|
||||
* 创建 Redis Stream 集群消费的容器
|
||||
* <p>
|
||||
* Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html
|
||||
*
|
||||
* 基础知识:<a href="https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html">Redis Stream 的 xreadgroup 命令</a>
|
||||
*/
|
||||
@Bean(initMethod = "start", destroyMethod = "stop")
|
||||
@ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听
|
||||
@ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户
|
||||
public StreamMessageListenerContainer<String, ObjectRecord<String, String>> redisStreamMessageListenerContainer(
|
||||
RedisMQTemplate redisMQTemplate, List<AbstractStreamMessageListener<?>> listeners) {
|
||||
RedisTemplate<String, ?> redisTemplate = redisMQTemplate.getRedisTemplate();
|
||||
@ -111,8 +106,7 @@ public class YudaoMQAutoConfiguration {
|
||||
.build();
|
||||
// 创建 container 对象
|
||||
StreamMessageListenerContainer<String, ObjectRecord<String, String>> container =
|
||||
// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions);
|
||||
DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
||||
StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions);
|
||||
|
||||
// 第二步,注册监听器,消费对应的 Stream 主题
|
||||
String consumerName = buildConsumerName();
|
@ -1,10 +1,10 @@
|
||||
package cn.iocoder.yudao.framework.mq.core;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core;
|
||||
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage;
|
||||
import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractChannelMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessage;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.Getter;
|
||||
import org.springframework.data.redis.connection.stream.RecordId;
|
@ -1,6 +1,6 @@
|
||||
package cn.iocoder.yudao.framework.mq.core.interceptor;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.interceptor;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
|
||||
/**
|
||||
* {@link AbstractRedisMessage} 消息拦截器
|
@ -1,8 +1,8 @@
|
||||
package cn.iocoder.yudao.framework.mq.job;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.job;
|
||||
|
||||
import cn.hutool.core.collection.CollUtil;
|
||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractStreamMessageListener;
|
||||
import lombok.AllArgsConstructor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.redisson.api.RLock;
|
@ -1,4 +1,4 @@
|
||||
package cn.iocoder.yudao.framework.mq.core.message;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.message;
|
||||
|
||||
import lombok.Data;
|
||||
|
@ -1,6 +1,6 @@
|
||||
package cn.iocoder.yudao.framework.mq.core.pubsub;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
@ -11,11 +11,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
public abstract class AbstractChannelMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Channel
|
||||
* 获得 Redis Channel,默认使用类名
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。
|
||||
public abstract String getChannel();
|
||||
public String getChannel() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
package cn.iocoder.yudao.framework.mq.core.pubsub;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.pubsub;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
||||
import org.springframework.data.redis.connection.Message;
|
@ -1,6 +1,6 @@
|
||||
package cn.iocoder.yudao.framework.mq.core.stream;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.stream;
|
||||
|
||||
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
|
||||
/**
|
||||
@ -11,11 +11,13 @@ import com.fasterxml.jackson.annotation.JsonIgnore;
|
||||
public abstract class AbstractStreamMessage extends AbstractRedisMessage {
|
||||
|
||||
/**
|
||||
* 获得 Redis Stream Key
|
||||
* 获得 Redis Stream Key,默认使用类名
|
||||
*
|
||||
* @return Channel
|
||||
*/
|
||||
@JsonIgnore // 避免序列化
|
||||
public abstract String getStreamKey();
|
||||
public String getStreamKey() {
|
||||
return getClass().getSimpleName();
|
||||
}
|
||||
|
||||
}
|
@ -1,10 +1,10 @@
|
||||
package cn.iocoder.yudao.framework.mq.core.stream;
|
||||
package cn.iocoder.yudao.framework.mq.redis.core.stream;
|
||||
|
||||
import cn.hutool.core.util.TypeUtil;
|
||||
import cn.iocoder.yudao.framework.common.util.json.JsonUtils;
|
||||
import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor;
|
||||
import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
import lombok.SneakyThrows;
|
@ -0,0 +1,6 @@
|
||||
/**
|
||||
* 消息队列,基于 Redis 提供:
|
||||
* 1. 基于 Pub/Sub 实现广播消费
|
||||
* 2. 基于 Stream 实现集群消费
|
||||
*/
|
||||
package cn.iocoder.yudao.framework.mq.redis;
|
@ -1,62 +0,0 @@
|
||||
package org.springframework.data.redis.stream;
|
||||
|
||||
import cn.hutool.core.util.ReflectUtil;
|
||||
import org.springframework.data.redis.connection.RedisConnectionFactory;
|
||||
import org.springframework.data.redis.connection.stream.ByteRecord;
|
||||
import org.springframework.data.redis.connection.stream.ReadOffset;
|
||||
import org.springframework.data.redis.connection.stream.Record;
|
||||
import org.springframework.util.Assert;
|
||||
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.function.Function;
|
||||
|
||||
/**
|
||||
* 拓展 DefaultStreamMessageListenerContainer 实现,解决 Spring Data Redis + Redisson 结合使用时,Redisson 在 Stream 获得不到数据时,返回 null 而不是空 List,导致 NPE 异常。
|
||||
* 对应 issue:https://github.com/spring-projects/spring-data-redis/issues/2147 和 https://github.com/redisson/redisson/issues/4006
|
||||
* 目前看下来 Spring Data Redis 不肯加 null 判断,Redisson 暂时也没改返回 null 到空 List 的打算,所以暂时只能自己改,哽咽!
|
||||
*
|
||||
* @author 芋道源码
|
||||
*/
|
||||
public class DefaultStreamMessageListenerContainerX<K, V extends Record<K, ?>> extends DefaultStreamMessageListenerContainer<K, V> {
|
||||
|
||||
/**
|
||||
* 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现
|
||||
*/
|
||||
public static <K, V extends Record<K, ?>> StreamMessageListenerContainer<K, V> create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions<K, V> options) {
|
||||
Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!");
|
||||
Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!");
|
||||
return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options);
|
||||
}
|
||||
|
||||
public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions<K, V> containerOptions) {
|
||||
super(connectionFactory, containerOptions);
|
||||
}
|
||||
|
||||
/**
|
||||
* 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现
|
||||
*/
|
||||
@Override
|
||||
public Subscription register(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
||||
return this.doRegisterX(getReadTaskX(streamRequest, listener));
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private StreamPollTask<K, V> getReadTaskX(StreamReadRequest<K> streamRequest, StreamListener<K, V> listener) {
|
||||
StreamPollTask<K, V> task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener);
|
||||
// 修改 readFunction 方法
|
||||
Function<ReadOffset, List<ByteRecord>> readFunction = (Function<ReadOffset, List<ByteRecord>>) ReflectUtil.getFieldValue(task, "readFunction");
|
||||
ReflectUtil.setFieldValue(task, "readFunction", (Function<ReadOffset, List<ByteRecord>>) readOffset -> {
|
||||
List<ByteRecord> records = readFunction.apply(readOffset);
|
||||
//【重点】保证 records 不是空,避免 NPE 的问题!!!
|
||||
return records != null ? records : Collections.emptyList();
|
||||
});
|
||||
return task;
|
||||
}
|
||||
|
||||
private Subscription doRegisterX(Task task) {
|
||||
return ReflectUtil.invoke(this, "doRegister", task);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -1 +1 @@
|
||||
cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration
|
||||
cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration
|
||||
|
@ -145,12 +145,6 @@ yudao:
|
||||
- cn.iocoder.yudao.module.pay.enums.ErrorCodeConstants
|
||||
- cn.iocoder.yudao.module.system.enums.ErrorCodeConstants
|
||||
- cn.iocoder.yudao.module.mp.enums.ErrorCodeConstants
|
||||
mq:
|
||||
redis:
|
||||
pubsub:
|
||||
enable: false # 是否开启 Redis pubsub 广播消费,默认为 true。这里设置成 false,可以按需开启
|
||||
stream:
|
||||
enable: false # 是否开启 Redis stream 集群消费,默认为 true。这里设置成 false,可以按需开启
|
||||
tenant: # 多租户相关配置项
|
||||
enable: true
|
||||
ignore-urls:
|
||||
|
Loading…
Reference in New Issue
Block a user