This is an automated email from the ASF dual-hosted git repository.

lollipopjin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new f941dced39 [ISSUE #10521] Use madvise(MADV_RANDOM) to disable kernel read-ahead during correctMinOffset binary search (#10523)
f941dced39 is described below

commit f941dced39e42f4d43a5a53289f82e999fc71410
Author: lizhimins <[email protected]>
AuthorDate: Wed Jun 17 19:43:16 2026 +0800

    [ISSUE #10521] Use madvise(MADV_RANDOM) to disable kernel read-ahead during correctMinOffset binary search (#10523)
---
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  36 +++++
 .../rocketmq/store/config/MessageStoreConfig.java  |  11 ++
 .../apache/rocketmq/store/ConsumeQueueTest.java    | 157 ++++++++++++++++++++-
 3 files changed, 203 insertions(+), 1 deletion(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index d1a36c9e13..5cba37b8ce 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -16,6 +16,9 @@
  */
 package org.apache.rocketmq.store;
 
+import com.sun.jna.NativeLong;
+import com.sun.jna.Pointer;
+import io.netty.util.internal.PlatformDependent;
 import java.io.File;
 import java.nio.ByteBuffer;
 import java.util.Collections;
@@ -40,9 +43,11 @@ import org.apache.rocketmq.store.queue.CqUnit;
 import org.apache.rocketmq.store.queue.MultiDispatchUtils;
 import org.apache.rocketmq.store.queue.QueueOffsetOperator;
 import org.apache.rocketmq.store.queue.ReferredIterator;
+import org.apache.rocketmq.store.util.LibC;
 
 public class ConsumeQueue implements ConsumeQueueInterface {
     private static final Logger log = 
LoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
+    private static final boolean IS_LINUX = !MixAll.isWindows() && 
!MixAll.isMac();
 
     /**
      * ConsumeQueue's store unit. Format:
@@ -611,6 +616,29 @@ public class ConsumeQueue implements ConsumeQueueInterface 
{
                 return;
             }
 
+            // Disable kernel read-ahead for the binary search below.
+            //
+            // correctMinOffset performs binary search on mmap'd ConsumeQueue files, which is a
+            // random access pattern. The kernel's default read-ahead window is aggressively large
+            // on NVMe devices, so each page fault pulls in far more data than needed, producing
+            // periodic disk read pulses. On cloud disks where read/write bandwidth share a single
+            // quota, these pulses squeeze CommitLog writes and cause send-RT spikes.
+            //
+            // madvise(MADV_RANDOM) tells the kernel to skip read-ahead for this mapping (for all
+            // readers, not just this thread); the finally block then resets it to MADV_NORMAL.
+            // Controlled by correctMinOffsetMadviseEnable (default: off).
+            // Skipped on Windows and macOS (IS_LINUX is false there).
+            Pointer pointer = null;
+            if (IS_LINUX && 
messageStore.getMessageStoreConfig().isCorrectMinOffsetMadviseEnable()) {
+                long address = 
PlatformDependent.directBufferAddress(mappedFile.getMappedByteBuffer());
+                pointer = new Pointer(address);
+                int ret = LibC.INSTANCE.madvise(pointer, new 
NativeLong(mappedFile.getFileSize()), LibC.MADV_RANDOM);
+                if (ret != 0) {
+                    log.warn("Failed to set MADV_RANDOM for 
ConsumeQueue[topic={}, queueId={}] file: {}, ret={}",
+                        topic, queueId, mappedFile.getFileName(), ret);
+                }
+            }
+
             try {
                 // No valid consume entries
                 if (result.getSize() == 0) {
@@ -670,6 +698,14 @@ public class ConsumeQueue implements ConsumeQueueInterface 
{
             } catch (Exception e) {
                 log.error("Exception thrown when correctMinOffset", e);
             } finally {
+                // Restore MADV_NORMAL to allow normal readahead for sequential access
+                if (IS_LINUX && pointer != null) {
+                    int ret = LibC.INSTANCE.madvise(pointer, new 
NativeLong(mappedFile.getFileSize()), LibC.MADV_NORMAL);
+                    if (ret != 0) {
+                        log.warn("Failed to restore MADV_NORMAL for 
ConsumeQueue[topic={}, queueId={}] file: {}, ret={}",
+                            topic, queueId, mappedFile.getFileName(), ret);
+                    }
+                }
                 result.release();
             }
         }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 8cb3b1c908..b6a6b6b334 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -333,6 +333,9 @@ public class MessageStoreConfig {
     private int correctLogicMinOffsetSleepInterval = 1;
     // Force correct min offset interval
     private int correctLogicMinOffsetForceInterval = 5 * 60 * 1000;
+    // Enable madvise(MADV_RANDOM) optimization for correctMinOffset binary search
+    private boolean correctMinOffsetMadviseEnable = false;
+
     // swap
     private boolean mappedFileSwapEnable = true;
     private long commitLogForceSwapMapInterval = 12L * 60 * 60 * 1000;
@@ -842,6 +845,14 @@ public class MessageStoreConfig {
         this.correctLogicMinOffsetForceInterval = 
correctLogicMinOffsetForceInterval;
     }
 
+    public boolean isCorrectMinOffsetMadviseEnable() {
+        return correctMinOffsetMadviseEnable;
+    }
+
+    public void setCorrectMinOffsetMadviseEnable(boolean 
correctMinOffsetMadviseEnable) {
+        this.correctMinOffsetMadviseEnable = correctMinOffsetMadviseEnable;
+    }
+
     public boolean isCheckCRCOnRecover() {
         return checkCRCOnRecover;
     }
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java 
b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
index 00fbe60a3c..3ad66b3cef 100644
--- a/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/ConsumeQueueTest.java
@@ -297,7 +297,7 @@ public class ConsumeQueueTest {
 
     @Test
     public void testPutMessagePositionInfoWrapper_MultiQueue() throws 
Exception {
-        Assume.assumeFalse(MixAll.isWindows());
+        Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
         DefaultMessageStore messageStore = null;
         try {
             messageStore = genForMultiQueue();
@@ -558,4 +558,159 @@ public class ConsumeQueueTest {
             storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
         consumeQueue0.destroy();
     }
+
+    @Test
+    public void correctMinOffsetWithReadAheadOptimizationTest() throws 
IOException {
+        Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
+        String topic = "ReadAheadOptimizationTestTopic";
+        int queueId = 0;
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"), 
"correctMinOffsetWithReadAheadOptimizationTest");
+        FileUtils.deleteDirectory(tmpDir);
+        storeConfig.setStorePathRootDir(tmpDir.getAbsolutePath());
+        storeConfig.setEnableConsumeQueueExt(false);
+        storeConfig.setCorrectMinOffsetMadviseEnable(true);
+        DefaultMessageStore messageStore = 
Mockito.mock(DefaultMessageStore.class);
+        
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+
+        RunningFlags runningFlags = new RunningFlags();
+        Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+
+        StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+        
Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+        ConsumeQueue consumeQueue = new ConsumeQueue(topic, queueId, 
storeConfig.getStorePathRootDir(),
+            storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+
+        // Write 5000 messages to create a substantial CQ file for binary search
+        int messageCount = 5000;
+        int messageSize = 100;
+        for (int i = 0; i < messageCount; i++) {
+            DispatchRequest dispatchRequest = new DispatchRequest(
+                topic, queueId, messageSize * i, messageSize, 0, 0, i, null, 
null, 0, 0, null);
+            consumeQueue.putMessagePositionInfoWrapper(dispatchRequest);
+        }
+
+        // Test 1: correctMinOffset should work correctly with madvise optimization
+        // Set min offset to 0 and correct with minCommitLogOffset = 0
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(0L);
+        Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+
+        // Test 2: Correct with a mid-range offset to trigger binary search
+        // This will exercise the madvise(MADV_RANDOM) -> binary search -> madvise(MADV_NORMAL) path
+        int targetOffset = 2500;
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(targetOffset * messageSize);
+        Assert.assertEquals(targetOffset, consumeQueue.getMinOffsetInQueue());
+
+        // Test 3: Correct with a high offset near the end
+        int highOffset = 4500;
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(highOffset * messageSize);
+        Assert.assertEquals(highOffset, consumeQueue.getMinOffsetInQueue());
+
+        // Test 4: Correct with exact match offset
+        int exactOffset = 1234;
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(exactOffset * messageSize);
+        Assert.assertEquals(exactOffset, consumeQueue.getMinOffsetInQueue());
+
+        // Test 5: Correct with offset beyond all messages (should point to end)
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(messageCount * messageSize);
+        Assert.assertEquals(messageCount * ConsumeQueue.CQ_STORE_UNIT_SIZE, 
consumeQueue.getMinLogicOffset());
+
+        // Test 6: Multiple sequential corrections to verify madvise restore works properly
+        for (int i = 0; i < 5; i++) {
+            int testOffset = 1000 * (i + 1);
+            consumeQueue.setMinLogicOffset(0L);
+            consumeQueue.correctMinOffset(testOffset * messageSize);
+            Assert.assertEquals(testOffset, 
consumeQueue.getMinOffsetInQueue());
+        }
+
+        consumeQueue.destroy();
+        FileUtils.deleteDirectory(tmpDir);
+    }
+
+    @Test
+    public void correctMinOffsetWithSmallDatasetReadAheadOptimizationTest() 
throws IOException {
+        Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
+        String topic = "SmallDatasetTopic";
+        int queueId = 0;
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"), 
"correctMinOffsetWithSmallDatasetReadAheadOptimizationTest");
+        FileUtils.deleteDirectory(tmpDir);
+        storeConfig.setStorePathRootDir(tmpDir.getAbsolutePath());
+        storeConfig.setEnableConsumeQueueExt(false);
+        storeConfig.setCorrectMinOffsetMadviseEnable(true);
+        DefaultMessageStore messageStore = 
Mockito.mock(DefaultMessageStore.class);
+        
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+
+        RunningFlags runningFlags = new RunningFlags();
+        Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+
+        StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+        
Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+        ConsumeQueue consumeQueue = new ConsumeQueue(topic, queueId, 
storeConfig.getStorePathRootDir(),
+            storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+
+        // Test with very small dataset (edge case for binary search)
+        int messageCount = 10;
+        int messageSize = 100;
+        for (int i = 0; i < messageCount; i++) {
+            DispatchRequest dispatchRequest = new DispatchRequest(
+                topic, queueId, messageSize * i, messageSize, 0, 0, i, null, 
null, 0, 0, null);
+            consumeQueue.putMessagePositionInfoWrapper(dispatchRequest);
+        }
+
+        // Correct with various offsets
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(5 * messageSize);
+        Assert.assertEquals(5, consumeQueue.getMinOffsetInQueue());
+
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(0L);
+        Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(9 * messageSize);
+        Assert.assertEquals(9, consumeQueue.getMinOffsetInQueue());
+
+        consumeQueue.destroy();
+        FileUtils.deleteDirectory(tmpDir);
+    }
+
+    @Test
+    public void correctMinOffsetWithEmptyQueueReadAheadOptimizationTest() 
throws IOException {
+        Assume.assumeTrue(!MixAll.isWindows() && !MixAll.isMac());
+        String topic = "EmptyQueueTopic";
+        int queueId = 0;
+        MessageStoreConfig storeConfig = new MessageStoreConfig();
+        File tmpDir = new File(System.getProperty("java.io.tmpdir"), 
"correctMinOffsetWithEmptyQueueReadAheadOptimizationTest");
+        FileUtils.deleteDirectory(tmpDir);
+        storeConfig.setStorePathRootDir(tmpDir.getAbsolutePath());
+        storeConfig.setEnableConsumeQueueExt(false);
+        storeConfig.setCorrectMinOffsetMadviseEnable(true);
+        DefaultMessageStore messageStore = 
Mockito.mock(DefaultMessageStore.class);
+        
Mockito.when(messageStore.getMessageStoreConfig()).thenReturn(storeConfig);
+
+        RunningFlags runningFlags = new RunningFlags();
+        Mockito.when(messageStore.getRunningFlags()).thenReturn(runningFlags);
+
+        StoreCheckpoint storeCheckpoint = Mockito.mock(StoreCheckpoint.class);
+        
Mockito.when(messageStore.getStoreCheckpoint()).thenReturn(storeCheckpoint);
+
+        ConsumeQueue consumeQueue = new ConsumeQueue(topic, queueId, 
storeConfig.getStorePathRootDir(),
+            storeConfig.getMappedFileSizeConsumeQueue(), messageStore);
+
+        // Test with empty queue - should handle gracefully without madvise errors
+        consumeQueue.setMinLogicOffset(0L);
+        consumeQueue.correctMinOffset(0L);
+        Assert.assertEquals(0, consumeQueue.getMinOffsetInQueue());
+
+        consumeQueue.destroy();
+        FileUtils.deleteDirectory(tmpDir);
+    }
 }

Reply via email to