GitHub user jiakme edited a comment on the discussion: single instance with
multi DefaultLitePullConsumer(use same topic and different tag) don't poll
message right
```java
private void initRocketMQPushConsumer() throws MQClientException {
List<DefaultLitePullConsumer> pullConsumers = new ArrayList<>();
DefaultLitePullConsumer defaultLitePullConsumer =
getDefaultLitePullConsumer("high");
pullConsumers.add(defaultLitePullConsumer);
DefaultLitePullConsumer defaultLitePullConsumer_1 =
getDefaultLitePullConsumer("low");
pullConsumers.add(defaultLitePullConsumer_1);
PULL_CONSUMERS = Collections.unmodifiableList(pullConsumers);
}
private DefaultLitePullConsumer getDefaultLitePullConsumer(String tag) throws
MQClientException {
RocketMQProperties.Consumer consumerConfig = new
RocketMQProperties.Consumer();
consumerConfig.setGroup(MqConfig.CATEGORY_SMART_CAMPUS_APP_CENTER +
"_priority");
consumerConfig.setTopic(TOPIC);
consumerConfig.setPullBatchSize(50);
DefaultLitePullConsumer defaultLitePullConsumer;
consumerConfig.setSelectorExpression(tag);
defaultLitePullConsumer = buildPullConsumer(consumerConfig);
return defaultLitePullConsumer;
}
private DefaultLitePullConsumer buildPullConsumer(RocketMQProperties.Consumer
consumerConfig) throws MQClientException {
/**
* {@link
org.apache.rocketmq.spring.autoconfigure.RocketMQAutoConfiguration#defaultLitePullConsumer(RocketMQProperties)}
*/
String nameServer = rocketMQProperties.getNameServer();
String groupName = consumerConfig.getGroup();
String topicName = consumerConfig.getTopic();
Assert.hasText(nameServer, "[rocketmq.name-server] must not be null");
Assert.hasText(groupName, "[rocketmq.pull-consumer.group] must not be
null");
Assert.hasText(topicName, "[rocketmq.pull-consumer.topic] must not be
null");
String accessChannel = rocketMQProperties.getAccessChannel();
MessageModel messageModel =
MessageModel.valueOf(consumerConfig.getMessageModel());
SelectorType selectorType =
SelectorType.valueOf(consumerConfig.getSelectorType());
String selectorExpression = consumerConfig.getSelectorExpression();
String ak = consumerConfig.getAccessKey();
String sk = consumerConfig.getSecretKey();
int pullBatchSize = consumerConfig.getPullBatchSize();
boolean useTLS = consumerConfig.isTlsEnable();
DefaultLitePullConsumer litePullConsumer =
RocketMQUtil.createDefaultLitePullConsumer(nameServer, accessChannel,
groupName, topicName, messageModel, selectorType,
selectorExpression, ak, sk, pullBatchSize, useTLS);
litePullConsumer.setEnableMsgTrace(consumerConfig.isEnableMsgTrace());
litePullConsumer.setCustomizedTraceTopic(consumerConfig.getCustomizedTraceTopic());
litePullConsumer.setNamespace(consumerConfig.getNamespace());
litePullConsumer.setAllocateMessageQueueStrategy(new
AllocateMessageQueueAveragely());
litePullConsumer.setAutoCommit(true);
return litePullConsumer;
}
```
GitHub link:
https://github.com/apache/rocketmq/discussions/6578#discussioncomment-5587162
----
This is an automatically sent email for [email protected].
To unsubscribe, please send an email to: [email protected]