This is an automated email from the ASF dual-hosted git repository.

lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 10c650c487 [ISSUE #10034] Optimizing cq iterator and calculating lag (#10056)
10c650c487 is described below

commit 10c650c48724d052b1b5c6406b8c28cbcd86d3bf
Author: lizhimins <[email protected]>
AuthorDate: Mon Feb 2 21:01:46 2026 +0800

    [ISSUE #10034] Optimizing cq iterator and calculating lag (#10056)
---
 .../common/config/AbstractRocksDBStorage.java      |   7 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  10 ++
 .../rocketmq/store/queue/RocksDBConsumeQueue.java  | 136 ++++++++++++++++-----
 3 files changed, 122 insertions(+), 31 deletions(-)

diff --git a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index d3f41930b9..bc4a18006f 100644
--- a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++ b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -337,12 +337,13 @@ public abstract class AbstractRocksDBStorage {
         final byte[] start, final byte[] end, BiConsumer<byte[], byte[]> 
callback) throws RocksDBException {
 
         if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(start)) {
-            throw new RocksDBException("To determine lower boundary, prefix 
and start may not be null at the same "
-                + "time.");
+            throw new RocksDBException(
+                "To determine lower boundary, prefix and start may not be null 
at the same time.");
         }
 
         if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(end)) {
-            throw new RocksDBException("To determine upper boundary, prefix 
and end may not be null at the same time.");
+            throw new RocksDBException(
+                "To determine upper boundary, prefix and end may not be null 
at the same time.");
         }
 
         if (columnFamilyHandle == null) {
diff --git a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index ffc261aa17..8be3e51d20 100644
--- a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -132,6 +132,8 @@ public class MessageStoreConfig {
     @ImportantField
     private String storeType = StoreType.DEFAULT.getStoreType();
 
+    private boolean iteratorWhenUseRocksdbConsumeQueue = true;
+
     // ConsumeQueue file size,default is 30W
     private int mappedFileSizeConsumeQueue = 300000 * 
ConsumeQueue.CQ_STORE_UNIT_SIZE;
     // enable consume queue ext
@@ -667,6 +669,14 @@ public class MessageStoreConfig {
         this.storeType = storeType;
     }
 
+    public boolean isIteratorWhenUseRocksdbConsumeQueue() {
+        return iteratorWhenUseRocksdbConsumeQueue;
+    }
+
+    public void setIteratorWhenUseRocksdbConsumeQueue(boolean 
iteratorWhenUseRocksdbConsumeQueue) {
+        this.iteratorWhenUseRocksdbConsumeQueue = 
iteratorWhenUseRocksdbConsumeQueue;
+    }
+
     public int getMappedFileSizeConsumeQueue() {
         int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue / 
(ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
         return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 0d58d9a693..86b4d3ef8b 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -17,6 +17,7 @@
 package org.apache.rocketmq.store.queue;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.rocketmq.common.BoundaryType;
 import org.apache.rocketmq.common.Pair;
@@ -226,46 +227,54 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
         queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
     }
 
+    /**
+     * It is CPU-intensive with many offline groups
+     * Optimize by caching their estimated info
+     */
     @Override
     public long estimateMessageCount(long from, long to, MessageFilter filter) 
{
-        // Check from and to offset validity
-        Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
-        if (fromUnit == null) {
-            return -1;
-        }
 
-        if (from >= to) {
-            return -1;
+        Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
+        if (fromUnit == null || from >= to) {
+            return -1L;
         }
 
         if (to > getMaxOffsetInQueue()) {
             to = getMaxOffsetInQueue();
         }
 
-        int maxSampleSize = messageStoreConfig.getMaxConsumeQueueScan();
-        int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to 
- from);
+        int sampleCount = 0;
+        int sampleTotal = Math.min((int) (to - from), 
messageStoreConfig.getMaxConsumeQueueScan());
 
-        int matchThreshold = messageStoreConfig.getSampleCountThreshold();
-        int matchSize = 0;
+        int matchCount = 0;
+        int matchTotal = messageStoreConfig.getSampleCountThreshold();
 
-        for (int i = 0; i < sampleSize; i++) {
-            long index = from + i;
-            Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index);
-            if (pair == null) {
-                continue;
-            }
-            CqUnit cqUnit = pair.getObject1();
-            if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(), 
cqUnit.getCqExtUnit())) {
-                matchSize++;
-                // if matchSize is plenty, early exit estimate
-                if (matchSize > matchThreshold) {
-                    sampleSize = i;
-                    break;
+        try {
+            ReferredIterator<CqUnit> iterator = this.iterateFrom(from, 
matchTotal);
+            while (iterator != null && iterator.hasNext() && sampleCount++ < 
sampleTotal) {
+                CqUnit cqUnit = iterator.next();
+                if (filter.isMatchedByConsumeQueue(
+                    cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
+                    if (++matchCount > matchTotal) {
+                        sampleTotal = sampleCount;
+                        break;
+                    }
                 }
             }
+        } catch (Throwable t) {
+            log.error("EstimateLag error, from={}, to={}", from, to, t);
+        }
+
+        long result = sampleTotal == 0 ? 0 :
+            (long) ((to - from) * (matchCount / (sampleTotal * 1.0)));
+
+        if (log.isTraceEnabled()) {
+            log.trace("EstimateLag, topic={}, queueId={}, offset={}-{}, 
total={}, hit rate={}/{}({}%), result={}",
+                topic, queueId, from, to, to - from,
+                matchCount, sampleCount, String.format("%.1f", (double) 
matchCount * 100.0 / sampleCount), result);
         }
-        // Make sure the second half is a floating point number, otherwise it 
will be truncated to 0
-        return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize / 
(sampleSize * 1.0)));
+
+        return result;
     }
 
 
@@ -302,7 +311,7 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
         long maxCqOffset = getMaxOffsetInQueue();
         if (startIndex < maxCqOffset) {
             int num = Math.min((int)(maxCqOffset - startIndex), count);
-            return iterateFrom0(startIndex, num);
+            return iterateFrom0(startIndex, num, maxCqOffset);
         }
         return null;
     }
@@ -365,7 +374,13 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
         return getMaxPhysicOffset();
     }
 
-    private ReferredIterator<CqUnit> iterateFrom0(final long startIndex, final 
int count) throws RocksDBException {
+    private ReferredIterator<CqUnit> iterateFrom0(
+        final long startIndex, final int count, final long maxOffset) throws 
RocksDBException {
+
+        if (messageStoreConfig.isIteratorWhenUseRocksdbConsumeQueue()) {
+            return new RocksDBReusableIterator(topic, queueId, startIndex, 
count, maxOffset);
+        }
+
         List<ByteBuffer> byteBufferList = 
this.consumeQueueStore.rangeQuery(topic, queueId, startIndex, count);
         if (byteBufferList == null || byteBufferList.isEmpty()) {
             if (this.messageStoreConfig.isEnableRocksDBLog()) {
@@ -386,6 +401,71 @@ public class RocksDBConsumeQueue implements 
ConsumeQueueInterface {
         return queueId;
     }
 
+    private class RocksDBReusableIterator implements ReferredIterator<CqUnit> {
+
+        private final String topic;
+        private final int queueId;
+        private long offset;
+        private final int count;
+        private final long maxOffset;
+
+        private int bufferIndex;
+        private List<ByteBuffer> buffers;
+
+        // offset + count <= max offset
+        public RocksDBReusableIterator(String topic, int queueId, long offset, 
int count, long maxOffset) {
+            this.topic = topic;
+            this.queueId = queueId;
+            this.offset = offset;
+            this.count = count;
+            this.maxOffset = maxOffset;
+
+            this.bufferIndex = 0;
+            this.buffers = new ArrayList<>(count);
+        }
+
+        @Override
+        public void release() {
+        }
+
+        @Override
+        public CqUnit nextAndRelease() {
+            try {
+                return next();
+            } finally {
+                release();
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return offset < maxOffset;
+        }
+
+        @Override
+        public CqUnit next() {
+            try {
+                if (buffers.isEmpty() || bufferIndex >= buffers.size()) {
+                    int batchSize = (int) Math.min(count, maxOffset - offset);
+                    if (batchSize == 0) {
+                        return null;
+                    } else {
+                        bufferIndex = 0;
+                        buffers = consumeQueueStore.rangeQuery(topic, queueId, 
offset, batchSize);
+                    }
+                }
+                if (bufferIndex < buffers.size()) {
+                    ByteBuffer buffer = buffers.get(bufferIndex++);
+                    return new CqUnit(offset++, buffer.getLong(), 
buffer.getInt(), buffer.getLong());
+                }
+            } catch (Throwable t) {
+                log.error("RocksDB reusable iterator search error, " +
+                    "topic={}, queueId={}, offset={}, count={}", topic, 
queueId, offset, count, maxOffset, t);
+            }
+            return null;
+        }
+    }
+
     private class RocksDBConsumeQueueIterator implements 
ReferredIterator<CqUnit> {
         private final List<ByteBuffer> byteBufferList;
         private final long startIndex;

Reply via email to