This is an automated email from the ASF dual-hosted git repository. jinrongtong pushed a commit to branch release-mns-eb-broker-opencore-202506 in repository https://gitbox.apache.org/repos/asf/rocketmq.git
commit 13135b9691523b77ee609993174df97277b9e896 Author: RongtongJin <[email protected]> AuthorDate: Fri Jan 3 12:25:14 2025 +0800 Fix the issue that lmq consumer offset rollback --- .../org/apache/rocketmq/broker/longpolling/PopLongPollingService.java | 4 ++-- .../org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java | 2 +- .../org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java | 4 ++-- .../org/apache/rocketmq/broker/processor/PopBufferMergeService.java | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java index 91185fbe94..d000692ccb 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/longpolling/PopLongPollingService.java @@ -358,7 +358,7 @@ public class PopLongPollingService extends ServiceThread { while (cidMapIter.hasNext()) { Map.Entry<String, Byte> cidEntry = cidMapIter.next(); String cid = cidEntry.getKey(); - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { POP_LOGGER.info("remove not exit sub {} of topic {} in topicCidMap!", cid, topic); cidMapIter.remove(); } @@ -384,7 +384,7 @@ public class PopLongPollingService extends ServiceThread { pollingMapIter.remove(); continue; } - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { POP_LOGGER.info("remove not exit sub {} of topic {} in pollingMap!", cid, topic); pollingMapIter.remove(); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java index 4eccc6c037..120f5b104c 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOrderInfoManager.java @@ -281,7 +281,7 @@ public class ConsumerOrderInfoManager extends ConfigManager { continue; } - if (this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(group) == null) { + if (!this.brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(group)) { iterator.remove(); log.info("Group not exist, Clean order info, {}:{}", topicAtGroup, qs); continue; diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java index 4c341dde92..a4e34a1e16 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/AdminBrokerProcessor.java @@ -421,7 +421,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { GetSubscriptionGroupConfigRequestHeader requestHeader = (GetSubscriptionGroupConfigRequestHeader) request.decodeCommandCustomHeader(GetSubscriptionGroupConfigRequestHeader.class); final RemotingCommand response = RemotingCommand.createResponseCommand(null); - SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getGroup()); + SubscriptionGroupConfig groupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); if (groupConfig == null) { LOGGER.error("No group in this broker, client: {} group: {}", ctx.channel().remoteAddress(), requestHeader.getGroup()); response.setCode(ResponseCode.SYSTEM_ERROR); @@ -2433,7 +2433,7 @@ public class AdminBrokerProcessor implements NettyRequestProcessor { } // groupSysFlag if (StringUtils.isNotEmpty(requestHeader.getConsumerGroup())) { - SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().get(requestHeader.getConsumerGroup()); + SubscriptionGroupConfig groupConfig = brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup()); if (groupConfig != null) { request.addExtField("groupSysFlag", String.valueOf(groupConfig.getGroupSysFlag())); } diff --git a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java index 9f10b483dd..a4ebe267ed 100644 --- a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java +++ b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopBufferMergeService.java @@ -201,7 +201,7 @@ public class PopBufferMergeService extends ServiceThread { iterator.remove(); continue; } - if (!brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable().containsKey(cid)) { + if (!brokerController.getSubscriptionGroupManager().containsSubscriptionGroup(cid)) { POP_LOGGER.info("[PopBuffer]remove not exit sub {} of topic {} in buffer!", cid, topic); iterator.remove(); continue;
