This is an automated email from the ASF dual-hosted git repository.
lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new f941dced39 [ISSUE #10521] Use madvise(MADV_RANDOM) to disable kernel
read-ahead during correctMinOffset binary search (#10523)
f941dced39 is described below
commit f941dced39e42f4d43a5a53289f82e999fc71410
Author: lizhimins <[email protected]>
AuthorDate: Wed Jun 17 19:43:16 2026 +0800
[ISSUE #10521] Use madvise(MADV_RANDOM) to disable kernel read-ahead during
correctMinOffset binary search (#10523)
---
.../org/apache/rocketmq/store/ConsumeQueue.java | 36 +++++
.../rocketmq/store/config/MessageStoreConfig.java | 11 ++
.../apache/rocketmq/store/ConsumeQueueTest.java | 157 ++++++++++++++++++++-
3 files changed, 203 insertions(+), 1 deletion(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index d1a36c9e13..5cba37b8ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -16,6 +16,9 @@
*/
package org.apache.rocketmq.store;
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.Collections;
@@ -40,9 +43,11 @@ import org.apache.rocketmq.store.queue.CqUnit;
import org.apache.rocketmq.store.queue.MultiDispatchUtils;
import org.apache.rocketmq.store.queue.QueueOffsetOperator;
import org.apache.rocketmq.store.queue.ReferredIterator;
+import org.apache.rocketmq.store.util.LibC;
public class ConsumeQueue implements ConsumeQueueInterface {
private static final Logger log =
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    // Gates the madvise(2) calls made around the min-offset binary search. Despite its name,
+    // the flag is true on every OS that is neither Windows nor macOS, not only on Linux.
+    private static final boolean IS_LINUX = !(MixAll.isWindows() || MixAll.isMac());
/**
* ConsumeQueue's store unit. Format:
@@ -611,6 +616,29 @@ public class ConsumeQueue implements ConsumeQueueInterface
{
return;
}
+ // Disable kernel read-ahead for the binary search below.
+ //
+ // correctMinOffset performs binary search on mmap'd ConsumeQueue
files, which is a
+ // random access pattern. The kernel's default read-ahead window
is aggressively large
+ // on NVMe devices, so each page fault pulls in far more data than
needed, producing
+ // periodic disk read pulses. On cloud disks where read/write
bandwidth share a single
+ // quota, these pulses squeeze CommitLog writes and cause send-RT
spikes.
+ //
+ // madvise(MADV_RANDOM) tells the kernel to skip read-ahead for
this VMA; after the
+ // search we restore MADV_NORMAL in the finally block so
sequential consumers are
+ // unaffected. Controlled by correctMinOffsetMadviseEnable
(default: off).
+ // Skipped on Windows where madvise is not available.
+ Pointer pointer = null;
+ if (IS_LINUX &&
messageStore.getMessageStoreConfig().isCorrectMinOffsetMadviseEnable()) {
+ long address =
PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
+ pointer = new Pointer(address);
+ int ret = LibC.INSTANCE.madvise(pointer, new
NativeLong(mappedFile.getFileSize()), LibC.MADV_RANDOM);
+ if (ret != 0) {
+ log.warn("Failed to set MADV_RANDOM for
ConsumeQueue[topic={}, queueId={}] file: {}, ret={}",
+ topic, queueId, mappedFile.getFileName(), ret);
+ }
+ }
+
try {
// No valid consume entries
if (result.getSize() == 0) {
@@ -670,6 +698,14 @@ public class ConsumeQueue implements ConsumeQueueInterface
{
} catch (Exception e) {
log.error("Exception thrown when correctMinOffset", e);
} finally {
+ // Restore MADV_NORMAL to allow normal readahead for
sequential access
+ if (IS_LINUX && pointer != null) {
+ int ret = LibC.INSTANCE.madvise(pointer, new
NativeLong(mappedFile.getFileSize()), LibC.MADV_NORMAL);
+ if (ret != 0) {
+ log.warn("Failed to restore MADV_NORMAL for
ConsumeQueue[topic={}, queueId={}] file: {}, ret={}",
+ topic, queueId, mappedFile.getFileName(), ret);
+ }
+ }
result.release();
}
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 8cb3b1c908..b6a6b6b334 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -333,6 +333,9 @@ public class MessageStoreConfig {
private int correctLogicMinOffsetSleepInterval = 1;
// Force correct min offset interval
private int correctLogicMinOffsetForceInterval = 5 * 60 * 1000;
+    // Enable madvise(MADV_RANDOM) optimization for correctMinOffset binary search.
+    // When true, ConsumeQueue advises MADV_RANDOM on the mapped CQ file before the binary
+    // search and restores MADV_NORMAL afterwards, so the kernel skips read-ahead while probing.
+    // Has no effect on Windows or macOS. Off by default.
+    private boolean correctMinOffsetMadviseEnable = false;
+
// swap
private boolean mappedFileSwapEnable = true;
private long commitLogForceSwapMapInterval = 12L * 60 * 60 * 1000;
@@ -842,6 +845,14 @@ public class MessageStoreConfig {
this.correctLogicMinOffsetForceInterval =
correctLogicMinOffsetForceInterval;
}
+    /**
+     * Reports whether ConsumeQueue's min-offset correction may wrap its binary search in
+     * madvise(MADV_RANDOM) / madvise(MADV_NORMAL) calls on the mapped CQ file.
+     *
+     * @return {@code true} when the read-ahead suppression is enabled
+     */
+    public boolean isCorrectMinOffsetMadviseEnable() {
+        return this.correctMinOffsetMadviseEnable;
+    }
+
+    /**
+     * Enables or disables madvise-based read-ahead suppression during min-offset correction.
+     *
+     * @param correctMinOffsetMadviseEnable {@code true} to advise MADV_RANDOM during the search
+     */
+    public void setCorrectMinOffsetMadviseEnable(final boolean correctMinOffsetMadviseEnable) {
+        this.correctMinOffsetMadviseEnable = correctMinOffsetMadviseEnable;
+    }
+
public boolean isCheckCRCOnRecover() {
return checkCRCOnRecover;
}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index 00fbe60a3c..3ad66b3cef 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -297,7 +297,7 @@ public class ConsumeQueueTest {
@Test
public void testPutMessagePositionInfoWrapper_MultiQueue() throws
Exception {
- Assume.assumeFalse(MixAll.isWindows());
+ Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
DefaultMessageStore messageStore = null;
try {
messageStore = genForMultiQueue();
@@ -558,4 +558,159 @@ public class ConsumeQueueTest {
storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
consumeQueue0.destroy();
}
+
+    /**
+     * Exercises correctMinOffset with correctMinOffsetMadviseEnable turned on over a 5000-entry
+     * queue, checking the corrected min offset for start, middle, end and past-the-end targets.
+     *
+     * <p>NOTE(review): these assertions only check the binary-search result; nothing here
+     * observes the madvise calls, so the test would also pass with the option disabled.
+     */
+    @Test
+    public void correctMinOffsetWithReadAheadOptimizationTest() throws IOException {
+        Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
+        String topic = "ReadAheadOptimizationTestTopic";
+        int queueId = 0;
+        int messageCount = 5000;
+        int messageSize = 100;
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"),
+            "correctMinOffsetWithReadAheadOptimizationTest");
+        ConsumeQueue consumeQueue = createMadviseEnabledConsumeQueue(topic, queueId, tmpDir);
+        try {
+            // 5000 entries give correctMinOffset a non-trivial mmap'd CQ file to binary-search.
+            dispatchSequentialMessages(consumeQueue, topic, queueId, messageCount, messageSize);
+
+            // Test 1: minCommitLogOffset = 0 leaves the min offset at the first entry.
+            assertMinOffsetCorrectedTo(consumeQueue, 0, messageSize);
+
+            // Test 2: a mid-range offset exercises the
+            // madvise(MADV_RANDOM) -> binary search -> madvise(MADV_NORMAL) path.
+            assertMinOffsetCorrectedTo(consumeQueue, 2500, messageSize);
+
+            // Test 3: a high offset near the end of the queue.
+            assertMinOffsetCorrectedTo(consumeQueue, 4500, messageSize);
+
+            // Test 4: another offset that lands exactly on an entry.
+            assertMinOffsetCorrectedTo(consumeQueue, 1234, messageSize);
+
+            // Test 5: an offset beyond every message moves the min logic offset past the last unit.
+            consumeQueue.setMinLogicOffset(0L);
+            consumeQueue.correctMinOffset((long) messageCount * messageSize);
+            Assert.assertEquals((long) messageCount * ConsumeQueue.CQ_STORE_UNIT_SIZE,
+                consumeQueue.getMinLogicOffset());
+
+            // Test 6: repeated corrections on the same queue keep producing correct results.
+            for (int i = 1; i <= 5; i++) {
+                assertMinOffsetCorrectedTo(consumeQueue, 1000 * i, messageSize);
+            }
+        } finally {
+            // Always release the mapped files and temp dir, even when an assertion fails.
+            consumeQueue.destroy();
+            FileUtils.deleteDirectory(tmpDir);
+        }
+    }
+
+    /**
+     * Same as above on a 10-entry queue, where the binary-search range is tiny.
+     */
+    @Test
+    public void correctMinOffsetWithSmallDatasetReadAheadOptimizationTest() throws IOException {
+        Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
+        String topic = "SmallDatasetTopic";
+        int queueId = 0;
+        int messageSize = 100;
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"),
+            "correctMinOffsetWithSmallDatasetReadAheadOptimizationTest");
+        ConsumeQueue consumeQueue = createMadviseEnabledConsumeQueue(topic, queueId, tmpDir);
+        try {
+            dispatchSequentialMessages(consumeQueue, topic, queueId, 10, messageSize);
+
+            assertMinOffsetCorrectedTo(consumeQueue, 5, messageSize);
+            assertMinOffsetCorrectedTo(consumeQueue, 0, messageSize);
+            assertMinOffsetCorrectedTo(consumeQueue, 9, messageSize);
+        } finally {
+            consumeQueue.destroy();
+            FileUtils.deleteDirectory(tmpDir);
+        }
+    }
+
+    /**
+     * correctMinOffset on a queue with no dispatched entries must leave the min offset at 0.
+     */
+    @Test
+    public void correctMinOffsetWithEmptyQueueReadAheadOptimizationTest() throws IOException {
+        Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"),
+            "correctMinOffsetWithEmptyQueueReadAheadOptimizationTest");
+        ConsumeQueue consumeQueue = createMadviseEnabledConsumeQueue("EmptyQueueTopic", 0, tmpDir);
+        try {
+            consumeQueue.setMinLogicOffset(0L);
+            consumeQueue.correctMinOffset(0L);
+            Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+        } finally {
+            consumeQueue.destroy();
+            FileUtils.deleteDirectory(tmpDir);
+        }
+    }
+
+    /**
+     * Builds a ConsumeQueue rooted at {@code storeDir} (wiped first), backed by a mocked
+     * DefaultMessageStore whose config has correctMinOffsetMadviseEnable turned on.
+     */
+    private static ConsumeQueue createMadviseEnabledConsumeQueue(String topic, int queueId, File storeDir)
+        throws IOException {
+        FileUtils.deleteDirectory(storeDir);
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        storeConfig.setStorePathRootDir(storeDir.getAbsolutePath());
+        storeConfig.setEnableConsumeQueueExt(false);
+        storeConfig.setCorrectMinOffsetMadviseEnable(true);
+
+        RunningFlags runningFlags = new RunningFlags();
+        StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+        DefaultMessageStore messageStore = Mockito.mock(DefaultMessageStore.class);
+        Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+        Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+        Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+        return new ConsumeQueue(topic, queueId, storeConfig.getStorePathRootDir(),
+            storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+    }
+
+    /**
+     * Dispatches {@code messageCount} entries; entry i has queue offset i and commit log
+     * offset i * messageSize.
+     */
+    private static void dispatchSequentialMessages(ConsumeQueue consumeQueue, String topic, int queueId,
+        int messageCount, int messageSize) {
+        for (int i = 0; i < messageCount; i++) {
+            DispatchRequest dispatchRequest = new DispatchRequest(
+                topic, queueId, messageSize * i, messageSize, 0, 0, i, null, null, 0, 0, null);
+            consumeQueue.putMessagePositionInfoWrapper(dispatchRequest);
+        }
+    }
+
+    /**
+     * Resets the min logic offset to 0, corrects it with the commit log offset of entry
+     * {@code expectedIndex}, and asserts that entry became the queue's min offset.
+     */
+    private static void assertMinOffsetCorrectedTo(ConsumeQueue consumeQueue, int expectedIndex,
+        int messageSize) {
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset((long) expectedIndex * messageSize);
+        Assert.assertEquals(expectedIndex, consumeQueue.getMinOffsetInQueue());
+    }
}