This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 10c650c487 [ISSUE #10034] Optimizing cq iterator and calculating lag
(#10056)
10c650c487 is described below
commit 10c650c48724d052b1b5c6406b8c28cbcd86d3bf
Author: lizhimins <[email protected]>
AuthorDate: Mon Feb 2 21:01:46 2026 +0800
[ISSUE #10034] Optimizing cq iterator and calculating lag (#10056)
---
.../common/config/AbstractRocksDBStorage.java | 7 +-
.../rocketmq/store/config/MessageStoreConfig.java | 10 ++
.../rocketmq/store/queue/RocksDBConsumeQueue.java | 136 ++++++++++++++++-----
3 files changed, 122 insertions(+), 31 deletions(-)
diff --git
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
index d3f41930b9..bc4a18006f 100644
---
a/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
+++
b/common/src/main/java/org/apache/rocketmq/common/config/AbstractRocksDBStorage.java
@@ -337,12 +337,13 @@ public abstract class AbstractRocksDBStorage {
final byte[] start, final byte[] end, BiConsumer<byte[], byte[]>
callback) throws RocksDBException {
if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(start)) {
- throw new RocksDBException("To determine lower boundary, prefix
and start may not be null at the same "
- + "time.");
+ throw new RocksDBException(
+ "To determine lower boundary, prefix and start may not be null
at the same time.");
}
if (ArrayUtils.isEmpty(prefix) && ArrayUtils.isEmpty(end)) {
- throw new RocksDBException("To determine upper boundary, prefix
and end may not be null at the same time.");
+ throw new RocksDBException(
+ "To determine upper boundary, prefix and end may not be null
at the same time.");
}
if (columnFamilyHandle == null) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index ffc261aa17..8be3e51d20 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -132,6 +132,8 @@ public class MessageStoreConfig {
@ImportantField
private String storeType = StoreType.DEFAULT.getStoreType();
+ private boolean iteratorWhenUseRocksdbConsumeQueue = true;
+
// ConsumeQueue file size,default is 30W
private int mappedFileSizeConsumeQueue = 300000 *
ConsumeQueue.CQ_STORE_UNIT_SIZE;
// enable consume queue ext
@@ -667,6 +669,14 @@ public class MessageStoreConfig {
this.storeType = storeType;
}
+ public boolean isIteratorWhenUseRocksdbConsumeQueue() {
+ return iteratorWhenUseRocksdbConsumeQueue;
+ }
+
+ public void setIteratorWhenUseRocksdbConsumeQueue(boolean
iteratorWhenUseRocksdbConsumeQueue) {
+ this.iteratorWhenUseRocksdbConsumeQueue =
iteratorWhenUseRocksdbConsumeQueue;
+ }
+
public int getMappedFileSizeConsumeQueue() {
int factor = (int) Math.ceil(this.mappedFileSizeConsumeQueue /
(ConsumeQueue.CQ_STORE_UNIT_SIZE * 1.0));
return (int) (factor * ConsumeQueue.CQ_STORE_UNIT_SIZE);
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
index 0d58d9a693..86b4d3ef8b 100644
---
a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueue.java
@@ -17,6 +17,7 @@
package org.apache.rocketmq.store.queue;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.common.BoundaryType;
import org.apache.rocketmq.common.Pair;
@@ -226,46 +227,54 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
queueOffsetOperator.increaseQueueOffset(topicQueueKey, messageNum);
}
+ /**
+ * Estimating lag is CPU-intensive when there are many offline groups.
+ * Optimize by caching their estimated info.
+ */
@Override
public long estimateMessageCount(long from, long to, MessageFilter filter)
{
- // Check from and to offset validity
- Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
- if (fromUnit == null) {
- return -1;
- }
- if (from >= to) {
- return -1;
+ Pair<CqUnit, Long> fromUnit = getCqUnitAndStoreTime(from);
+ if (fromUnit == null || from >= to) {
+ return -1L;
}
if (to > getMaxOffsetInQueue()) {
to = getMaxOffsetInQueue();
}
- int maxSampleSize = messageStoreConfig.getMaxConsumeQueueScan();
- int sampleSize = to - from > maxSampleSize ? maxSampleSize : (int) (to
- from);
+ int sampleCount = 0;
+ int sampleTotal = Math.min((int) (to - from),
messageStoreConfig.getMaxConsumeQueueScan());
- int matchThreshold = messageStoreConfig.getSampleCountThreshold();
- int matchSize = 0;
+ int matchCount = 0;
+ int matchTotal = messageStoreConfig.getSampleCountThreshold();
- for (int i = 0; i < sampleSize; i++) {
- long index = from + i;
- Pair<CqUnit, Long> pair = getCqUnitAndStoreTime(index);
- if (pair == null) {
- continue;
- }
- CqUnit cqUnit = pair.getObject1();
- if (filter.isMatchedByConsumeQueue(cqUnit.getTagsCode(),
cqUnit.getCqExtUnit())) {
- matchSize++;
- // if matchSize is plenty, early exit estimate
- if (matchSize > matchThreshold) {
- sampleSize = i;
- break;
+ try {
+ ReferredIterator<CqUnit> iterator = this.iterateFrom(from,
matchTotal);
+ while (iterator != null && iterator.hasNext() && sampleCount++ <
sampleTotal) {
+ CqUnit cqUnit = iterator.next();
+ if (filter.isMatchedByConsumeQueue(
+ cqUnit.getTagsCode(), cqUnit.getCqExtUnit())) {
+ if (++matchCount > matchTotal) {
+ sampleTotal = sampleCount;
+ break;
+ }
}
}
+ } catch (Throwable t) {
+ log.error("EstimateLag error, from={}, to={}", from, to, t);
+ }
+
+ long result = sampleTotal == 0 ? 0 :
+ (long) ((to - from) * (matchCount / (sampleTotal * 1.0)));
+
+ if (log.isTraceEnabled()) {
+ log.trace("EstimateLag, topic={}, queueId={}, offset={}-{},
total={}, hit rate={}/{}({}%), result={}",
+ topic, queueId, from, to, to - from,
+ matchCount, sampleCount, String.format("%.1f", (double)
matchCount * 100.0 / sampleCount), result);
}
- // Make sure the second half is a floating point number, otherwise it
will be truncated to 0
- return sampleSize == 0 ? 0 : (long) ((to - from) * (matchSize /
(sampleSize * 1.0)));
+
+ return result;
}
@@ -302,7 +311,7 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
long maxCqOffset = getMaxOffsetInQueue();
if (startIndex < maxCqOffset) {
int num = Math.min((int)(maxCqOffset - startIndex), count);
- return iterateFrom0(startIndex, num);
+ return iterateFrom0(startIndex, num, maxCqOffset);
}
return null;
}
@@ -365,7 +374,13 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
return getMaxPhysicOffset();
}
- private ReferredIterator<CqUnit> iterateFrom0(final long startIndex, final
int count) throws RocksDBException {
+ private ReferredIterator<CqUnit> iterateFrom0(
+ final long startIndex, final int count, final long maxOffset) throws
RocksDBException {
+
+ if (messageStoreConfig.isIteratorWhenUseRocksdbConsumeQueue()) {
+ return new RocksDBReusableIterator(topic, queueId, startIndex,
count, maxOffset);
+ }
+
List<ByteBuffer> byteBufferList =
this.consumeQueueStore.rangeQuery(topic, queueId, startIndex, count);
if (byteBufferList == null || byteBufferList.isEmpty()) {
if (this.messageStoreConfig.isEnableRocksDBLog()) {
@@ -386,6 +401,71 @@ public class RocksDBConsumeQueue implements
ConsumeQueueInterface {
return queueId;
}
+ private class RocksDBReusableIterator implements ReferredIterator<CqUnit> {
+
+ private final String topic;
+ private final int queueId;
+ private long offset;
+ private final int count;
+ private final long maxOffset;
+
+ private int bufferIndex;
+ private List<ByteBuffer> buffers;
+
+ // Invariant: offset + count <= maxOffset
+ public RocksDBReusableIterator(String topic, int queueId, long offset,
int count, long maxOffset) {
+ this.topic = topic;
+ this.queueId = queueId;
+ this.offset = offset;
+ this.count = count;
+ this.maxOffset = maxOffset;
+
+ this.bufferIndex = 0;
+ this.buffers = new ArrayList<>(count);
+ }
+
+ @Override
+ public void release() {
+ }
+
+ @Override
+ public CqUnit nextAndRelease() {
+ try {
+ return next();
+ } finally {
+ release();
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ return offset < maxOffset;
+ }
+
+ @Override
+ public CqUnit next() {
+ try {
+ if (buffers.isEmpty() || bufferIndex >= buffers.size()) {
+ int batchSize = (int) Math.min(count, maxOffset - offset);
+ if (batchSize == 0) {
+ return null;
+ } else {
+ bufferIndex = 0;
+ buffers = consumeQueueStore.rangeQuery(topic, queueId,
offset, batchSize);
+ }
+ }
+ if (bufferIndex < buffers.size()) {
+ ByteBuffer buffer = buffers.get(bufferIndex++);
+ return new CqUnit(offset++, buffer.getLong(),
buffer.getInt(), buffer.getLong());
+ }
+ } catch (Throwable t) {
+ log.error("RocksDB reusable iterator search error, " +
+ "topic={}, queueId={}, offset={}, count={}", topic,
queueId, offset, count, maxOffset, t);
+ }
+ return null;
+ }
+ }
+
private class RocksDBConsumeQueueIterator implements
ReferredIterator<CqUnit> {
private final List<ByteBuffer> byteBufferList;
private final long startIndex;