This is an automated email from the ASF dual-hosted git repository.

lollipop pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new e6a587a82a [ISSUE #9626] Prevent premature offset commit before consumer record flush (#9627)
e6a587a82a is described below

commit e6a587a82aee46908be9e2f45d6d7b6d96373d25
Author: lizhimins <[email protected]>
AuthorDate: Fri Aug 22 14:43:50 2025 +0800

    [ISSUE #9626] Prevent premature offset commit before consumer record flush (#9627)
    
    * [ISSUE #9626] Prevent premature offset commit before consumer record flush
---
 .../rocketmq/broker/pop/PopConsumerCache.java      | 107 +++++++++------------
 .../rocketmq/broker/pop/PopConsumerCacheTest.java  |  10 +-
 2 files changed, 54 insertions(+), 63 deletions(-)

diff --git a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java
index e7ce68e019..7f51817167 100644
--- a/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java
+++ b/broker/src/main/java/org/apache/rocketmq/broker/pop/PopConsumerCache.java
@@ -20,13 +20,11 @@ import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import org.apache.rocketmq.broker.BrokerController;
 import org.apache.rocketmq.broker.offset.ConsumerOffsetManager;
@@ -128,33 +126,33 @@ public class PopConsumerCache extends ServiceThread {
                 records.getGroupId(), records.getTopicId());
 
             if (timeout) {
-                List<PopConsumerRecord> removeExpiredRecords =
-                    records.removeExpiredRecords(Long.MAX_VALUE);
-                if (removeExpiredRecords != null) {
-                    consumerRecordStore.writeRecords(removeExpiredRecords);
+                records.stageExpiredRecords(Long.MAX_VALUE);
+                List<PopConsumerRecord> writeConsumerRecords =
+                    new ArrayList<>(records.getRemoveTreeMap().values());
+                if (!writeConsumerRecords.isEmpty()) {
+                    consumerRecordStore.writeRecords(writeConsumerRecords);
                 }
+                records.clearStagedRecords();
                 log.info("PopConsumerOffline, so clean expire records, 
groupId={}, topic={}, queueId={}, records={}",
-                    records.getGroupId(), records.getTopicId(), 
records.getQueueId(),
-                    removeExpiredRecords != null ? removeExpiredRecords.size() 
: 0);
+                    records.getGroupId(), records.getTopicId(), 
records.getQueueId(), records.getInFlightRecordCount());
                 iterator.remove();
                 continue;
             }
 
             long currentTime = System.currentTimeMillis();
+            records.stageExpiredRecords(currentTime);
             List<PopConsumerRecord> writeConsumerRecords = new ArrayList<>();
-            List<PopConsumerRecord> consumerRecords = 
records.removeExpiredRecords(currentTime);
-            if (consumerRecords != null) {
-                consumerRecords.forEach(consumerRecord -> {
-                    if (consumerRecord.getVisibilityTimeout() <= currentTime) {
-                        consumer.accept(consumerRecord);
-                    } else {
-                        writeConsumerRecords.add(consumerRecord);
-                    }
-                });
-            }
+            records.getRemoveTreeMap().values().forEach(record -> {
+                if (record.getVisibilityTimeout() <= currentTime) {
+                    consumer.accept(record);
+                } else {
+                    writeConsumerRecords.add(record);
+                }
+            });
 
             // write to store and handle it later
             consumerRecordStore.writeRecords(writeConsumerRecords);
+            records.clearStagedRecords();
 
             // commit min offset in buffer to offset store
             long offset = records.getMinOffsetInBuffer();
@@ -209,72 +207,64 @@ public class PopConsumerCache extends ServiceThread {
 
     protected static class ConsumerRecords {
 
-        private final Lock lock;
         private final String groupId;
         private final String topicId;
         private final int queueId;
         private final BrokerConfig brokerConfig;
-        private final TreeMap<Long /* offset */, PopConsumerRecord> 
recordTreeMap;
+        private final ConcurrentSkipListMap<Long /* offset */, 
PopConsumerRecord> removeTreeMap;
+        private final ConcurrentSkipListMap<Long /* offset */, 
PopConsumerRecord> recordTreeMap;
 
         public ConsumerRecords(BrokerConfig brokerConfig, String groupId, 
String topicId, int queueId) {
             this.groupId = groupId;
             this.topicId = topicId;
             this.queueId = queueId;
-            this.lock = new ReentrantLock();
             this.brokerConfig = brokerConfig;
-            this.recordTreeMap = new TreeMap<>();
+            this.removeTreeMap = new ConcurrentSkipListMap<>();
+            this.recordTreeMap = new ConcurrentSkipListMap<>();
         }
 
         public void write(PopConsumerRecord record) {
-            lock.lock();
-            try {
-                recordTreeMap.put(record.getOffset(), record);
-            } finally {
-                lock.unlock();
-            }
+            recordTreeMap.put(record.getOffset(), record);
         }
 
         public boolean delete(PopConsumerRecord record) {
-            PopConsumerRecord popConsumerRecord;
-            lock.lock();
-            try {
-                popConsumerRecord = recordTreeMap.remove(record.getOffset());
-            } finally {
-                lock.unlock();
-            }
-            return popConsumerRecord != null;
+            return recordTreeMap.remove(record.getOffset()) != null;
         }
 
         public long getMinOffsetInBuffer() {
-            Map.Entry<Long, PopConsumerRecord> entry = 
recordTreeMap.firstEntry();
+            Map.Entry<Long, PopConsumerRecord> entry = 
removeTreeMap.firstEntry();
+            if (entry != null) {
+                return entry.getKey();
+            }
+            entry = recordTreeMap.firstEntry();
             return entry != null ? entry.getKey() : OFFSET_NOT_EXIST;
         }
 
         public int getInFlightRecordCount() {
-            return recordTreeMap.size();
+            return removeTreeMap.size() + recordTreeMap.size();
         }
 
-        public List<PopConsumerRecord> removeExpiredRecords(long currentTime) {
-            List<PopConsumerRecord> result = null;
-            lock.lock();
-            try {
-                Iterator<Map.Entry<Long, PopConsumerRecord>> iterator = 
recordTreeMap.entrySet().iterator();
-                while (iterator.hasNext()) {
-                    Map.Entry<Long, PopConsumerRecord> entry = iterator.next();
-                    // 
org.apache.rocketmq.broker.processor.PopBufferMergeService.scan
-                    if (entry.getValue().getVisibilityTimeout() <= currentTime 
||
-                        entry.getValue().getPopTime() + 
brokerConfig.getPopCkStayBufferTime() <= currentTime) {
-                        if (result == null) {
-                            result = new ArrayList<>();
-                        }
-                        result.add(entry.getValue());
-                        iterator.remove();
-                    }
+        public void stageExpiredRecords(long currentTime) {
+            Iterator<Map.Entry<Long, PopConsumerRecord>>
+                iterator = recordTreeMap.entrySet().iterator();
+
+            // refer: 
org.apache.rocketmq.broker.processor.PopBufferMergeService.scan
+            while (iterator.hasNext()) {
+                Map.Entry<Long, PopConsumerRecord> entry = iterator.next();
+                if (entry.getValue().getVisibilityTimeout() <= currentTime ||
+                    entry.getValue().getPopTime() + 
brokerConfig.getPopCkStayBufferTime() <= currentTime) {
+                    removeTreeMap.put(entry.getKey(), entry.getValue());
+                    iterator.remove();
                 }
-            } finally {
-                lock.unlock();
             }
-            return result;
+        }
+
+        public void clearStagedRecords() {
+            removeTreeMap.clear();
+        }
+
+        public ConcurrentSkipListMap<Long, PopConsumerRecord> 
getRemoveTreeMap() {
+            return removeTreeMap;
         }
 
         public String getGroupId() {
@@ -292,7 +282,6 @@ public class PopConsumerCache extends ServiceThread {
         @Override
         public String toString() {
             return "ConsumerRecords{" +
-                "lock=" + lock +
                 ", topicId=" + topicId +
                 ", groupId=" + groupId +
                 ", queueId=" + queueId +
diff --git a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java
index 3f6e893a52..28045ca26e 100644
--- a/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java
+++ b/broker/src/test/java/org/apache/rocketmq/broker/pop/PopConsumerCacheTest.java
@@ -61,10 +61,12 @@ public class PopConsumerCacheTest {
         Assert.assertEquals(3, consumerRecords.getInFlightRecordCount());
 
         long bufferTimeout = brokerConfig.getPopCkStayBufferTime();
-        Assert.assertEquals(1, 
consumerRecords.removeExpiredRecords(bufferTimeout + 2).size());
-        Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout + 
2));
-        Assert.assertEquals(2, 
consumerRecords.removeExpiredRecords(bufferTimeout + 4).size());
-        Assert.assertNull(consumerRecords.removeExpiredRecords(bufferTimeout + 
4));
+        consumerRecords.stageExpiredRecords(bufferTimeout + 2);
+        Assert.assertEquals(1, consumerRecords.getRemoveTreeMap().size());
+        consumerRecords.clearStagedRecords();
+        consumerRecords.stageExpiredRecords(bufferTimeout + 4);
+        Assert.assertEquals(2, consumerRecords.getRemoveTreeMap().size());
+        consumerRecords.clearStagedRecords();
     }
 
     @Test

Reply via email to