diff --git a/yudao-dependencies/pom.xml b/yudao-dependencies/pom.xml index e558536ff..dbd6e2f3c 100644 --- a/yudao-dependencies/pom.xml +++ b/yudao-dependencies/pom.xml @@ -30,6 +30,8 @@ 1.4.7 3.18.0 8.1.3.62 + + 2.2.3 2.2.5 1.7.1 @@ -96,11 +98,6 @@ yudao-spring-boot-starter-biz-operatelog ${revision} - - cn.iocoder.boot - yudao-spring-boot-starter-biz-trade - ${revision} - cn.iocoder.boot yudao-spring-boot-starter-biz-dict @@ -260,6 +257,12 @@ ${revision} + + org.apache.rocketmq + rocketmq-spring-boot-starter + ${rocketmq-spring.version} + + cn.iocoder.boot diff --git a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/core/KeyValue.java b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/core/KeyValue.java index 48cf8e7ef..07a8f39b5 100644 --- a/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/core/KeyValue.java +++ b/yudao-framework/yudao-common/src/main/java/cn/iocoder/yudao/framework/common/core/KeyValue.java @@ -4,6 +4,8 @@ import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor; +import java.io.Serializable; + /** * Key Value 的键值对 * @@ -12,7 +14,7 @@ import lombok.NoArgsConstructor; @Data @NoArgsConstructor @AllArgsConstructor -public class KeyValue { +public class KeyValue implements Serializable { private K key; private V value; diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/pom.xml b/yudao-framework/yudao-spring-boot-starter-biz-tenant/pom.xml index 0fb0a4f71..c1a537613 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/pom.xml @@ -48,6 +48,22 @@ cn.iocoder.boot yudao-spring-boot-starter-mq + true + + + org.springframework.kafka + spring-kafka + true + + + org.springframework.amqp + spring-rabbit + true + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java index a632aab7a..c3dd35c91 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/config/YudaoTenantAutoConfiguration.java @@ -6,7 +6,9 @@ import cn.iocoder.yudao.framework.redis.config.YudaoCacheProperties; import cn.iocoder.yudao.framework.tenant.core.aop.TenantIgnoreAspect; import cn.iocoder.yudao.framework.tenant.core.db.TenantDatabaseInterceptor; import cn.iocoder.yudao.framework.tenant.core.job.TenantJobAspect; -import cn.iocoder.yudao.framework.tenant.core.mq.TenantRedisMessageInterceptor; +import cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq.TenantRabbitMQInitializer; +import cn.iocoder.yudao.framework.tenant.core.mq.redis.TenantRedisMessageInterceptor; +import cn.iocoder.yudao.framework.tenant.core.mq.rocketmq.TenantRocketMQInitializer; import cn.iocoder.yudao.framework.tenant.core.redis.TenantRedisCacheManager; import cn.iocoder.yudao.framework.tenant.core.security.TenantSecurityWebFilter; import cn.iocoder.yudao.framework.tenant.core.service.TenantFrameworkService; @@ -18,6 +20,7 @@ import cn.iocoder.yudao.module.system.api.tenant.TenantApi; import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor; import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor; import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.boot.context.properties.EnableConfigurationProperties; import org.springframework.boot.web.servlet.FilterRegistrationBean; @@ -92,6 +95,18 @@ public class YudaoTenantAutoConfiguration { return new TenantRedisMessageInterceptor(); } + @Bean + @ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate") + public TenantRabbitMQInitializer tenantRabbitMQInitializer() { + return new TenantRabbitMQInitializer(); + } + + @Bean + @ConditionalOnClass(name = "org.apache.rocketmq.spring.core.RocketMQTemplate") + public TenantRocketMQInitializer tenantRocketMQInitializer() { + return new TenantRocketMQInitializer(); + } + // ========== Job ========== @Bean diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java new file mode 100644 index 000000000..8bf7cc1a8 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaEnvironmentPostProcessor.java @@ -0,0 +1,37 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.kafka; + +import cn.hutool.core.util.StrUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.env.EnvironmentPostProcessor; +import org.springframework.core.env.ConfigurableEnvironment; + +/** + * 多租户的 Kafka 的 {@link EnvironmentPostProcessor} 实现类 + * + * Kafka Producer 发送消息时,增加 {@link TenantKafkaProducerInterceptor} 拦截器 + * + * @author 芋道源码 + */ +@Slf4j +public class TenantKafkaEnvironmentPostProcessor implements EnvironmentPostProcessor { + + private static final String PROPERTY_KEY_INTERCEPTOR_CLASSES = "spring.kafka.producer.properties.interceptor.classes"; + + @Override + public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) { + // 添加 TenantKafkaProducerInterceptor 拦截器 + try { + String value = environment.getProperty(PROPERTY_KEY_INTERCEPTOR_CLASSES); + if (StrUtil.isEmpty(value)) { + value = TenantKafkaProducerInterceptor.class.getName(); + } else { + value += "," + TenantKafkaProducerInterceptor.class.getName(); + } + environment.getSystemProperties().put(PROPERTY_KEY_INTERCEPTOR_CLASSES, value); + } catch (NoClassDefFoundError ignore) { + // 如果触发 NoClassDefFoundError 异常,说明 TenantKafkaProducerInterceptor 类不存在,即没引入 kafka-spring 依赖 + } + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java new file mode 100644 index 000000000..8ded8019a --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/kafka/TenantKafkaProducerInterceptor.java @@ -0,0 +1,47 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.kafka; + +import cn.hutool.core.util.ReflectUtil; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kafka.common.header.Headers; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; + +import java.util.Map; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * Kafka 消息队列的多租户 {@link ProducerInterceptor} 实现类 + * + * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中 + * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现 + * + * @author 芋道源码 + */ +public class TenantKafkaProducerInterceptor implements ProducerInterceptor { + + @Override + public ProducerRecord onSend(ProducerRecord record) { + Long tenantId = TenantContextHolder.getTenantId(); + if (tenantId != null) { + Headers headers = (Headers) ReflectUtil.getFieldValue(record, "headers"); // private 属性,没有 get 方法,智能反射 + headers.add(HEADER_TENANT_ID, tenantId.toString().getBytes()); + } + return record; + } + + @Override + public void onAcknowledgement(RecordMetadata metadata, Exception exception) { + } + + @Override + public void close() { + } + + @Override + public void configure(Map configs) { + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQInitializer.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQInitializer.java new file mode 100644 index 000000000..b856ce954 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQInitializer.java @@ -0,0 +1,23 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq; + +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; + +/** + * 多租户的 RabbitMQ 初始化器 + * + * @author 芋道源码 + */ +public class TenantRabbitMQInitializer implements BeanPostProcessor { + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof RabbitTemplate) { + RabbitTemplate rabbitTemplate = (RabbitTemplate) bean; + rabbitTemplate.addBeforePublishPostProcessors(new TenantRabbitMQMessagePostProcessor()); + } + return bean; + } + +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQMessagePostProcessor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQMessagePostProcessor.java new file mode 100644 index 000000000..3e6969cd2 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rabbitmq/TenantRabbitMQMessagePostProcessor.java @@ -0,0 +1,31 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.rabbitmq; + +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import org.apache.kafka.clients.producer.ProducerInterceptor; +import org.springframework.amqp.AmqpException; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessagePostProcessor; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * RabbitMQ 消息队列的多租户 {@link ProducerInterceptor} 实现类 + * + * 1. Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中 + * 2. Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现 + * + * @author 芋道源码 + */ +public class TenantRabbitMQMessagePostProcessor implements MessagePostProcessor { + + @Override + public Message postProcessMessage(Message message) throws AmqpException { + Long tenantId = TenantContextHolder.getTenantId(); + if (tenantId != null) { + message.getMessageProperties().getHeaders().put(HEADER_TENANT_ID, tenantId); + } + return message; + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/redis/TenantRedisMessageInterceptor.java similarity index 86% rename from yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java rename to yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/redis/TenantRedisMessageInterceptor.java index c493b41d1..f6b7747ff 100644 --- a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/TenantRedisMessageInterceptor.java +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/redis/TenantRedisMessageInterceptor.java @@ -1,8 +1,8 @@ -package cn.iocoder.yudao.framework.tenant.core.mq; +package cn.iocoder.yudao.framework.tenant.core.mq.redis; import cn.hutool.core.util.StrUtil; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java new file mode 100644 index 000000000..d9d7334e0 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQConsumeMessageHook.java @@ -0,0 +1,46 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq; + +import cn.hutool.core.lang.Assert; +import cn.hutool.core.util.StrUtil; +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.common.message.MessageExt; +import org.springframework.messaging.handler.invocation.InvocableHandlerMethod; + +import java.util.List; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * RocketMQ 消息队列的多租户 {@link ConsumeMessageHook} 实现类 + * + * Consumer 消费消息时,将消息的 Header 的租户编号,添加到 {@link TenantContextHolder} 中,通过 {@link InvocableHandlerMethod} 实现 + * + * @author 芋道源码 + */ +public class TenantRocketMQConsumeMessageHook implements ConsumeMessageHook { + + @Override + public String hookName() { + return getClass().getSimpleName(); + } + + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { + // 校验,消息必须是单条,不然设置租户可能不正确 + List messages = context.getMsgList(); + Assert.isTrue(messages.size() == 1, "消息条数({})不正确", messages.size()); + // 设置租户编号 + String tenantId = messages.get(0).getUserProperty(HEADER_TENANT_ID); + if (StrUtil.isNotEmpty(tenantId)) { + TenantContextHolder.setTenantId(Long.parseLong(tenantId)); + } + } + + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { + TenantContextHolder.clear(); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java new file mode 100644 index 000000000..7f12ac520 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQInitializer.java @@ -0,0 +1,53 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq; + +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; +import org.apache.rocketmq.spring.core.RocketMQTemplate; +import org.apache.rocketmq.spring.support.DefaultRocketMQListenerContainer; +import org.springframework.beans.BeansException; +import org.springframework.beans.factory.config.BeanPostProcessor; + +/** + * 多租户的 RocketMQ 初始化器 + * + * @author 芋道源码 + */ +public class TenantRocketMQInitializer implements BeanPostProcessor { + + @Override + public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException { + if (bean instanceof DefaultRocketMQListenerContainer) { + DefaultRocketMQListenerContainer container = (DefaultRocketMQListenerContainer) bean; + initTenantConsumer(container.getConsumer()); + } else if (bean instanceof RocketMQTemplate) { + RocketMQTemplate template = (RocketMQTemplate) bean; + initTenantProducer(template.getProducer()); + } + return bean; + } + + private void initTenantProducer(DefaultMQProducer producer) { + if (producer == null) { + return; + } + DefaultMQProducerImpl producerImpl = producer.getDefaultMQProducerImpl(); + if (producerImpl == null) { + return; + } + producerImpl.registerSendMessageHook(new TenantRocketMQSendMessageHook()); + } + + private void initTenantConsumer(DefaultMQPushConsumer consumer) { + if (consumer == null) { + return; + } + DefaultMQPushConsumerImpl consumerImpl = consumer.getDefaultMQPushConsumerImpl(); + if (consumerImpl == null) { + return; + } + consumerImpl.registerConsumeMessageHook(new TenantRocketMQConsumeMessageHook()); + } + +} \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java new file mode 100644 index 000000000..4f0307465 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/cn/iocoder/yudao/framework/tenant/core/mq/rocketmq/TenantRocketMQSendMessageHook.java @@ -0,0 +1,36 @@ +package cn.iocoder.yudao.framework.tenant.core.mq.rocketmq; + +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * RocketMQ 消息队列的多租户 {@link SendMessageHook} 实现类 + * + * Producer 发送消息时,将 {@link TenantContextHolder} 租户编号,添加到消息的 Header 中 + * + * @author 芋道源码 + */ +public class TenantRocketMQSendMessageHook implements SendMessageHook { + + @Override + public String hookName() { + return getClass().getSimpleName(); + } + + @Override + public void sendMessageBefore(SendMessageContext sendMessageContext) { + Long tenantId = TenantContextHolder.getTenantId(); + if (tenantId == null) { + return; + } + sendMessageContext.getMessage().putUserProperty(HEADER_TENANT_ID, tenantId.toString()); + } + + @Override + public void sendMessageAfter(SendMessageContext sendMessageContext) { + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java new file mode 100644 index 000000000..059d8f97f --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/java/org/springframework/messaging/handler/invocation/InvocableHandlerMethod.java @@ -0,0 +1,269 @@ +/* + * Copyright 2002-2021 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.messaging.handler.invocation; + +import cn.iocoder.yudao.framework.tenant.core.context.TenantContextHolder; +import cn.iocoder.yudao.framework.tenant.core.util.TenantUtils; +import org.springframework.core.DefaultParameterNameDiscoverer; +import org.springframework.core.MethodParameter; +import org.springframework.core.ParameterNameDiscoverer; +import org.springframework.core.ResolvableType; +import org.springframework.lang.Nullable; +import org.springframework.messaging.Message; +import org.springframework.messaging.handler.HandlerMethod; +import org.springframework.util.ObjectUtils; + +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.lang.reflect.Type; +import java.util.Arrays; + +import static cn.iocoder.yudao.framework.web.core.util.WebFrameworkUtils.HEADER_TENANT_ID; + +/** + * Extension of {@link HandlerMethod} that invokes the underlying method with + * argument values resolved from the current HTTP request through a list of + * {@link HandlerMethodArgumentResolver}. + * + * 针对 rabbitmq-spring 和 kafka-spring,不存在合适的拓展点,可以实现 Consumer 消费前,读取 Header 中的 tenant-id 设置到 {@link TenantContextHolder} 中 + * TODO 芋艿:持续跟进,看看有没新的拓展点 + * + * @author Rossen Stoyanchev + * @author Juergen Hoeller + * @since 4.0 + */ +public class InvocableHandlerMethod extends HandlerMethod { + + private static final Object[] EMPTY_ARGS = new Object[0]; + + private HandlerMethodArgumentResolverComposite resolvers = new HandlerMethodArgumentResolverComposite(); + + private ParameterNameDiscoverer parameterNameDiscoverer = new DefaultParameterNameDiscoverer(); + + /** + * Create an instance from a {@code HandlerMethod}. + */ + public InvocableHandlerMethod(HandlerMethod handlerMethod) { + super(handlerMethod); + } + + /** + * Create an instance from a bean instance and a method. + */ + public InvocableHandlerMethod(Object bean, Method method) { + super(bean, method); + } + + /** + * Construct a new handler method with the given bean instance, method name and parameters. + * @param bean the object bean + * @param methodName the method name + * @param parameterTypes the method parameter types + * @throws NoSuchMethodException when the method cannot be found + */ + public InvocableHandlerMethod(Object bean, String methodName, Class... parameterTypes) + throws NoSuchMethodException { + + super(bean, methodName, parameterTypes); + } + + /** + * Set {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers} to use for resolving method argument values. + */ + public void setMessageMethodArgumentResolvers(HandlerMethodArgumentResolverComposite argumentResolvers) { + this.resolvers = argumentResolvers; + } + + /** + * Set the ParameterNameDiscoverer for resolving parameter names when needed + * (e.g. default request attribute name). + *

Default is a {@link DefaultParameterNameDiscoverer}. + */ + public void setParameterNameDiscoverer(ParameterNameDiscoverer parameterNameDiscoverer) { + this.parameterNameDiscoverer = parameterNameDiscoverer; + } + + /** + * Invoke the method after resolving its argument values in the context of the given message. + *

Argument values are commonly resolved through + * {@link HandlerMethodArgumentResolver HandlerMethodArgumentResolvers}. + * The {@code providedArgs} parameter however may supply argument values to be used directly, + * i.e. without argument resolution. + *

Delegates to {@link #getMethodArgumentValues} and calls {@link #doInvoke} with the + * resolved arguments. + * @param message the current message being processed + * @param providedArgs "given" arguments matched by type, not resolved + * @return the raw value returned by the invoked method + * @throws Exception raised if no suitable argument resolver can be found, + * or if the method raised an exception + * @see #getMethodArgumentValues + * @see #doInvoke + */ + @Nullable + public Object invoke(Message message, Object... providedArgs) throws Exception { + Object[] args = getMethodArgumentValues(message, providedArgs); + if (logger.isTraceEnabled()) { + logger.trace("Arguments: " + Arrays.toString(args)); + } + // 注意:如下是本类的改动点!!! + // 情况一:无租户编号的情况 + Long tenantId= parseTenantId(message); + if (tenantId == null) { + return doInvoke(args); + } + // 情况二:有租户的情况下 + return TenantUtils.execute(tenantId, () -> doInvoke(args)); + } + + private Long parseTenantId(Message message) { + Object tenantId = message.getHeaders().get(HEADER_TENANT_ID); + if (tenantId == null) { + return null; + } + if (tenantId instanceof Long) { + return (Long) tenantId; + } + if (tenantId instanceof Number) { + return ((Number) tenantId).longValue(); + } + if (tenantId instanceof String) { + return Long.parseLong((String) tenantId); + } + if (tenantId instanceof byte[]) { + return Long.parseLong(new String((byte[]) tenantId)); + } + throw new IllegalArgumentException("未知的数据类型:" + tenantId); + } + + /** + * Get the method argument values for the current message, checking the provided + * argument values and falling back to the configured argument resolvers. + *

The resulting array will be passed into {@link #doInvoke}. + * @since 5.1.2 + */ + protected Object[] getMethodArgumentValues(Message message, Object... providedArgs) throws Exception { + MethodParameter[] parameters = getMethodParameters(); + if (ObjectUtils.isEmpty(parameters)) { + return EMPTY_ARGS; + } + + Object[] args = new Object[parameters.length]; + for (int i = 0; i < parameters.length; i++) { + MethodParameter parameter = parameters[i]; + parameter.initParameterNameDiscovery(this.parameterNameDiscoverer); + args[i] = findProvidedArgument(parameter, providedArgs); + if (args[i] != null) { + continue; + } + if (!this.resolvers.supportsParameter(parameter)) { + throw new MethodArgumentResolutionException( + message, parameter, formatArgumentError(parameter, "No suitable resolver")); + } + try { + args[i] = this.resolvers.resolveArgument(parameter, message); + } + catch (Exception ex) { + // Leave stack trace for later, exception may actually be resolved and handled... + if (logger.isDebugEnabled()) { + String exMsg = ex.getMessage(); + if (exMsg != null && !exMsg.contains(parameter.getExecutable().toGenericString())) { + logger.debug(formatArgumentError(parameter, exMsg)); + } + } + throw ex; + } + } + return args; + } + + /** + * Invoke the handler method with the given argument values. + */ + @Nullable + protected Object doInvoke(Object... args) throws Exception { + try { + return getBridgedMethod().invoke(getBean(), args); + } + catch (IllegalArgumentException ex) { + assertTargetBean(getBridgedMethod(), getBean(), args); + String text = (ex.getMessage() != null ? ex.getMessage() : "Illegal argument"); + throw new IllegalStateException(formatInvokeError(text, args), ex); + } + catch (InvocationTargetException ex) { + // Unwrap for HandlerExceptionResolvers ... + Throwable targetException = ex.getTargetException(); + if (targetException instanceof RuntimeException) { + throw (RuntimeException) targetException; + } + else if (targetException instanceof Error) { + throw (Error) targetException; + } + else if (targetException instanceof Exception) { + throw (Exception) targetException; + } + else { + throw new IllegalStateException(formatInvokeError("Invocation failure", args), targetException); + } + } + } + + MethodParameter getAsyncReturnValueType(@Nullable Object returnValue) { + return new AsyncResultMethodParameter(returnValue); + } + + private class AsyncResultMethodParameter extends HandlerMethodParameter { + + @Nullable + private final Object returnValue; + + private final ResolvableType returnType; + + public AsyncResultMethodParameter(@Nullable Object returnValue) { + super(-1); + this.returnValue = returnValue; + this.returnType = ResolvableType.forType(super.getGenericParameterType()).getGeneric(); + } + + protected AsyncResultMethodParameter(AsyncResultMethodParameter original) { + super(original); + this.returnValue = original.returnValue; + this.returnType = original.returnType; + } + + @Override + public Class getParameterType() { + if (this.returnValue != null) { + return this.returnValue.getClass(); + } + if (!ResolvableType.NONE.equals(this.returnType)) { + return this.returnType.toClass(); + } + return super.getParameterType(); + } + + @Override + public Type getGenericParameterType() { + return this.returnType.getType(); + } + + @Override + public AsyncResultMethodParameter clone() { + return new AsyncResultMethodParameter(this); + } + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories new file mode 100644 index 000000000..a495842a0 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-biz-tenant/src/main/resources/META-INF/spring.factories @@ -0,0 +1,2 @@ +org.springframework.boot.env.EnvironmentPostProcessor=\ + cn.iocoder.yudao.framework.tenant.core.mq.kafka.TenantKafkaEnvironmentPostProcessor diff --git a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml index 75303d4e3..c8972f16b 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/pom.xml +++ b/yudao-framework/yudao-spring-boot-starter-mq/pom.xml @@ -12,7 +12,7 @@ jar ${project.artifactId} - 消息队列,基于 Redis Pub/Sub 实现广播消费,基于 Stream 实现集群消费 + 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种 https://github.com/YunaiV/ruoyi-vue-pro @@ -21,6 +21,23 @@ cn.iocoder.boot yudao-spring-boot-starter-redis + + + + org.springframework.kafka + spring-kafka + true + + + org.springframework.amqp + spring-rabbit + true + + + org.apache.rocketmq + rocketmq-spring-boot-starter + true + - + \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java deleted file mode 100644 index fbc2a2826..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessage.java +++ /dev/null @@ -1,21 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.pubsub; - -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Redis Channel Message 抽象类 - * - * @author 芋道源码 - */ -public abstract class AbstractChannelMessage extends AbstractRedisMessage { - - /** - * 获得 Redis Channel - * - * @return Channel - */ - @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。 - public abstract String getChannel(); - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java deleted file mode 100644 index 29ea833f3..000000000 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/stream/AbstractStreamMessage.java +++ /dev/null @@ -1,21 +0,0 @@ -package cn.iocoder.yudao.framework.mq.core.stream; - -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import com.fasterxml.jackson.annotation.JsonIgnore; - -/** - * Redis Stream Message 抽象类 - * - * @author 芋道源码 - */ -public abstract class AbstractStreamMessage extends AbstractRedisMessage { - - /** - * 获得 Redis Stream Key - * - * @return Channel - */ - @JsonIgnore // 避免序列化 - public abstract String getStreamKey(); - -} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java index 48eaf2386..3b716cb77 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/package-info.java @@ -1,6 +1,4 @@ /** - * 消息队列,基于 Redis 提供: - * 1. 基于 Pub/Sub 实现广播消费 - * 2. 基于 Stream 实现集群消费 + * 消息队列,支持 Redis、RocketMQ、RabbitMQ、Kafka 四种 */ package cn.iocoder.yudao.framework.mq; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java new file mode 100644 index 000000000..770c50ff7 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/config/YudaoRabbitMQAutoConfiguration.java @@ -0,0 +1,29 @@ +package cn.iocoder.yudao.framework.mq.rabbitmq.config; + +import cn.hutool.core.util.ReflectUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.utils.SerializationUtils; +import org.springframework.boot.autoconfigure.AutoConfiguration; +import org.springframework.boot.autoconfigure.condition.ConditionalOnClass; + +import java.lang.reflect.Field; + +/** + * RabbitMQ 消息队列配置类 + * + * @author 芋道源码 + */ +@AutoConfiguration +@Slf4j +@ConditionalOnClass(name = "org.springframework.amqp.rabbit.core.RabbitTemplate") +public class YudaoRabbitMQAutoConfiguration { + + static { + // 强制设置 SerializationUtils 的 TRUST_ALL 为 true,避免 RabbitMQ Consumer 反序列化消息报错 + // 为什么不通过设置 spring.amqp.deserialization.trust.all 呢?因为可能在 SerializationUtils static 初始化后 + Field trustAllField = ReflectUtil.getField(SerializationUtils.class, "TRUST_ALL"); + ReflectUtil.removeFinalModify(trustAllField); + ReflectUtil.setFieldValue(SerializationUtils.class, trustAllField, true); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java new file mode 100644 index 000000000..2773b5828 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/core/package-info.java @@ -0,0 +1,4 @@ +/** + * 占位符,无特殊逻辑 + */ +package cn.iocoder.yudao.framework.mq.rabbitmq.core; \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java new file mode 100644 index 000000000..9f6032c92 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/rabbitmq/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列,基于 RabbitMQ 提供 + */ +package cn.iocoder.yudao.framework.mq.rabbitmq; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java similarity index 77% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java index e300b1ad5..bbc63b719 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/config/YudaoMQAutoConfiguration.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/config/YudaoRedisMQAutoConfiguration.java @@ -1,21 +1,20 @@ -package cn.iocoder.yudao.framework.mq.config; +package cn.iocoder.yudao.framework.mq.redis.config; import cn.hutool.core.map.MapUtil; import cn.hutool.core.util.StrUtil; import cn.hutool.system.SystemUtil; import cn.iocoder.yudao.framework.common.enums.DocumentEnum; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; -import cn.iocoder.yudao.framework.mq.job.RedisPendingMessageResendJob; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.job.RedisPendingMessageResendJob; +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessageListener; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import cn.iocoder.yudao.framework.redis.config.YudaoRedisAutoConfiguration; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RedissonClient; import org.springframework.beans.factory.annotation.Value; import org.springframework.boot.autoconfigure.AutoConfiguration; import org.springframework.boot.autoconfigure.condition.ConditionalOnBean; -import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.data.redis.connection.RedisServerCommands; import org.springframework.data.redis.connection.stream.Consumer; @@ -27,7 +26,6 @@ import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer; -import org.springframework.data.redis.stream.DefaultStreamMessageListenerContainerX; import org.springframework.data.redis.stream.StreamMessageListenerContainer; import org.springframework.scheduling.annotation.EnableScheduling; @@ -42,7 +40,7 @@ import java.util.Properties; @Slf4j @EnableScheduling // 启用定时任务,用于 RedisPendingMessageResendJob 重发消息 @AutoConfiguration(after = YudaoRedisAutoConfiguration.class) -public class YudaoMQAutoConfiguration { +public class YudaoRedisMQAutoConfiguration { @Bean public RedisMQTemplate redisMQTemplate(StringRedisTemplate redisTemplate, @@ -59,10 +57,9 @@ public class YudaoMQAutoConfiguration { * 创建 Redis Pub/Sub 广播消费的容器 */ @Bean(initMethod = "start", destroyMethod = "stop") - @ConditionalOnBean(AbstractChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - @ConditionalOnProperty(prefix = "yudao.mq.redis.pubsub", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.pubsub.enable=false 禁用多租户 + @ConditionalOnBean(AbstractRedisChannelMessageListener.class) // 只有 AbstractChannelMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public RedisMessageListenerContainer redisMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { + RedisMQTemplate redisMQTemplate, List> listeners) { // 创建 RedisMessageListenerContainer 对象 RedisMessageListenerContainer container = new RedisMessageListenerContainer(); // 设置 RedisConnection 工厂。 @@ -81,9 +78,8 @@ public class YudaoMQAutoConfiguration { * 创建 Redis Stream 重新消费的任务 */ @Bean - @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户 - public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, + @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 + public RedisPendingMessageResendJob redisPendingMessageResendJob(List> listeners, RedisMQTemplate redisTemplate, @Value("${spring.application.name}") String groupName, RedissonClient redissonClient) { @@ -92,14 +88,13 @@ public class YudaoMQAutoConfiguration { /** * 创建 Redis Stream 集群消费的容器 - *

- * Redis Stream 的 xreadgroup 命令:https://www.geek-book.com/src/docs/redis/redis/redis.io/commands/xreadgroup.html + * + * 基础知识:Redis Stream 的 xreadgroup 命令 */ @Bean(initMethod = "start", destroyMethod = "stop") - @ConditionalOnBean(AbstractStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 - @ConditionalOnProperty(prefix = "yudao.mq.redis.stream", value = "enable", matchIfMissing = true) // 允许使用 yudao.mq.redis.stream.enable=false 禁用多租户 + @ConditionalOnBean(AbstractRedisStreamMessageListener.class) // 只有 AbstractStreamMessageListener 存在的时候,才需要注册 Redis pubsub 监听 public StreamMessageListenerContainer> redisStreamMessageListenerContainer( - RedisMQTemplate redisMQTemplate, List> listeners) { + RedisMQTemplate redisMQTemplate, List> listeners) { RedisTemplate redisTemplate = redisMQTemplate.getRedisTemplate(); checkRedisVersion(redisTemplate); // 第一步,创建 StreamMessageListenerContainer 容器 @@ -111,8 +106,7 @@ public class YudaoMQAutoConfiguration { .build(); // 创建 container 对象 StreamMessageListenerContainer> container = -// StreamMessageListenerContainer.create(redisTemplate.getRequiredConnectionFactory(), containerOptions); - DefaultStreamMessageListenerContainerX.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); + StreamMessageListenerContainer.create(redisMQTemplate.getRedisTemplate().getRequiredConnectionFactory(), containerOptions); // 第二步,注册监听器,消费对应的 Stream 主题 String consumerName = buildConsumerName(); diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java similarity index 80% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java index 8a31feda7..5755ffa51 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/RedisMQTemplate.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/RedisMQTemplate.java @@ -1,10 +1,10 @@ -package cn.iocoder.yudao.framework.mq.core; +package cn.iocoder.yudao.framework.mq.redis.core; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.pubsub.AbstractRedisChannelMessage; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessage; import lombok.AllArgsConstructor; import lombok.Getter; import org.springframework.data.redis.connection.stream.RecordId; @@ -35,7 +35,7 @@ public class RedisMQTemplate { * * @param message 消息 */ - public void send(T message) { + public void send(T message) { try { sendMessageBefore(message); // 发送消息 @@ -51,7 +51,7 @@ public class RedisMQTemplate { * @param message 消息 * @return 消息记录的编号对象 */ - public RecordId send(T message) { + public RecordId send(T message) { try { sendMessageBefore(message); // 发送消息 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java similarity index 79% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java index 11d8e1337..dbcee7fe2 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/interceptor/RedisMessageInterceptor.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/interceptor/RedisMessageInterceptor.java @@ -1,6 +1,6 @@ -package cn.iocoder.yudao.framework.mq.core.interceptor; +package cn.iocoder.yudao.framework.mq.redis.core.interceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; /** * {@link AbstractRedisMessage} 消息拦截器 diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java similarity index 93% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java index ea0f53d19..b84f17c15 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/job/RedisPendingMessageResendJob.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/job/RedisPendingMessageResendJob.java @@ -1,8 +1,8 @@ -package cn.iocoder.yudao.framework.mq.job; +package cn.iocoder.yudao.framework.mq.redis.core.job; import cn.hutool.core.collection.CollUtil; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.stream.AbstractRedisStreamMessageListener; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.redisson.api.RLock; @@ -33,7 +33,7 @@ public class RedisPendingMessageResendJob { */ private static final int EXPIRE_TIME = 5 * 60; - private final List> listeners; + private final List> listeners; private final RedisMQTemplate redisTemplate; private final String groupName; private final RedissonClient redissonClient; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java similarity index 88% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java index f02e89d6f..ee40814dd 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/message/AbstractRedisMessage.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/message/AbstractRedisMessage.java @@ -1,4 +1,4 @@ -package cn.iocoder.yudao.framework.mq.core.message; +package cn.iocoder.yudao.framework.mq.redis.core.message; import lombok.Data; diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java new file mode 100644 index 000000000..d5ea5b9d5 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessage.java @@ -0,0 +1,23 @@ +package cn.iocoder.yudao.framework.mq.redis.core.pubsub; + +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; +import com.fasterxml.jackson.annotation.JsonIgnore; + +/** + * Redis Channel Message 抽象类 + * + * @author 芋道源码 + */ +public abstract class AbstractRedisChannelMessage extends AbstractRedisMessage { + + /** + * 获得 Redis Channel,默认使用类名 + * + * @return Channel + */ + @JsonIgnore // 避免序列化。原因是,Redis 发布 Channel 消息的时候,已经会指定。 + public String getChannel() { + return getClass().getSimpleName(); + } + +} diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java similarity index 85% rename from yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java rename to yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java index e7d737d1b..fd7c910c9 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/core/pubsub/AbstractChannelMessageListener.java +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/java/cn/iocoder/yudao/framework/mq/redis/core/pubsub/AbstractRedisChannelMessageListener.java @@ -1,10 +1,10 @@ -package cn.iocoder.yudao.framework.mq.core.pubsub; +package cn.iocoder.yudao.framework.mq.redis.core.pubsub; import cn.hutool.core.util.TypeUtil; import cn.iocoder.yudao.framework.common.util.json.JsonUtils; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.framework.mq.core.interceptor.RedisMessageInterceptor; -import cn.iocoder.yudao.framework.mq.core.message.AbstractRedisMessage; +import cn.iocoder.yudao.framework.mq.redis.core.RedisMQTemplate; +import cn.iocoder.yudao.framework.mq.redis.core.interceptor.RedisMessageInterceptor; +import cn.iocoder.yudao.framework.mq.redis.core.message.AbstractRedisMessage; import lombok.Setter; import lombok.SneakyThrows; import org.springframework.data.redis.connection.Message; @@ -20,7 +20,7 @@ import java.util.List; * * @author 芋道源码 */ -public abstract class AbstractChannelMessageListener implements MessageListener { +public abstract class AbstractRedisChannelMessageListener implements MessageListener { /** * 消息类型 @@ -37,7 +37,7 @@ public abstract class AbstractChannelMessageListener +public abstract class AbstractRedisStreamMessageListener implements StreamListener> { /** @@ -48,7 +48,7 @@ public abstract class AbstractStreamMessageListener> extends DefaultStreamMessageListenerContainer { - - /** - * 参考 {@link StreamMessageListenerContainer#create(RedisConnectionFactory, StreamMessageListenerContainerOptions)} 的实现 - */ - public static > StreamMessageListenerContainer create(RedisConnectionFactory connectionFactory, StreamMessageListenerContainer.StreamMessageListenerContainerOptions options) { - Assert.notNull(connectionFactory, "RedisConnectionFactory must not be null!"); - Assert.notNull(options, "StreamMessageListenerContainerOptions must not be null!"); - return new DefaultStreamMessageListenerContainerX<>(connectionFactory, options); - } - - public DefaultStreamMessageListenerContainerX(RedisConnectionFactory connectionFactory, StreamMessageListenerContainerOptions containerOptions) { - super(connectionFactory, containerOptions); - } - - /** - * 参考 {@link DefaultStreamMessageListenerContainer#register(StreamReadRequest, StreamListener)} 的实现 - */ - @Override - public Subscription register(StreamReadRequest streamRequest, StreamListener listener) { - return this.doRegisterX(getReadTaskX(streamRequest, listener)); - } - - @SuppressWarnings("unchecked") - private StreamPollTask getReadTaskX(StreamReadRequest streamRequest, StreamListener listener) { - StreamPollTask task = ReflectUtil.invoke(this, "getReadTask", streamRequest, listener); - // 修改 readFunction 方法 - Function> readFunction = (Function>) ReflectUtil.getFieldValue(task, "readFunction"); - ReflectUtil.setFieldValue(task, "readFunction", (Function>) readOffset -> { - List records = readFunction.apply(readOffset); - //【重点】保证 records 不是空,避免 NPE 的问题!!! - return records != null ? records : Collections.emptyList(); - }); - return task; - } - - private Subscription doRegisterX(Task task) { - return ReflectUtil.invoke(this, "doRegister", task); - } - -} - diff --git a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports index c47aa4d7b..660865453 100644 --- a/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports +++ b/yudao-framework/yudao-spring-boot-starter-mq/src/main/resources/META-INF/spring/org.springframework.boot.autoconfigure.AutoConfiguration.imports @@ -1 +1,2 @@ -cn.iocoder.yudao.framework.mq.config.YudaoMQAutoConfiguration \ No newline at end of file +cn.iocoder.yudao.framework.mq.redis.config.YudaoRedisMQAutoConfiguration +cn.iocoder.yudao.framework.mq.rabbitmq.config.YudaoRabbitMQAutoConfiguration \ No newline at end of file diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md new file mode 100644 index 000000000..08586b379 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 事件机制 Event 入门》.md @@ -0,0 +1 @@ + diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md new file mode 100644 index 000000000..b66d6334c --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 Kafka 入门》.md @@ -0,0 +1 @@ + diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md new file mode 100644 index 000000000..eff46e2f7 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RabbitMQ 入门》.md @@ -0,0 +1 @@ + diff --git a/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md new file mode 100644 index 000000000..08586b379 --- /dev/null +++ b/yudao-framework/yudao-spring-boot-starter-mq/《芋道 Spring Boot 消息队列 RocketMQ 入门》.md @@ -0,0 +1 @@ + diff --git a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/CouponTakeByRegisterConsumer.java b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/CouponTakeByRegisterConsumer.java new file mode 100644 index 000000000..4d7a092c4 --- /dev/null +++ b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/CouponTakeByRegisterConsumer.java @@ -0,0 +1,31 @@ +package cn.iocoder.yudao.module.promotion.mq.consumer.coupon; + +import cn.iocoder.yudao.module.member.message.user.MemberUserCreateMessage; +import cn.iocoder.yudao.module.promotion.service.coupon.CouponService; +import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; +import org.springframework.stereotype.Component; + +import javax.annotation.Resource; + +/** + * 用户注册时,发送优惠劵的消费者,基 {@link MemberUserCreateMessage} 消息 + * + * @author owen + */ +@Component +@Slf4j +public class CouponTakeByRegisterConsumer { + + @Resource + private CouponService couponService; + + @EventListener + @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步 + public void onMessage(MemberUserCreateMessage message) { + log.info("[onMessage][消息内容({})]", message); + couponService.takeCouponByRegister(message.getUserId()); + } + +} diff --git a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/UserCreateConsumer.java b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/UserCreateConsumer.java deleted file mode 100644 index 5876a4daf..000000000 --- a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/coupon/UserCreateConsumer.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.iocoder.yudao.module.promotion.mq.consumer.coupon; - -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; -import cn.iocoder.yudao.module.promotion.mq.message.coupon.UserCreateMessage; -import cn.iocoder.yudao.module.promotion.service.coupon.CouponService; -import lombok.extern.slf4j.Slf4j; -import org.springframework.stereotype.Component; - -import javax.annotation.Resource; - -/** - * 针对 {@link UserCreateMessage} 的消费者 - * - * @author owen - */ -@Component -@Slf4j -public class UserCreateConsumer extends AbstractStreamMessageListener { - - @Resource - private CouponService couponService; - - @Override - public void onMessage(UserCreateMessage message) { - log.info("[onMessage][消息内容({})]", message); - couponService.takeCouponByRegister(message.getUserId()); - } - -} diff --git a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/package-info.java b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/package-info.java new file mode 100644 index 000000000..95c23df74 --- /dev/null +++ b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/consumer/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列的消费者 + */ +package cn.iocoder.yudao.module.promotion.mq.consumer; diff --git a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/coupon/UserCreateMessage.java b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/coupon/UserCreateMessage.java deleted file mode 100644 index 370bdf71a..000000000 --- a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/coupon/UserCreateMessage.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.iocoder.yudao.module.promotion.mq.message.coupon; - -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; -import lombok.Data; -import lombok.EqualsAndHashCode; - -import javax.validation.constraints.NotNull; - -/** - * 会员用户创建消息 - * - * @author owen - */ -@Data -@EqualsAndHashCode(callSuper = true) -public class UserCreateMessage extends AbstractStreamMessage { - - /** - * 用户编号 - */ - @NotNull(message = "用户编号不能为空") - private Long userId; - - @Override - public String getStreamKey() { - return "member.user.create"; - } - -} diff --git a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/package-info.java b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/package-info.java new file mode 100644 index 000000000..912504e76 --- /dev/null +++ b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/message/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列的消息 + */ +package cn.iocoder.yudao.module.promotion.mq.message; diff --git a/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/producer/package-info.java b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/producer/package-info.java new file mode 100644 index 000000000..e7b8d1b4c --- /dev/null +++ b/yudao-module-mall/yudao-module-promotion-biz/src/main/java/cn/iocoder/yudao/module/promotion/mq/producer/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列的生产者 + */ +package cn.iocoder.yudao.module.promotion.mq.producer; diff --git a/yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/package-info.java b/yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/package-info.java new file mode 100644 index 000000000..6ae3b6448 --- /dev/null +++ b/yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列的消息 + */ +package cn.iocoder.yudao.module.member.message; diff --git a/yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/user/MemberUserCreateMessage.java b/yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/user/MemberUserCreateMessage.java new file mode 100644 index 000000000..cfb24eb72 --- /dev/null +++ b/yudao-module-member/yudao-module-member-api/src/main/java/cn/iocoder/yudao/module/member/message/user/MemberUserCreateMessage.java @@ -0,0 +1,21 @@ +package cn.iocoder.yudao.module.member.message.user; + +import lombok.Data; + +import javax.validation.constraints.NotNull; + +/** + * 会员用户创建消息 + * + * @author owen + */ +@Data +public class MemberUserCreateMessage { + + /** + * 用户编号 + */ + @NotNull(message = "用户编号不能为空") + private Long userId; + +} diff --git a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.http b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.http index 998c70c21..5b68d6984 100644 --- a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.http +++ b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/controller/app/auth/AppAuthController.http @@ -4,7 +4,7 @@ Content-Type: application/json tenant-id: {{appTenentId}} { - "mobile": "15601691300", + "mobile": "15601691388", "password": "admin123" } @@ -14,7 +14,7 @@ Content-Type: application/json tenant-id: {{appTenentId}} { - "mobile": "15601691399", + "mobile": "15601691388", "scene": 1 } @@ -22,9 +22,10 @@ tenant-id: {{appTenentId}} POST {{appApi}}/member/auth/sms-login Content-Type: application/json tenant-id: {{appTenentId}} +terminal: 30 { - "mobile": "15601691301", + "mobile": "15601691388", "code": 9999 } diff --git a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/consumer/package-info.java b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/consumer/package-info.java new file mode 100644 index 000000000..521f60b8c --- /dev/null +++ b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/consumer/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列的消费者 + */ +package cn.iocoder.yudao.module.member.mq.consumer; diff --git a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/package-info.java b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/package-info.java new file mode 100644 index 000000000..6489394a1 --- /dev/null +++ b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列的消息 + */ +package cn.iocoder.yudao.module.member.mq.message; diff --git a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/user/MemberUserCreateMessage.java b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/user/MemberUserCreateMessage.java deleted file mode 100644 index cd411cc74..000000000 --- a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/message/user/MemberUserCreateMessage.java +++ /dev/null @@ -1,29 +0,0 @@ -package cn.iocoder.yudao.module.member.mq.message.user; - -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; -import lombok.Data; -import lombok.EqualsAndHashCode; - -import javax.validation.constraints.NotNull; - -/** - * 会员用户创建消息 - * - * @author owen - */ -@Data -@EqualsAndHashCode(callSuper = true) -public class MemberUserCreateMessage extends AbstractStreamMessage { - - /** - * 用户编号 - */ - @NotNull(message = "用户编号不能为空") - private Long userId; - - @Override - public String getStreamKey() { - return "member.user.create"; - } - -} diff --git a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/package-info.java b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/package-info.java new file mode 100644 index 000000000..dff4c99a9 --- /dev/null +++ b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/package-info.java @@ -0,0 +1,4 @@ +/** + * 消息队列的生产者 + */ +package cn.iocoder.yudao.module.member.mq.producer; diff --git a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/user/MemberUserProducer.java b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/user/MemberUserProducer.java index 4a88f419f..8687bb071 100644 --- a/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/user/MemberUserProducer.java +++ b/yudao-module-member/yudao-module-member-biz/src/main/java/cn/iocoder/yudao/module/member/mq/producer/user/MemberUserProducer.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.member.mq.producer.user; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; -import cn.iocoder.yudao.module.member.mq.message.user.MemberUserCreateMessage; +import cn.iocoder.yudao.module.member.message.user.MemberUserCreateMessage; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -17,7 +17,7 @@ import javax.annotation.Resource; public class MemberUserProducer { @Resource - private RedisMQTemplate redisMQTemplate; + private ApplicationContext applicationContext; /** * 发送 {@link MemberUserCreateMessage} 消息 @@ -25,7 +25,7 @@ public class MemberUserProducer { * @param userId 用户编号 */ public void sendUserCreateMessage(Long userId) { - redisMQTemplate.send(new MemberUserCreateMessage().setUserId(userId)); + applicationContext.publishEvent(new MemberUserCreateMessage().setUserId(userId)); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/sms/SmsTemplateController.http b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/sms/SmsTemplateController.http index e8213e5cd..ee24e928b 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/sms/SmsTemplateController.http +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/controller/admin/sms/SmsTemplateController.http @@ -6,9 +6,9 @@ tenant-id: {{adminTenentId}} { "templateCode": "test_01", - "mobile": "156016913900", - "params": { - "key01": "value01", - "key02": "value02" + "mobile": "15601691390", + "templateParams": { + "operation": "value01", + "code": "value02" } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java index 3cff145b8..7e7b487fb 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java @@ -1,10 +1,10 @@ package cn.iocoder.yudao.module.system.mq.consumer.mail; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage; -import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage; import cn.iocoder.yudao.module.system.service.mail.MailSendService; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -16,12 +16,13 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class MailSendConsumer extends AbstractStreamMessageListener { +public class MailSendConsumer { @Resource private MailSendService mailSendService; - @Override + @EventListener + @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步 public void onMessage(MailSendMessage message) { log.info("[onMessage][消息内容({})]", message); mailSendService.doSendMail(message); diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsSendConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsSendConsumer.java index 3b4ff216b..39753b66e 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsSendConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/sms/SmsSendConsumer.java @@ -2,8 +2,9 @@ package cn.iocoder.yudao.module.system.mq.consumer.sms; import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage; import cn.iocoder.yudao.module.system.service.sms.SmsSendService; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessageListener; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.event.EventListener; +import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -15,12 +16,13 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class SmsSendConsumer extends AbstractStreamMessageListener { +public class SmsSendConsumer { @Resource private SmsSendService smsSendService; - @Override + @EventListener + @Async // Spring Event 默认在 Producer 发送的线程,通过 @Async 实现异步 public void onMessage(SmsSendMessage message) { log.info("[onMessage][消息内容({})]", message); smsSendService.doSendSms(message); diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java index 0adafa409..943d69b09 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java @@ -1,8 +1,6 @@ package cn.iocoder.yudao.module.system.mq.message.mail; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; import lombok.Data; -import lombok.EqualsAndHashCode; import javax.validation.constraints.NotEmpty; import javax.validation.constraints.NotNull; @@ -13,8 +11,7 @@ import javax.validation.constraints.NotNull; * @author 芋道源码 */ @Data -@EqualsAndHashCode(callSuper = true) -public class MailSendMessage extends AbstractStreamMessage { +public class MailSendMessage { /** * 邮件日志编号 @@ -47,9 +44,4 @@ public class MailSendMessage extends AbstractStreamMessage { @NotEmpty(message = "邮件内容不能为空") private String content; - @Override - public String getStreamKey() { - return "system.mail.send"; - } - } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsSendMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsSendMessage.java index 42a32623a..e4baefcec 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsSendMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/sms/SmsSendMessage.java @@ -1,9 +1,7 @@ package cn.iocoder.yudao.module.system.mq.message.sms; import cn.iocoder.yudao.framework.common.core.KeyValue; -import cn.iocoder.yudao.framework.mq.core.stream.AbstractStreamMessage; import lombok.Data; -import lombok.EqualsAndHashCode; import javax.validation.constraints.NotNull; import java.util.List; @@ -14,8 +12,7 @@ import java.util.List; * @author 芋道源码 */ @Data -@EqualsAndHashCode(callSuper = true) -public class SmsSendMessage extends AbstractStreamMessage { +public class SmsSendMessage { /** * 短信日志编号 @@ -42,9 +39,4 @@ public class SmsSendMessage extends AbstractStreamMessage { */ private List> templateParams; - @Override - public String getStreamKey() { - return "system.sms.send"; - } - } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/mail/MailProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/mail/MailProducer.java index 51016e240..5894332cb 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/mail/MailProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/mail/MailProducer.java @@ -1,8 +1,8 @@ package cn.iocoder.yudao.module.system.mq.producer.mail; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -18,7 +18,7 @@ import javax.annotation.Resource; public class MailProducer { @Resource - private RedisMQTemplate redisMQTemplate; + private ApplicationContext applicationContext; /** * 发送 {@link MailSendMessage} 消息 @@ -35,7 +35,7 @@ public class MailProducer { MailSendMessage message = new MailSendMessage() .setLogId(sendLogId).setMail(mail).setAccountId(accountId) .setNickname(nickname).setTitle(title).setContent(content); - redisMQTemplate.send(message); + applicationContext.publishEvent(message); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java index 32bbde369..379f2e229 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/sms/SmsProducer.java @@ -1,9 +1,9 @@ package cn.iocoder.yudao.module.system.mq.producer.sms; import cn.iocoder.yudao.framework.common.core.KeyValue; -import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.module.system.mq.message.sms.SmsSendMessage; import lombok.extern.slf4j.Slf4j; +import org.springframework.context.ApplicationContext; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -20,7 +20,7 @@ import java.util.List; public class SmsProducer { @Resource - private RedisMQTemplate redisMQTemplate; + private ApplicationContext applicationContext; /** * 发送 {@link SmsSendMessage} 消息 @@ -35,7 +35,7 @@ public class SmsProducer { Long channelId, String apiTemplateId, List> templateParams) { SmsSendMessage message = new SmsSendMessage().setLogId(logId).setMobile(mobile); message.setChannelId(channelId).setApiTemplateId(apiTemplateId).setTemplateParams(templateParams); - redisMQTemplate.send(message); + applicationContext.publishEvent(message); } } diff --git a/yudao-server/src/main/resources/application-dev.yaml b/yudao-server/src/main/resources/application-dev.yaml index 2e9db5599..13c6f92ab 100644 --- a/yudao-server/src/main/resources/application-dev.yaml +++ b/yudao-server/src/main/resources/application-dev.yaml @@ -94,6 +94,23 @@ spring: jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置 initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。 +--- #################### 消息队列相关 #################### + +# rocketmq 配置项,对应 RocketMQProperties 配置类 +rocketmq: + name-server: 127.0.0.1:9876 # RocketMQ Namesrv + +spring: + # RabbitMQ 配置项,对应 RabbitProperties 配置类 + rabbitmq: + host: 127.0.0.1 # RabbitMQ 服务的地址 + port: 5672 # RabbitMQ 服务的端口 + username: guest # RabbitMQ 服务的账号 + password: guest # RabbitMQ 服务的密码 + # Kafka 配置项,对应 KafkaProperties 配置类 + kafka: + bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 + --- #################### 服务保障相关配置 #################### # Lock4j 配置项 diff --git a/yudao-server/src/main/resources/application-local.yaml b/yudao-server/src/main/resources/application-local.yaml index e98110a40..4424329a3 100644 --- a/yudao-server/src/main/resources/application-local.yaml +++ b/yudao-server/src/main/resources/application-local.yaml @@ -109,6 +109,23 @@ spring: jdbc: # 使用 JDBC 的 JobStore 的时候,JDBC 的配置 initialize-schema: NEVER # 是否自动使用 SQL 初始化 Quartz 表结构。这里设置成 never ,我们手动创建表结构。 +--- #################### 消息队列相关 #################### + +# rocketmq 配置项,对应 RocketMQProperties 配置类 +rocketmq: + name-server: 127.0.0.1:9876 # RocketMQ Namesrv + +spring: + # RabbitMQ 配置项,对应 RabbitProperties 配置类 + rabbitmq: + host: 127.0.0.1 # RabbitMQ 服务的地址 + port: 5672 # RabbitMQ 服务的端口 + username: guest # RabbitMQ 服务的账号 + password: guest # RabbitMQ 服务的密码 + # Kafka 配置项,对应 KafkaProperties 配置类 + kafka: + bootstrap-servers: 127.0.0.1:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔 + --- #################### 服务保障相关配置 #################### # Lock4j 配置项 diff --git a/yudao-server/src/main/resources/application.yaml b/yudao-server/src/main/resources/application.yaml index 1e507c8ac..9125bd49f 100644 --- a/yudao-server/src/main/resources/application.yaml +++ b/yudao-server/src/main/resources/application.yaml @@ -106,6 +106,32 @@ aj: req-check-minute-limit: 60 # check 接口一分钟内请求数限制 req-verify-minute-limit: 60 # verify 接口一分钟内请求数限制 +--- #################### 消息队列相关 #################### + +# rocketmq 配置项,对应 RocketMQProperties 配置类 +rocketmq: + # Producer 配置项 + producer: + group: ${spring.application.name}_PRODUCER # 生产者分组 + +spring: + # Kafka 配置项,对应 KafkaProperties 配置类 + kafka: + # Kafka Producer 配置项 + producer: + acks: 1 # 0-不应答。1-leader 应答。all-所有 leader 和 follower 应答。 + retries: 3 # 发送失败时,重试发送的次数 + value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 消息的 value 的序列化 + # Kafka Consumer 配置项 + consumer: + auto-offset-reset: earliest # 设置消费者分组最初的消费进度为 earliest 。可参考博客 https://blog.csdn.net/lishuangzhe7047/article/details/74530417 理解 + value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer + properties: + spring.json.trusted.packages: '*' + # Kafka Consumer Listener 监听器配置 + listener: + missing-topics-fatal: false # 消费监听接口监听的主题不存在时,默认会报错。所以通过设置为 false ,解决报错 + --- #################### 芋道相关配置 #################### yudao: @@ -145,12 +171,6 @@ yudao: - cn.iocoder.yudao.module.pay.enums.ErrorCodeConstants - cn.iocoder.yudao.module.system.enums.ErrorCodeConstants - cn.iocoder.yudao.module.mp.enums.ErrorCodeConstants - mq: - redis: - pubsub: - enable: false # 是否开启 Redis pubsub 广播消费,默认为 true。这里设置成 false,可以按需开启 - stream: - enable: false # 是否开启 Redis stream 集群消费,默认为 true。这里设置成 false,可以按需开启 tenant: # 多租户相关配置项 enable: true ignore-urls: