This is an automated email from the ASF dual-hosted git repository.

tianliuliu pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 9337904cdf [ISSUE #9309]opti:Avoid the generation of dirty data in 
#assignResetOffset (#9310)
9337904cdf is described below

commit 9337904cdf44e9de74204bf1bfb1b5e82a4b7f59
Author: hqbfz <[email protected]>
AuthorDate: Wed Jul 9 09:56:26 2025 +0800

    [ISSUE #9309]opti:Avoid the generation of dirty data in #assignResetOffset 
(#9310)
    
    * feat: support clients to reset lmq consumption offset
    
    * fix
    
    * fix
    
    * fix
    
    * fix: clean pull offset in #removeOffset
    
    * fix: clean pull offset in #removeOffset
    
    * rerun test
    
    ---------
    
    Co-authored-by: hqbfzwang <[email protected]>
---
 .../broker/config/v2/ConsumerOffsetManagerV2.java  | 26 ++++++++++++++++++++
 .../broker/offset/ConsumerOffsetManager.java       |  2 +-
 .../broker/offset/LmqConsumerOffsetManager.java    | 28 ++++++++++++++++++++++
 3 files changed, 55 insertions(+), 1 deletion(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
index 28214baf1c..e14ac0bb62 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/config/v2/ConsumerOffsetManagerV2.java
@@ -16,6 +16,7 @@
  */
 package org.apache.rocketmq.broker.config.v2;
 
+import com.google.common.base.Strings;
 import io.netty.buffer.ByteBuf;
 import io.netty.util.internal.PlatformDependent;
 import java.nio.ByteBuffer;
@@ -446,4 +447,29 @@ public class ConsumerOffsetManagerV2 extends 
ConsumerOffsetManager {
         }
         return -1;
     }
+
+    @Override
+    public void assignResetOffset(String topic, String group, int queueId, 
long offset) {
+        if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || 
queueId < 0 || offset < 0) {
+            LOG.warn("Illegal arguments when assigning reset offset. Topic={}, 
group={}, queueId={}, offset={}",
+                    topic, group, queueId, offset);
+            return;
+        }
+        if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
+            super.assignResetOffset(topic, group, queueId, offset);
+        } else {
+            String key = topic + TOPIC_GROUP_SEPARATOR + group;
+            ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+            if (null == map) {
+                map = new ConcurrentHashMap<>();
+                ConcurrentMap<Integer, Long> previous = 
resetOffsetTable.putIfAbsent(key, map);
+                if (null != previous) {
+                    map = previous;
+                }
+            }
+            map.put(queueId, offset);
+        }
+
+        this.commitOffset(null, topic, group, queueId, offset);
+    }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index 140604f521..a6cd9ad987 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -50,7 +50,7 @@ public class ConsumerOffsetManager extends ConfigManager {
     protected ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer, 
Long>> offsetTable =
         new ConcurrentHashMap<>(512);
 
-    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> 
resetOffsetTable =
+    protected final ConcurrentMap<String, ConcurrentMap<Integer, Long>> 
resetOffsetTable =
         new ConcurrentHashMap<>(512);
 
     private final ConcurrentMap<String/* topic@group */, 
ConcurrentMap<Integer, Long>> pullOffsetTable =
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
index 53e9e2e063..a565ad07c3 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/LmqConsumerOffsetManager.java
@@ -20,7 +20,9 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.base.Strings;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.BrokerPathConfigHelper;
 import org.apache.rocketmq.common.MixAll;
@@ -132,4 +134,30 @@ public class LmqConsumerOffsetManager extends 
ConsumerOffsetManager {
             }
         }
     }
+
+    @Override
+    public void assignResetOffset(String topic, String group, int queueId, 
long offset) {
+        if (Strings.isNullOrEmpty(topic) || Strings.isNullOrEmpty(group) || 
queueId < 0 || offset < 0) {
+            LOG.warn("Illegal arguments when assigning reset offset. Topic={}, 
group={}, queueId={}, offset={}",
+                    topic, group, queueId, offset);
+            return;
+        }
+        if (!MixAll.isLmq(topic) || !MixAll.isLmq(group)) {
+            super.assignResetOffset(topic, group, queueId, offset);
+            return;
+        }
+
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
+        if (null == map) {
+            map = new ConcurrentHashMap<>();
+            ConcurrentMap<Integer, Long> previous = 
resetOffsetTable.putIfAbsent(key, map);
+            if (null != previous) {
+                map = previous;
+            }
+        }
+        map.put(queueId, offset);
+
+        lmqOffsetTable.computeIfPresent(key, (k, oldValue) -> offset);
+    }
 }

Reply via email to