This is an automated email from the ASF dual-hosted git repository.
RongtongJin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 49102a9985 [ISSUE #10549] Fix lite topic reset offset: memory leak,
FIFO block bypass, and offset-0 reset failure (#10550)
49102a9985 is described below
commit 49102a99852147c2fa90d49250f4ffb0d288f3c8
Author: Quan <[email protected]>
AuthorDate: Mon Jun 29 15:34:00 2026 +0800
[ISSUE #10549] Fix lite topic reset offset: memory leak, FIFO block bypass,
and offset-0 reset failure (#10550)
* [ISSUE #10549] Fix lite topic reset offset: memory leak, FIFO block
bypass, and offset-0 reset failure
- Fix memory leak in removeResetOffset: clean up empty inner map entries
from resetOffsetTable
- Add eraseResetOffset for precise cleanup on lite topic removal
- Skip FIFO block check in isFifoBlocked when server-side reset offset is
pending
- Fix ResetOffsetByTimeCommand: change resetOffset > 0 to >= 0 to allow
resetting to offset 0
- Add unit tests for eraseResetOffset and isFifoBlocked reset bypass
* chore: empty commit to trigger CI pipeline
---
.../broker/lite/AbstractLiteLifecycleManager.java | 1 +
.../broker/offset/ConsumerOffsetManager.java | 17 +++++++++++++++--
.../broker/processor/PopLiteMessageProcessor.java | 4 ++++
.../broker/offset/ConsumerOffsetManagerTest.java | 21 +++++++++++++++++++++
.../processor/PopLiteMessageProcessorTest.java | 11 +++++++++++
.../command/offset/ResetOffsetByTimeCommand.java | 2 +-
6 files changed, 53 insertions(+), 3 deletions(-)
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
index b038a692d3..f7f522b833 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/lite/AbstractLiteLifecycleManager.java
@@ -205,6 +205,7 @@ public abstract class AbstractLiteLifecycleManager extends
ServiceThread {
groups.forEach(group -> {
String topicAtGroup = lmqName + TOPIC_GROUP_SEPARATOR + group;
brokerController.getConsumerOffsetManager().getOffsetTable().remove(topicAtGroup);
+
brokerController.getConsumerOffsetManager().eraseResetOffset(lmqName, group, 0);
brokerController.getConsumerOffsetManager().removeConsumerOffset(topicAtGroup);
// no iteration
brokerController.getPopLiteMessageProcessor().getConsumerOrderInfoManager().remove(lmqName,
group);
});
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
index f9debf3857..1d3bf7bed0 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManager.java
@@ -466,8 +466,21 @@ public class ConsumerOffsetManager extends ConfigManager {
ConcurrentMap<Integer, Long> map = resetOffsetTable.get(key);
if (null == map) {
return null;
- } else {
- return map.remove(queueId);
}
+ Long offset = map.remove(queueId);
+ if (map.isEmpty()) {
+ resetOffsetTable.computeIfPresent(key, (k, _map) ->
+ _map.isEmpty() ? null : _map
+ );
+ }
+ return offset;
+ }
+
+ public void eraseResetOffset(String topic, String group, int queueId) {
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ resetOffsetTable.computeIfPresent(key, (k, map) -> {
+ map.remove(queueId);
+ return map.isEmpty() ? null : map;
+ });
}
}
diff --git
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
index a1fa417152..4da72ef4bd 100644
---
a/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
+++
b/broker/src/main/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessor.java
@@ -311,6 +311,10 @@ public class PopLiteMessageProcessor implements
NettyRequestProcessor {
}
public boolean isFifoBlocked(String attemptId, String group, String
lmqName, long invisibleTime) {
+ if (brokerController.getBrokerConfig().isUseServerSideResetOffset() &&
+
this.brokerController.getConsumerOffsetManager().hasOffsetReset(lmqName, group,
0)) {
+ return false;
+ }
return consumerOrderInfoManager.checkBlock(attemptId, lmqName, group,
0, invisibleTime);
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
index 3ddd369c7f..7e4faa4e42 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/offset/ConsumerOffsetManagerTest.java
@@ -100,4 +100,25 @@ public class ConsumerOffsetManagerTest {
ConcurrentMap<Integer, Long> offsetTableLoaded =
manager.getOffsetTable().get(group);
Assert.assertEquals(table, offsetTableLoaded);
}
+
+ @Test
+ public void testEraseResetOffset() {
+ String topic = "Topic";
+ String group = "Group";
+ String key = topic + TOPIC_GROUP_SEPARATOR + group;
+ consumerOffsetManager.assignResetOffset(topic, group, 0, 100L);
+ consumerOffsetManager.assignResetOffset(topic, group, 1, 200L);
+
+ Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group,
0));
+ Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group,
1));
+
+ consumerOffsetManager.eraseResetOffset(topic, group, 0);
+ Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group,
0));
+ Assert.assertTrue(consumerOffsetManager.hasOffsetReset(topic, group,
1));
+
Assert.assertTrue(consumerOffsetManager.resetOffsetTable.containsKey(key));
+
+ consumerOffsetManager.eraseResetOffset(topic, group, 1);
+ Assert.assertFalse(consumerOffsetManager.hasOffsetReset(topic, group,
1));
+
Assert.assertFalse(consumerOffsetManager.resetOffsetTable.containsKey(key));
+ }
}
diff --git
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
index 9705ab4f5a..957386b166 100644
---
a/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
+++
b/broker/src/test/java/org/apache/rocketmq/broker/processor/PopLiteMessageProcessorTest.java
@@ -56,6 +56,7 @@ import java.util.Iterator;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@@ -149,6 +150,16 @@ public class PopLiteMessageProcessorTest {
verify(consumerOrderInfoManager).checkBlock("attemptId", "lmqName",
"group", 0, 1000L);
}
+ @Test
+ public void testIsFifoBlocked_hasResetOffset() {
+ brokerConfig.setUseServerSideResetOffset(true);
+ when(consumerOffsetManager.hasOffsetReset("lmqName", "group",
0)).thenReturn(true);
+
+ assertFalse(popLiteMessageProcessor.isFifoBlocked("attemptId",
"group", "lmqName", 1000L));
+ verify(consumerOffsetManager).hasOffsetReset("lmqName", "group", 0);
+ verify(consumerOrderInfoManager, never()).checkBlock(anyString(),
anyString(), anyString(), anyInt(), anyLong());
+ }
+
@Test
public void testGetPopOffset_normal() throws ConsumeQueueException {
String group = "group";
diff --git
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
index 84a301bd60..f0326fdb75 100644
---
a/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
+++
b/tools/src/main/java/org/apache/rocketmq/tools/command/offset/ResetOffsetByTimeCommand.java
@@ -140,7 +140,7 @@ public class ResetOffsetByTimeCommand implements SubCommand
{
defaultMQAdminExt.searchOffset(brokerAddr, topic, queueId,
timestamp, 3000);
System.out.printf("reset consumer offset to %d%n",
resetOffset);
- if (resetOffset > 0) {
+ if (resetOffset >= 0) {
defaultMQAdminExt.resetOffsetByQueueId(brokerAddr, group,
topic, queueId, resetOffset);
}
return;