This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch release-mns-eb-broker-opencore-202506
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 12d1cb1333e74182b5088583f4b69d6a15068a92
Author: RongtongJin <[email protected]>
AuthorDate: Fri Jan 3 12:25:14 2025 +0800

    Fix the issue of duplicate consumption in LMQ
---
 .../rocketmq/broker/longpolling/PopLongPollingService.java  | 13 +++++++------
 .../rocketmq/broker/processor/AdminBrokerProcessor.java     |  2 +-
 .../rocketmq/broker/processor/PopBufferMergeService.java    |  4 ++--
 .../broker/offset/ConsumerOrderInfoManagerTest.java         |  4 +---
 4 files changed, 11 insertions(+), 12 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index d000692ccb..e87a8e803f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -52,7 +52,7 @@ public class PopLongPollingService extends ServiceThread {
         LoggerFactory.getLogger(LoggerName.ROCKETMQ_POP_LOGGER_NAME);
     private final BrokerController brokerController;
     private final NettyRequestProcessor processor;
-    private final ConcurrentHashMap<String, ConcurrentHashMap<String, Byte>> 
topicCidMap;
+    private final ConcurrentLinkedHashMap<String, ConcurrentHashMap<String, 
Byte>> topicCidMap;
     private final ConcurrentLinkedHashMap<String, 
ConcurrentSkipListSet<PopRequest>> pollingMap;
     private long lastCleanTime = 0;
 
@@ -63,7 +63,8 @@ public class PopLongPollingService extends ServiceThread {
         this.brokerController = brokerController;
         this.processor = processor;
         // 100000 topic default,  100000 lru topic + cid + qid
-        this.topicCidMap = new 
ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
+        this.topicCidMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentHashMap<String, Byte>>()
+            
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()
 * 2L).build();
         this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, 
ConcurrentSkipListSet<PopRequest>>()
             
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
         this.notifyLast = notifyLast;
@@ -350,7 +351,7 @@ public class PopLongPollingService extends ServiceThread {
                     Map.Entry<String, ConcurrentHashMap<String, Byte>> entry = 
topicCidMapIter.next();
                     String topic = entry.getKey();
                     if 
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
-                        POP_LOGGER.info("remove not exit topic {} in 
topicCidMap!", topic);
+                        POP_LOGGER.info("remove nonexistent topic {} in 
topicCidMap!", topic);
                         topicCidMapIter.remove();
                         continue;
                     }
@@ -359,7 +360,7 @@ public class PopLongPollingService extends ServiceThread {
                         Map.Entry<String, Byte> cidEntry = cidMapIter.next();
                         String cid = cidEntry.getKey();
                         if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
-                            POP_LOGGER.info("remove not exit sub {} of topic 
{} in topicCidMap!", cid, topic);
+                            POP_LOGGER.info("remove nonexistent subscription 
group {} of topic {} in topicCidMap!", cid, topic);
                             cidMapIter.remove();
                         }
                     }
@@ -380,12 +381,12 @@ public class PopLongPollingService extends ServiceThread {
                     String topic = keyArray[0];
                     String cid = keyArray[1];
                     if 
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
-                        POP_LOGGER.info("remove not exit topic {} in 
pollingMap!", topic);
+                        POP_LOGGER.info("remove nonexistent topic {} in 
pollingMap!", topic);
                         pollingMapIter.remove();
                         continue;
                     }
                     if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
-                        POP_LOGGER.info("remove not exit sub {} of topic {} in 
pollingMap!", cid, topic);
+                        POP_LOGGER.info("remove nonexistent subscription group 
{} of topic {} in pollingMap!", cid, topic);
                         pollingMapIter.remove();
                     }
                 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index a4e34a1e16..4d8b9fea2f 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -2922,7 +2922,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         GetTopicConfigRequestHeader requestHeader = 
(GetTopicConfigRequestHeader) 
request.decodeCommandCustomHeader(GetTopicConfigRequestHeader.class);
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
-        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().getTopicConfigTable().get(requestHeader.getTopic());
+        TopicConfig topicConfig = 
this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
         if (topicConfig == null) {
             LOGGER.error("No topic in this broker, client: {} topic: {}", 
ctx.channel().remoteAddress(), requestHeader.getTopic());
             //be care of the response code, should set "not-exist" explicitly
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index a4ebe267ed..30250a54fc 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -197,12 +197,12 @@ public class PopBufferMergeService extends ServiceThread {
             String topic = keyArray[0];
             String cid = keyArray[1];
             if 
(brokerController.getTopicConfigManager().selectTopicConfig(topic) == null) {
-                POP_LOGGER.info("[PopBuffer]remove not exit topic {} in 
buffer!", topic);
+                POP_LOGGER.info("[PopBuffer]remove nonexistent topic {} in 
buffer!", topic);
                 iterator.remove();
                 continue;
             }
             if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
-                POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} 
in buffer!", cid, topic);
+                POP_LOGGER.info("[PopBuffer]remove nonexistent subscription 
group {} of topic {} in buffer!", cid, topic);
                 iterator.remove();
                 continue;
             }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
index 25b418c934..dbd7117dc2 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManagerTest.java
@@ -384,9 +384,7 @@ public class ConsumerOrderInfoManagerTest {
 
         SubscriptionGroupManager subscriptionGroupManager = 
mock(SubscriptionGroupManager.class);
         
when(brokerController.getSubscriptionGroupManager()).thenReturn(subscriptionGroupManager);
-        ConcurrentMap<String, SubscriptionGroupConfig> 
subscriptionGroupConfigConcurrentMap = new ConcurrentHashMap<>();
-        subscriptionGroupConfigConcurrentMap.put(GROUP, new 
SubscriptionGroupConfig());
-        
when(subscriptionGroupManager.getSubscriptionGroupTable()).thenReturn(subscriptionGroupConfigConcurrentMap);
+        
when(subscriptionGroupManager.containsSubscriptionGroup(GROUP)).thenReturn(true);
 
         TopicConfig topicConfig = new TopicConfig(TOPIC);
         
when(topicConfigManager.selectTopicConfig(eq(TOPIC))).thenReturn(topicConfig);

Reply via email to