This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 49102a9985 [ISSUE #10549] Fix lite topic reset offset: memory leak, 
FIFO block bypass, and offset-0 reset failure (#10550)
49102a9985 is described below

commit 49102a99852147c2fa90d49250f4ffb0d288f3c8
Author: Quan <[email protected]>
AuthorDate: Mon Jun 29 15:34:00 2026 +0800

    [ISSUE #10549] Fix lite topic reset offset: memory leak, FIFO block bypass, 
and offset-0 reset failure (#10550)
    
    * [ISSUE #10549] Fix lite topic reset offset: memory leak, FIFO block 
bypass, and offset-0 reset failure
    
    - Fix memory leak in removeResetOffset: clean up empty inner map entries 
from resetOffsetTable
    - Add eraseResetOffset for precise cleanup on lite topic removal
    - Skip FIFO block check in isFifoBlocked when server-side reset offset is 
pending
    - Fix ResetOffsetByTimeCommand: change resetOffset > 0 to >= 0 to allow 
resetting to offset 0
    - Add unit tests for eraseResetOffset and isFifoBlocked reset bypass
    
    * chore: empty commit to trigger CI pipeline
---
 .../broker/lite/AbstractLiteLifecycleManager.java   |  1 +
 .../broker/offset/ConsumerOffsetManager.java        | 17 +++++++++++++++--
 .../broker/processor/PopLiteMessageProcessor.java   |  4 ++++
 .../broker/offset/ConsumerOffsetManagerTest.java    | 21 +++++++++++++++++++++
 .../processor/PopLiteMessageProcessorTest.java      | 11 +++++++++++
 .../command/offset/ResetOffsetByTimeCommand.java    |  2 +-
 6 files changed, 53 insertions(+), 3 deletions(-)

diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
index b038a692d3..f7f522b833 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
@@ -205,6 +205,7 @@ public abstract class AbstractLiteLifecycleManager extends 
ServiceThread {
             groups.forEach(group -> {
                 String topicAtGroup = lmqName + TOPIC_GROUP_SEPARATOR + group;
                 
brokerController.getConsumerOffsetManager().getOffsetTable().remove(topicAtGroup);
+                
brokerController.getConsumerOffsetManager().eraseResetOffset(lmqName, group, 0);
                 
brokerController.getConsumerOffsetManager().removeConsumerOffset(topicAtGroup); 
// no iteration
                 
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName,
 group);
             });
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index f9debf3857..1d3bf7bed0 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -466,8 +466,21 @@ public class ConsumerOffsetManager extends ConfigManager {
         ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
         if (null == map) {
             return null;
-        } else {
-            return map.remove(queueId);
         }
+        Long offset = map.remove(queueId);
+        if (map.isEmpty()) {
+            resetOffsetTable.computeIfPresent(key, (k, _map) ->
+                _map.isEmpty() ? null : _map
+            );
+        }
+        return offset;
+    }
+
+    public void eraseResetOffset(String topic, String group, int queueId) {
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        resetOffsetTable.computeIfPresent(key, (k, map) -> {
+            map.remove(queueId);
+            return map.isEmpty() ? null : map;
+        });
     }
 }
diff --git 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
index a1fa417152..4da72ef4bd 100644
--- 
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
+++ 
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
@@ -311,6 +311,10 @@ public class PopLiteMessageProcessor implements 
NettyRequestProcessor {
     }
 
     public boolean isFifoBlocked(String attemptId, String group, String 
lmqName, long invisibleTime) {
+        if (brokerController.getBrokerConfig().isUseServerSideResetOffset() &&
+            
this.brokerController.getConsumerOffsetManager().hasOffsetReset(lmqName, group, 
0)) {
+            return false;
+        }
         return consumerOrderInfoManager.checkBlock(attemptId, lmqName, group, 
0, invisibleTime);
     }
 
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 3ddd369c7f..7e4faa4e42 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -100,4 +100,25 @@ public class ConsumerOffsetManagerTest {
         ConcurrentMap<Integer, Long> offsetTableLoaded = 
manager.getOffsetTable().get(group);
         Assert.assertEquals(table, offsetTableLoaded);
     }
+
+    @Test
+    public void testEraseResetOffset() {
+        String topic = "Topic";
+        String group = "Group";
+        String key = topic + TOPIC_GROUP_SEPARATOR + group;
+        consumerOffsetManager.assignResetOffset(topic, group, 0, 100L);
+        consumerOffsetManager.assignResetOffset(topic, group, 1, 200L);
+
+        Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 
0));
+        Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 
1));
+
+        consumerOffsetManager.eraseResetOffset(topic, group, 0);
+        Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 
0));
+        Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group, 
1));
+        
Assert.assertTrue(consumerOffsetManager.resetOffsetTable.containsKey(key));
+
+        consumerOffsetManager.eraseResetOffset(topic, group, 1);
+        Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group, 
1));
+        
Assert.assertFalse(consumerOffsetManager.resetOffsetTable.containsKey(key));
+    }
 }
diff --git 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
index 9705ab4f5a..957386b166 100644
--- 
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
+++ 
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
@@ -56,6 +56,7 @@ import java.util.Iterator;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
@@ -149,6 +150,16 @@ public class PopLiteMessageProcessorTest {
         verify(consumerOrderInfoManager).checkBlock("attemptId", "lmqName", 
"group", 0, 1000L);
     }
 
+    @Test
+    public void testIsFifoBlocked_hasResetOffset() {
+        brokerConfig.setUseServerSideResetOffset(true);
+        when(consumerOffsetManager.hasOffsetReset("lmqName", "group", 
0)).thenReturn(true);
+
+        assertFalse(popLiteMessageProcessor.isFifoBlocked("attemptId", 
"group", "lmqName", 1000L));
+        verify(consumerOffsetManager).hasOffsetReset("lmqName", "group", 0);
+        verify(consumerOrderInfoManager, never()).checkBlock(anyString(), 
anyString(), anyString(), anyInt(), anyLong());
+    }
+
     @Test
     public void testGetPopOffset_normal() throws ConsumeQueueException {
         String group = "group";
diff --git 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index 84a301bd60..f0326fdb75 100644
--- 
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++ 
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -140,7 +140,7 @@ public class ResetOffsetByTimeCommand implements SubCommand 
{
                     defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId, 
timestamp, 3000);
 
                 System.out.printf("reset consumer offset to %d%n", 
resetOffset);
-                if (resetOffset > 0) {
+                if (resetOffset >= 0) {
                     defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, group, 
topic, queueId, resetOffset);
                 }
                 return;

Reply via email to