This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch release-mns-eb-broker-opencore-202506 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 12d1cb1333e74182b5088583f4b69d6a15068a92 Author: RongtongJin <[email protected]> AuthorDate: Fri Jan 3 12:25:14 2025 +0800 Fix the issue of duplicate consumption in LMQ --- .../rocketmq/broker/longpolling/PopLongPollingService.java | 13 +++++++------ .../rocketmq/broker/processor/AdminBrokerProcessor.java | 2 +- .../rocketmq/broker/processor/PopBufferMergeService.java | 4 ++-- .../broker/offset/ConsumerOrderInfoManagerTest.java | 4 +--- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index d000692ccb..e87a8e803f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread { LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME); private final BrokerController brokerController; private final NettyRequestProcessor processor; - private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap; + private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, Byte>> topicCidMap; private final ConcurrentLinkedHashMap<String, ConcurrentSkipListSet<PopRequest>> pollingMap; private long lastCleanTime = 0; @@ -63,7 +63,8 @@ public class PopLongPollingService extends ServiceThread { this.brokerController = brokerController; this.processor = processor; // 100000 topic default, 100000 lru topic + cid + qid - this.topicCidMap = new ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize()); + this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentHashMap<String, Byte>>() + .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize() * 2L).build(); this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>() .maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build(); this.notifyLast = notifyLast; @@ -350,7 +351,7 @@ public class PopLongPollingService extends ServiceThread { Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = topicCidMapIter.next(); String topic = entry.getKey(); if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("remove not exit topic {} in topicCidMap!", topic); + POP_LOGGER.info("remove nonexistent topic {} in topicCidMap!", topic); topicCidMapIter.remove(); continue; } @@ -359,7 +360,7 @@ public class PopLongPollingService extends ServiceThread { Map.Entry<String, Byte> cidEntry = cidMapIter.next(); String cid = cidEntry.getKey(); if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { - POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic); + POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in topicCidMap!", cid, topic); cidMapIter.remove(); } } @@ -380,12 +381,12 @@ public class PopLongPollingService extends ServiceThread { String topic = keyArray[0]; String cid = keyArray[1]; if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("remove not exit topic {} in pollingMap!", topic); + POP_LOGGER.info("remove nonexistent topic {} in pollingMap!", topic); pollingMapIter.remove(); continue; } if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { - POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic); + POP_LOGGER.info("remove nonexistent subscription group {} of topic {} in pollingMap!", cid, topic); pollingMapIter.remove(); } } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index a4e34a1e16..4d8b9fea2f 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -2922,7 +2922,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { GetTopicConfigRequestHeader requestHeader = (GetTopicConfigRequestHeader) request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - TopicConfig topicConfig = this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic()); + TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic()); if (topicConfig == null) { LOGGER.error("No topic in this broker, client: {} topic: {}", ctx.channel().remoteAddress(), requestHeader.getTopic()); //be care of the response code, should set "not-exist" explicitly diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index a4ebe267ed..30250a54fc 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -197,12 +197,12 @@ public class PopBufferMergeService extends ServiceThread { String topic = keyArray[0]; String cid = keyArray[1]; if (brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) { - POP_LOGGER.info("[PopBuffer]remove not exit topic {} in buffer!", topic); + POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in buffer!", topic); iterator.remove(); continue; } if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { - POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", cid, topic); + POP_LOGGER.info("[PopBuffer]remove nonexistent subscription group {} of topic {} in buffer!", cid, topic); iterator.remove(); continue; } diff --git a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java index 25b418c934..dbd7117dc2 100644 --- a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java +++ b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java @@ -384,9 +384,7 @@ public class ConsumerOrderInfoManagerTest { SubscriptionGroupManager subscriptionGroupManager = mock(SubscriptionGroupManager.class); when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager); - ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>(); - subscriptionGroupConfigConcurrentMap.put(GROUP, new SubscriptionGroupConfig()); - when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap); + when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true); TopicConfig topicConfig = new TopicConfig(TOPIC); when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);
