This is an automated email from the ASF dual-hosted git repository.
lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new e6a587a82a [ISSUE #9626] Prevent premature offset commit before
consumer record flush (#9627)
e6a587a82a is described below
commit e6a587a82aee46908be9e2f45d6d7b6d96373d25
Author: lizhimins <[email protected]>
AuthorDate: Fri Aug 22 14:43:50 2025 +0800
[ISSUE #9626] Prevent premature offset commit before consumer record flush (#9627)
* [ISSUE #9626] Prevent premature offset commit before consumer record flush
---
.../rocketmq/broker/pop/PopConsumerCache.java | 107 +++++++++------------
.../rocketmq/broker/pop/PopConsumerCacheTest.java | 10 +-
2 files changed, 54 insertions(+), 63 deletions(-)
diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java
index e7ce68e019..7f51817167 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java
@@ -20,13 +20,11 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -128,33 +126,33 @@ public class PopConsumerCache extends ServiceThread {
records.getGroupId(), records.getTopicId());
if (timeout) {
- List<PopConsumerRecord> removeExpiredRecords =
- records.removeExpiredRecords(Long.MAX_VALUE);
- if (removeExpiredRecords != null) {
- consumerRecordStore.writeRecords(removeExpiredRecords);
+ records.stageExpiredRecords(Long.MAX_VALUE);
+ List<PopConsumerRecord> writeConsumerRecords =
+ new ArrayList<>(records.getRemoveTreeMap().values());
+ if (!writeConsumerRecords.isEmpty()) {
+ consumerRecordStore.writeRecords(writeConsumerRecords);
}
+ records.clearStagedRecords();
log.info("PopConsumerOffline, so clean expire records,
groupId={}, topic={}, queueId={}, records={}",
- records.getGroupId(), records.getTopicId(),
records.getQueueId(),
- removeExpiredRecords != null ? removeExpiredRecords.size()
: 0);
+ records.getGroupId(), records.getTopicId(),
records.getQueueId(), records.getInFlightRecordCount());
iterator.remove();
continue;
}
long currentTime = System.currentTimeMillis();
+ records.stageExpiredRecords(currentTime);
List<PopConsumerRecord> writeConsumerRecords = new ArrayList<>();
- List<PopConsumerRecord> consumerRecords =
records.removeExpiredRecords(currentTime);
- if (consumerRecords != null) {
- consumerRecords.forEach(consumerRecord -> {
- if (consumerRecord.getVisibilityTimeout() <= currentTime) {
- consumer.accept(consumerRecord);
- } else {
- writeConsumerRecords.add(consumerRecord);
- }
- });
- }
+ records.getRemoveTreeMap().values().forEach(record -> {
+ if (record.getVisibilityTimeout() <= currentTime) {
+ consumer.accept(record);
+ } else {
+ writeConsumerRecords.add(record);
+ }
+ });
// write to store and handle it later
consumerRecordStore.writeRecords(writeConsumerRecords);
+ records.clearStagedRecords();
// commit min offset in buffer to offset store
long offset = records.getMinOffsetInBuffer();
@@ -209,72 +207,64 @@ public class PopConsumerCache extends ServiceThread {
protected static class ConsumerRecords {
- private final Lock lock;
private final String groupId;
private final String topicId;
private final int queueId;
private final BrokerConfig brokerConfig;
- private final TreeMap<Long /* offset */, PopConsumerRecord>
recordTreeMap;
+ private final ConcurrentSkipListMap<Long /* offset */,
PopConsumerRecord> removeTreeMap;
+ private final ConcurrentSkipListMap<Long /* offset */,
PopConsumerRecord> recordTreeMap;
public ConsumerRecords(BrokerConfig brokerConfig, String groupId,
String topicId, int queueId) {
this.groupId = groupId;
this.topicId = topicId;
this.queueId = queueId;
- this.lock = new ReentrantLock();
this.brokerConfig = brokerConfig;
- this.recordTreeMap = new TreeMap<>();
+ this.removeTreeMap = new ConcurrentSkipListMap<>();
+ this.recordTreeMap = new ConcurrentSkipListMap<>();
}
public void write(PopConsumerRecord record) {
- lock.lock();
- try {
- recordTreeMap.put(record.getOffset(), record);
- } finally {
- lock.unlock();
- }
+ recordTreeMap.put(record.getOffset(), record);
}
public boolean delete(PopConsumerRecord record) {
- PopConsumerRecord popConsumerRecord;
- lock.lock();
- try {
- popConsumerRecord = recordTreeMap.remove(record.getOffset());
- } finally {
- lock.unlock();
- }
- return popConsumerRecord != null;
+ return recordTreeMap.remove(record.getOffset()) != null;
}
public long getMinOffsetInBuffer() {
- Map.Entry<Long, PopConsumerRecord> entry =
recordTreeMap.firstEntry();
+ Map.Entry<Long, PopConsumerRecord> entry =
removeTreeMap.firstEntry();
+ if (entry != null) {
+ return entry.getKey();
+ }
+ entry = recordTreeMap.firstEntry();
return entry != null ? entry.getKey() : OFFSET_NOT_EXIST;
}
public int getInFlightRecordCount() {
- return recordTreeMap.size();
+ return removeTreeMap.size() + recordTreeMap.size();
}
- public List<PopConsumerRecord> removeExpiredRecords(long currentTime) {
- List<PopConsumerRecord> result = null;
- lock.lock();
- try {
- Iterator<Map.Entry<Long, PopConsumerRecord>> iterator =
recordTreeMap.entrySet().iterator();
- while (iterator.hasNext()) {
- Map.Entry<Long, PopConsumerRecord> entry = iterator.next();
- //
org.apache.rocketmq.broker.processor.PopBufferMergeService.scan
- if (entry.getValue().getVisibilityTimeout() <= currentTime
||
- entry.getValue().getPopTime() +
brokerConfig.getPopCkStayBufferTime() <= currentTime) {
- if (result == null) {
- result = new ArrayList<>();
- }
- result.add(entry.getValue());
- iterator.remove();
- }
+ public void stageExpiredRecords(long currentTime) {
+ Iterator<Map.Entry<Long, PopConsumerRecord>>
+ iterator = recordTreeMap.entrySet().iterator();
+
+ // refer:
org.apache.rocketmq.broker.processor.PopBufferMergeService.scan
+ while (iterator.hasNext()) {
+ Map.Entry<Long, PopConsumerRecord> entry = iterator.next();
+ if (entry.getValue().getVisibilityTimeout() <= currentTime ||
+ entry.getValue().getPopTime() +
brokerConfig.getPopCkStayBufferTime() <= currentTime) {
+ removeTreeMap.put(entry.getKey(), entry.getValue());
+ iterator.remove();
}
- } finally {
- lock.unlock();
}
- return result;
+ }
+
+ public void clearStagedRecords() {
+ removeTreeMap.clear();
+ }
+
+ public ConcurrentSkipListMap<Long, PopConsumerRecord>
getRemoveTreeMap() {
+ return removeTreeMap;
}
public String getGroupId() {
@@ -292,7 +282,6 @@ public class PopConsumerCache extends ServiceThread {
@Override
public String toString() {
return "ConsumerRecords{" +
- "lock=" + lock +
", topicId=" + topicId +
", groupId=" + groupId +
", queueId=" + queueId +
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java
index 3f6e893a52..28045ca26e 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java
@@ -61,10 +61,12 @@ public class PopConsumerCacheTest {
Assert.assertEquals(3, consumerRecords.getInFlightRecordCount());
long bufferTimeout = brokerConfig.getPopCkStayBufferTime();
- Assert.assertEquals(1,
consumerRecords.removeExpiredRecords(bufferTimeout + 2).size());
- Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout +
2));
- Assert.assertEquals(2,
consumerRecords.removeExpiredRecords(bufferTimeout + 4).size());
- Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout +
4));
+ consumerRecords.stageExpiredRecords(bufferTimeout + 2);
+ Assert.assertEquals(1, consumerRecords.getRemoveTreeMap().size());
+ consumerRecords.clearStagedRecords();
+ consumerRecords.stageExpiredRecords(bufferTimeout + 4);
+ Assert.assertEquals(2, consumerRecords.getRemoveTreeMap().size());
+ consumerRecords.clearStagedRecords();
}
@Test