SpringBoot 整合rocketmq 多实例
线上有个项目需要整合多个rocketmq的实例,按网上的教程配置了之后,发现存在都发到同一个broker中,并没有按预期的不同的消息进入不同的broker中
故进行源码分析:
本项目的依赖是:rocketmq-spring-boot-starter:2.2.0
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.0</version>
</dependency>
配置如下:
@Bean("otherRocketmqTemplate")
public RocketMQTemplate twoRocketmqTemplate() {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
DefaultMQProducer producer = new DefaultMQProducer("other_group");
producer.setNamesrvAddr("127.0.0.1:8888");
rocketMQTemplate.setProducer(producer);
return rocketMQTemplate;
}
按此配置进行调用,出现了上诉的问题。
产生问题的原因是:
org.apache.rocketmq.client.ClientConfig
public String buildMQClientId() {
StringBuilder sb = new StringBuilder();
sb.append(this.getClientIP());
sb.append("@");
sb.append(this.getInstanceName());
if (!UtilAll.isBlank(this.unitName)) {
sb.append("@");
sb.append(this.unitName);
}
return sb.toString();
}
观察源码发现:如果未配置instanceName 或者unitName,那么配置的clientId 尽管nameServer不一样,都会生成相同的clientId,clientId 的目的是为了和netty通信,那就会走到同一个netty。所以出现了消息发送永远发到同一个broker中。
解决办法:
@Configuration
public class RocketmqConfig {
@Value("${rocketmq.name-server-other}")
private String otherNameServer;
@Bean("two")
public RocketMQTemplate twoRocketmqTemplate() {
RocketMQTemplate rocketMQTemplate = new RocketMQTemplate();
DefaultMQProducer producer = new DefaultMQProducer("other_group");
// 第一种:此处模拟consumer初始化的时候使用的方法,按nameserver形成新的instanceName
producer.setInstanceName(RocketMQUtil.getInstanceName(otherNameServer));
// 第二种,配置unitName
// producer.setUnitName("随便定义一个")
producer.setNamesrvAddr(otherNameServer);
rocketMQTemplate.setProducer(producer);
return rocketMQTemplate;
}
}
重新定义一个rocketmqtemplate的目的是为了和默认的rocketmqtemplate一起使用,互不干扰。
而且DefaultMQProducer在源码中,如果定义了此bean,默认的DefaultMQProducer就不会加载。所以没有选择重新配置DefaultMQProducer的方式。
重新配置DefaultMQProducer的方式如下:
@Bean("two")
public DefaultMQProducer twoMqProducer() throws MQClientException {
DefaultMQProducer defaultMqProducer = new DefaultMQProducer("other_group");
defaultMqProducer.setNamesrvAddr(otherNameServer);
defaultMqProducer.setUnitName("B");
defaultMqProducer.start();
return defaultMqProducer;
}
@Bean("one")
public DefaultMQProducer oneMQProducer() throws MQClientException {
DefaultMQProducer defaultMqProducer = new DefaultMQProducer("default_message_group");
defaultMqProducer.setNamesrvAddr("10.9.50.214:31003");
defaultMqProducer.setUnitName("A");
defaultMqProducer.start();
return defaultMqProducer;
}
另外说一点:在rocketmq-spring-boot-starter:2.2.2中,已经默认实现了clientId的唯一性,此问题出现在2.2.0或以下版本中
评论区