This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch release-mns-eb-broker-opencore-202506
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit 13135b9691523b77ee609993174df97277b9e896
Author: RongtongJin <[email protected]>
AuthorDate: Fri Jan 3 12:25:14 2025 +0800

    Fix the issue where the LMQ consumer offset is rolled back
---
 .../org/apache/rocketmq/broker/longpolling/PopLongPollingService.java | 4 ++--
 .../org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java   | 2 +-
 .../org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java    | 4 ++--
 .../org/apache/rocketmq/broker/processor/PopBufferMergeService.java   | 2 +-
 4 files changed, 6 insertions(+), 6 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
index 91185fbe94..d000692ccb 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java
@@ -358,7 +358,7 @@ public class PopLongPollingService extends ServiceThread {
                     while (cidMapIter.hasNext()) {
                         Map.Entry<String, Byte> cidEntry = cidMapIter.next();
                         String cid = cidEntry.getKey();
-                        if 
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
 {
+                        if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
                             POP_LOGGER.info("remove not exit sub {} of topic 
{} in topicCidMap!", cid, topic);
                             cidMapIter.remove();
                         }
@@ -384,7 +384,7 @@ public class PopLongPollingService extends ServiceThread {
                         pollingMapIter.remove();
                         continue;
                     }
-                    if 
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
 {
+                    if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
                         POP_LOGGER.info("remove not exit sub {} of topic {} in 
pollingMap!", cid, topic);
                         pollingMapIter.remove();
                     }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
index 4eccc6c037..120f5b104c 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java
@@ -281,7 +281,7 @@ public class ConsumerOrderInfoManager extends ConfigManager 
{
                 continue;
             }
 
-            if 
(this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group)
 == null) {
+            if 
(!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group))
 {
                 iterator.remove();
                 log.info("Group not exist, Clean order info, {}:{}", 
topicAtGroup, qs);
                 continue;
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
index 4c341dde92..a4e34a1e16 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java
@@ -421,7 +421,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         GetSubscriptionGroupConfigRequestHeader requestHeader = 
(GetSubscriptionGroupConfigRequestHeader) 
request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class);
         final RemotingCommand response = 
RemotingCommand.createResponseCommand(null);
 
-        SubscriptionGroupConfig groupConfig = 
this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup());
+        SubscriptionGroupConfig groupConfig = 
this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
         if (groupConfig == null) {
             LOGGER.error("No group in this broker, client: {} group: {}", 
ctx.channel().remoteAddress(), requestHeader.getGroup());
             response.setCode(ResponseCode.SYSTEM_ERROR);
@@ -2433,7 +2433,7 @@ public class AdminBrokerProcessor implements 
NettyRequestProcessor {
         }
         // groupSysFlag
         if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) {
-            SubscriptionGroupConfig groupConfig = 
brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup());
+            SubscriptionGroupConfig groupConfig = 
brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());
             if (groupConfig != null) {
                 request.addExtField("groupSysFlag", 
String.valueOf(groupConfig.getGroupSysFlag()));
             }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
index 9f10b483dd..a4ebe267ed 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java
@@ -201,7 +201,7 @@ public class PopBufferMergeService extends ServiceThread {
                 iterator.remove();
                 continue;
             }
-            if 
(!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid))
 {
+            if 
(!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid))
 {
                 POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} 
in buffer!", cid, topic);
                 iterator.remove();
                 continue;

Reply via email to