From 7b36eca60945832c27146f1c6eb89a4b1042012f Mon Sep 17 00:00:00 2001 From: YunaiV Date: Sat, 18 Jun 2022 22:02:11 +0800 Subject: [PATCH] =?UTF-8?q?=E9=9B=86=E6=88=90=20spring-cloud-starter-strea?= =?UTF-8?q?m-rocketmq=20=E7=BB=84=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../mq/consumer/mail/MailSendConsumer.java | 8 +-- .../permission/MenuRefreshConsumer.java | 2 +- .../permission/RoleMenuRefreshConsumer.java | 8 +-- .../permission/RoleRefreshConsumer.java | 2 +- .../permission/UserRoleRefreshConsumer.java | 9 ++-- .../mq/message/mail/MailSendMessage.java | 7 +-- .../permission/MenuRefreshMessage.java | 9 +--- .../permission/RoleMenuRefreshMessage.java | 9 +--- .../permission/UserRoleRefreshMessage.java | 11 +--- .../mq/producer/permission/MenuProducer.java | 2 +- .../permission/PermissionProducer.java | 7 +-- .../mq/producer/permission/RoleProducer.java | 2 +- .../src/main/resources/application.yaml | 52 +++++++++++-------- 13 files changed, 56 insertions(+), 72 deletions(-) diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java index 4b02f760e..ad52b4ed2 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/mail/MailSendConsumer.java @@ -5,14 +5,16 @@ import cn.iocoder.yudao.module.system.mq.message.mail.MailSendMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; +import java.util.function.Consumer; + // TODO 芋艿:这个暂未实现 @Component @Slf4j -public class MailSendConsumer extends AbstractStreamMessageListener { +public class MailSendConsumer implements Consumer { @Override - public void onMessage(MailSendMessage message) { - log.info("[onMessage][消息内容({})]", message); + public void accept(MailSendMessage message) { + log.info("[accept][消息内容({})]", message); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java index 406e3d0ba..cf0373e05 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/MenuRefreshConsumer.java @@ -22,7 +22,7 @@ public class MenuRefreshConsumer implements Consumer { @Override public void accept(MenuRefreshMessage menuRefreshMessage) { - log.info("[onMessage][收到 Menu 刷新消息]"); + log.info("[accept][收到 Menu 刷新消息]"); menuService.initLocalCache(); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java index d9f0e9220..1bce693f7 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleMenuRefreshConsumer.java @@ -7,6 +7,7 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.function.Consumer; /** * 针对 {@link RoleMenuRefreshMessage} 的消费者 @@ -15,15 +16,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class RoleMenuRefreshConsumer extends AbstractChannelMessageListener { +public class RoleMenuRefreshConsumer implements Consumer { @Resource private PermissionService permissionService; @Override - public void onMessage(RoleMenuRefreshMessage message) { - log.info("[onMessage][收到 Role 与 Menu 的关联刷新消息]"); + public void accept(RoleMenuRefreshMessage roleMenuRefreshMessage) { + log.info("[accept][收到 Role 与 Menu 的关联刷新消息]"); permissionService.initLocalCache(); } - } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java index 5acf367d3..8c744397b 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/RoleRefreshConsumer.java @@ -22,7 +22,7 @@ public class RoleRefreshConsumer implements Consumer { @Override public void accept(RoleRefreshMessage message) { - log.info("[onMessage][收到 Role 刷新消息]"); + log.info("[accept][收到 Role 刷新消息]"); roleService.initLocalCache(); } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java index d580f58e2..f5d44b36f 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/consumer/permission/UserRoleRefreshConsumer.java @@ -1,12 +1,12 @@ package cn.iocoder.yudao.module.system.mq.consumer.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessageListener; import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage; import cn.iocoder.yudao.module.system.service.permission.PermissionService; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import javax.annotation.Resource; +import java.util.function.Consumer; /** * 针对 {@link UserRoleRefreshMessage} 的消费者 @@ -15,15 +15,14 @@ import javax.annotation.Resource; */ @Component @Slf4j -public class UserRoleRefreshConsumer extends AbstractChannelMessageListener { +public class UserRoleRefreshConsumer implements Consumer { @Resource private PermissionService permissionService; @Override - public void onMessage(UserRoleRefreshMessage message) { - log.info("[onMessage][收到 User 与 Role 的关联刷新消息]"); + public void accept(UserRoleRefreshMessage userRoleRefreshMessage) { + log.info("[accept][收到 User 与 Role 的关联刷新消息]"); permissionService.initLocalCache(); } - } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java index aee02c76e..42a10ba83 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/mail/MailSendMessage.java @@ -13,8 +13,7 @@ import java.util.Map; * @author 芋道源码 */ @Data -@EqualsAndHashCode(callSuper = true) -public class MailSendMessage extends AbstractStreamMessage { +public class MailSendMessage { /** * 邮箱地址 @@ -40,9 +39,5 @@ public class MailSendMessage extends AbstractStreamMessage { */ private Integer userType; - @Override - public String getStreamKey() { - return "system.mail.send"; - } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java index abd75dbaa..4805c4bae 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/MenuRefreshMessage.java @@ -10,12 +10,5 @@ import lombok.EqualsAndHashCode; * @author 芋道源码 */ @Data -@EqualsAndHashCode(callSuper = true) -public class MenuRefreshMessage extends AbstractChannelMessage { - - @Override - public String getChannel() { - return "system.menu.refresh"; - } - +public class MenuRefreshMessage { } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java index 0982775bc..947b4cd18 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/RoleMenuRefreshMessage.java @@ -10,12 +10,5 @@ import lombok.EqualsAndHashCode; * @author 芋道源码 */ @Data -@EqualsAndHashCode(callSuper = true) -public class RoleMenuRefreshMessage extends AbstractChannelMessage { - - @Override - public String getChannel() { - return "system.role-menu.refresh"; - } - +public class RoleMenuRefreshMessage { } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java index 1644f5c00..8b6566d6f 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/message/permission/UserRoleRefreshMessage.java @@ -1,8 +1,6 @@ package cn.iocoder.yudao.module.system.mq.message.permission; -import cn.iocoder.yudao.framework.mq.core.pubsub.AbstractChannelMessage; import lombok.Data; -import lombok.EqualsAndHashCode; /** * 用户与角色的数据刷新 Message @@ -10,12 +8,5 @@ import lombok.EqualsAndHashCode; * @author 芋道源码 */ @Data -@EqualsAndHashCode(callSuper = true) -public class UserRoleRefreshMessage extends AbstractChannelMessage { - - @Override - public String getChannel() { - return "system.user-role.refresh"; - } - +public class UserRoleRefreshMessage { } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java index 7f6939116..ca093c029 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/MenuProducer.java @@ -21,7 +21,7 @@ public class MenuProducer { */ public void sendMenuRefreshMessage() { MenuRefreshMessage message = new MenuRefreshMessage(); - streamBridge.send("demo02-output", message); + streamBridge.send("menuRefresh-out-0", message); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java index e3c4047c2..3ce7c52a2 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/PermissionProducer.java @@ -3,6 +3,7 @@ package cn.iocoder.yudao.module.system.mq.producer.permission; import cn.iocoder.yudao.module.system.mq.message.permission.RoleMenuRefreshMessage; import cn.iocoder.yudao.framework.mq.core.RedisMQTemplate; import cn.iocoder.yudao.module.system.mq.message.permission.UserRoleRefreshMessage; +import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.stereotype.Component; import javax.annotation.Resource; @@ -14,14 +15,14 @@ import javax.annotation.Resource; public class PermissionProducer { @Resource - private RedisMQTemplate redisMQTemplate; + private StreamBridge streamBridge; /** * 发送 {@link RoleMenuRefreshMessage} 消息 */ public void sendRoleMenuRefreshMessage() { RoleMenuRefreshMessage message = new RoleMenuRefreshMessage(); - redisMQTemplate.send(message); + streamBridge.send("roleMenuRefresh-out-0", message); } /** @@ -29,7 +30,7 @@ public class PermissionProducer { */ public void sendUserRoleRefreshMessage() { UserRoleRefreshMessage message = new UserRoleRefreshMessage(); - redisMQTemplate.send(message); + streamBridge.send("userRoleRefresh-out-0", message); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java index def7e4157..0c43fa47f 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java +++ b/yudao-module-system/yudao-module-system-biz/src/main/java/cn/iocoder/yudao/module/system/mq/producer/permission/RoleProducer.java @@ -26,7 +26,7 @@ public class RoleProducer { */ public void sendRoleRefreshMessage() { RoleRefreshMessage message = new RoleRefreshMessage(); - streamBridge.send("demo01-output", message); + streamBridge.send("roleRefresh-out-0", message); } } diff --git a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml index b0f39ab29..8be1339e8 100644 --- a/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml +++ b/yudao-module-system/yudao-module-system-biz/src/main/resources/application.yaml @@ -56,42 +56,52 @@ spring: # Spring Cloud Stream 配置项,对应 BindingServiceProperties 类 stream: function: - definition: roleRefreshConsumer;menuRefreshConsumer; + definition: roleRefreshConsumer;menuRefreshConsumer;roleMenuRefreshConsumer;userRoleRefreshConsumer; # Binding 配置项,对应 BindingProperties Map bindings: - demo01-output: - destination: TEST + roleRefresh-out-0: + destination: system_role_refresh roleRefreshConsumer-in-0: - destination: TEST - group: roleRefreshConsumer - demo02-output: - destination: TEST2 + destination: system_role_refresh + group: system_role_refresh_consumer_group + menuRefresh-out-0: + destination: system_menu_refresh menuRefreshConsumer-in-0: - destination: TEST2 - group: menuRefreshConsumer + destination: system_menu_refresh + group: system_menu_refresh_consumer_group + roleMenuRefresh-out-0: + destination: system_role_menu_refresh + roleMenuRefreshConsumer-in-0: + destination: system_role_menu_refresh + group: system_role_menu_refresh_consumer_group + userRoleRefresh-out-0: + destination: system_user_role_refresh + userRoleRefreshConsumer-in-0: + destination: system_user_role_refresh + group: system_user_role_refresh_consumer_group # Spring Cloud Stream RocketMQ 配置项 rocketmq: # RocketMQ Binder 配置项,对应 RocketMQBinderConfigurationProperties 类 binder: name-server: 127.0.0.1:9876 # RocketMQ Namesrv 地址 + default: # 默认 bindings 全局配置 + producer: # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类 + group: system_producer_group # 生产者分组 + send-type: SYNC # 发送模式,SYNC 同步 # RocketMQ 自定义 Binding 配置项,对应 RocketMQBindingProperties Map bindings: - demo01-output: - # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类 - producer: - group: test # 生产者分组 - sync: true # 是否同步发送消息,默认为 false 异步。 roleRefreshConsumer-in-0: consumer: - message-model: BROADCASTING - demo02-output: - # RocketMQ Producer 配置项,对应 RocketMQProducerProperties 类 - producer: - group: test # 生产者分组 - sync: true # 是否同步发送消息,默认为 false 异步。 + message-model: BROADCASTING # 广播消费 menuRefreshConsumer-in-0: consumer: - message-model: BROADCASTING + message-model: BROADCASTING # 广播消费 + roleMenuRefreshConsumer-in-0: + consumer: + message-model: BROADCASTING # 广播消费 + userRoleRefreshConsumer-in-0: + consumer: + message-model: BROADCASTING # 广播消费 --- #################### 芋道相关配置 ####################