侧边栏壁纸
  • 累计撰写 22 篇文章
  • 累计创建 10 个标签
  • 累计收到 5 条评论

目 录CONTENT

文章目录

rocketmq整合springboot多实例配置

AF
AF
2023-11-27 / 1 评论 / 0 点赞 / 206 阅读 / 3347 字

SpringBoot 整合rocketmq 多实例

线上有个项目需要整合多个rocketmq的实例,按网上的教程配置了之后,发现存在都发到同一个broker中,并没有按预期的不同的消息进入不同的broker中

故进行源码分析:

本项目的依赖是:rocketmq-spring-boot-starter:2.2.0

<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.2.0</version>
</dependency>

配置如下:

@Bean("otherRocketmqTemplate") 
public RocketMQTemplate twoRocketmqTemplate() {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        DefaultMQProducer producer = new DefaultMQProducer("other_group");
        producer.setNamesrvAddr("127.0.0.1:8888");
        rocketMQTemplate.setProducer(producer);
        return rocketMQTemplate;
    }

按此配置进行调用,出现了上诉的问题。

产生问题的原因是:

org.apache.rocketmq.client.ClientConfig

public String buildMQClientId() {
        StringBuilder sb = new StringBuilder();
        sb.append(this.getClientIP());

        sb.append("@");
        sb.append(this.getInstanceName());
        if (!UtilAll.isBlank(this.unitName)) {
            sb.append("@");
            sb.append(this.unitName);
        }

        return sb.toString();
    }

观察源码发现:如果未配置instanceName 或者unitName,那么配置的clientId 尽管nameServer不一样,都会生成相同的clientId,clientId 的目的是为了和netty通信,那就会走到同一个netty。所以出现了消息发送永远发到同一个broker中。

解决办法:

@Configuration
public class RocketmqConfig {
    @Value("${rocketmq.name-server-other}")
    private String otherNameServer;

    @Bean("two")
    public RocketMQTemplate twoRocketmqTemplate() {
        RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
        DefaultMQProducer producer = new DefaultMQProducer("other_group");
        // 第一种:此处模拟consumer初始化的时候使用的方法,按nameserver形成新的instanceName
        producer.setInstanceName(RocketMQUtil.getInstanceName(otherNameServer)); 
        // 第二种,配置unitName
        // producer.setUnitName("随便定义一个")
        producer.setNamesrvAddr(otherNameServer);
        rocketMQTemplate.setProducer(producer);
        return rocketMQTemplate;
    }
}

重新定义一个rocketmqtemplate的目的是为了和默认的rocketmqtemplate一起使用,互不干扰。

而且DefaultMQProducer在源码中,如果定义了此bean,默认的DefaultMQProducer就不会加载。所以没有选择重新配置DefaultMQProducer的方式。

重新配置DefaultMQProducer的方式如下:

@Bean("two")
    public DefaultMQProducer twoMqProducer() throws MQClientException {
        DefaultMQProducer defaultMqProducer = new DefaultMQProducer("other_group");
        defaultMqProducer.setNamesrvAddr(otherNameServer);
        defaultMqProducer.setUnitName("B");
        defaultMqProducer.start();
        return defaultMqProducer;
    }

    @Bean("one")
    public DefaultMQProducer oneMQProducer() throws MQClientException {
        DefaultMQProducer defaultMqProducer = new DefaultMQProducer("default_message_group");
        defaultMqProducer.setNamesrvAddr("10.9.50.214:31003");
        defaultMqProducer.setUnitName("A");
        defaultMqProducer.start();
        return defaultMqProducer;
    }

另外说一点:在rocketmq-spring-boot-starter:2.2.2中,已经默认实现了clientId的唯一性,此问题出现在2.2.0或以下版本中

0

评论区