This is an automated email from the ASF dual-hosted git repository.

ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 86e8ed630b [ISSUE #9666] Add accelerated startup recovery feature for 
RocksDB store with SYNC_FLUSH (#9667)
86e8ed630b is described below

commit 86e8ed630bd20c8ea2c9ab662af9b97389c41a20
Author: rongtong <[email protected]>
AuthorDate: Tue Sep 9 14:11:24 2025 +0800

    [ISSUE #9666] Add accelerated startup recovery feature for RocksDB store 
with SYNC_FLUSH (#9667)
    
    * feat: Add accelerated startup recovery feature
    
    Add accelerated startup recovery functionality when using RocksDB store 
with SYNC_FLUSH configuration:
    
    - Add enableAcceleratedRecovery configuration option in MessageStoreConfig
    - Implement accelerated recovery logic in CommitLog for both normal and 
abnormal recovery
    - Add protective fallback mechanism to handle edge cases
    - Improve isMappedFileMatchedRecover method for better robustness
    - Add comprehensive unit tests for the accelerated recovery feature
    
    This feature significantly reduces startup time when recovering from 
RocksDB-based storage
    with synchronous flushing enabled, while maintaining data consistency and 
safety.
    
    * Fix the issue of accelerated startup failure
    
    * refactor: Remove problematic unit test
    
    Remove AcceleratedRecoveryTest.java as the test implementation was not 
appropriate for the accelerated startup recovery feature.
    
    * Delete useless code
---
 .../java/org/apache/rocketmq/store/CommitLog.java  | 54 +++++++++++++++++-----
 .../rocketmq/store/config/MessageStoreConfig.java  | 18 +++++++-
 2 files changed, 60 insertions(+), 12 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 2e3dbbadc3..a4bdb7851d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -344,6 +344,16 @@ public class CommitLog implements Swappable {
             long processOffset = mappedFile.getFileFromOffset();
             long mappedFileOffset = 0;
             long lastValidMsgPhyOffset = this.getConfirmOffset();
+
+            if 
(defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
+                && 
defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
+                mappedFileOffset = dispatchFromPhyOffset - 
mappedFile.getFileFromOffset();
+                if (mappedFileOffset > 0) {
+                    log.info("recover using acceleration, recovery offset is 
{}", dispatchFromPhyOffset);
+                    lastValidMsgPhyOffset = dispatchFromPhyOffset;
+                    byteBuffer.position((int) mappedFileOffset);
+                }
+            }
             while (true) {
                 DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
                 int size = dispatchRequest.getMsgSize();
@@ -728,9 +738,29 @@ public class CommitLog implements Swappable {
 
             ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
             long processOffset = mappedFile.getFileFromOffset();
-            long mappedFileOffset = 0;
-            long lastValidMsgPhyOffset = processOffset;
-            long lastConfirmValidMsgPhyOffset = processOffset;
+            long mappedFileOffset;
+            long lastValidMsgPhyOffset;
+            long lastConfirmValidMsgPhyOffset;
+
+            if 
(defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
+                && 
defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
+                mappedFileOffset = maxPhyOffsetOfConsumeQueue - 
mappedFile.getFileFromOffset();
+                // Protective measure: fall back to non-accelerated mode. 
This case is extremely unlikely to occur.
+                if (mappedFileOffset < 0) {
+                    mappedFileOffset = 0;
+                    lastValidMsgPhyOffset = processOffset;
+                    lastConfirmValidMsgPhyOffset = processOffset;
+                } else {
+                    log.info("recover using acceleration, recovery offset is 
{}", maxPhyOffsetOfConsumeQueue);
+                    lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
+                    lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
+                    byteBuffer.position((int) mappedFileOffset);
+                }
+            } else {
+                mappedFileOffset = 0;
+                lastValidMsgPhyOffset = processOffset;
+                lastConfirmValidMsgPhyOffset = processOffset;
+            }
             // abnormal recover require dispatching
             boolean doDispatch = true;
             while (true) {
@@ -840,19 +870,21 @@ public class CommitLog implements Swappable {
         boolean recoverNormally) throws RocksDBException {
         ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
 
-        int magicCode = 
byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
-        if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode != 
MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
+        boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+        boolean checkDupInfo = 
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
+
+        DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
+
+        if (!dispatchRequest.isSuccess()) {
             return false;
         }
 
-        int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
-        int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 
? 8 : 20;
-        int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 + 
bornHostLength;
-        long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
-        if (0 == storeTimestamp) {
+        long storeTimestamp = dispatchRequest.getStoreTimestamp();
+        long phyOffset = dispatchRequest.getCommitLogOffset();
+
+        if (0 == dispatchRequest.getStoreTimestamp()) {
             return false;
         }
-        long phyOffset = 
byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
 
         if 
(this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
             
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 60f6a90381..a142eca86f 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -464,6 +464,13 @@ public class MessageStoreConfig {
 
     private long rocksdbWalFileRollingThreshold = SizeUnit.GB;
 
+    /**
+     * Note: For correctness, this switch should be enabled only if the 
previous startup was configured with SYNC_FLUSH
+     * and the storeType was defaultRocksDB. This switch is not recommended 
for normal use cases (including master-slave
+     * or controller mode).
+     */
+    private boolean enableAcceleratedRecovery = false;
+
     public String getRocksdbCompressionType() {
         return rocksdbCompressionType;
     }
@@ -2008,7 +2015,16 @@ public class MessageStoreConfig {
         return enableLogConsumeQueueRepeatedlyBuildWhenRecover;
     }
 
-    public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(boolean 
enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
+    public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(
+        boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
         this.enableLogConsumeQueueRepeatedlyBuildWhenRecover = 
enableLogConsumeQueueRepeatedlyBuildWhenRecover;
     }
+
+    public boolean isEnableAcceleratedRecovery() {
+        return enableAcceleratedRecovery;
+    }
+
+    public void setEnableAcceleratedRecovery(boolean 
enableAcceleratedRecovery) {
+        this.enableAcceleratedRecovery = enableAcceleratedRecovery;
+    }
 }

Reply via email to