This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 36adf1251f [ISSUE #10017] Validate commitlog offset in 
recoverAbnormally to prevent processing old file data that passes CRC checks (#10018)
36adf1251f is described below

commit 36adf1251ff5d3684083c2dbd2debf5ba2c104a8
Author: guyinyou <[email protected]>
AuthorDate: Tue Jan 20 14:44:22 2026 +0800

    [ISSUE #10017] Validate commitlog offset in recoverAbnormally to prevent 
processing old file data that passes CRC checks (#10018)
    
    * validate commitlog offset in recoverAbnormally to prevent processing old 
file data that passes CRC checks
    
    Change-Id: If4b1881f82d26ce8d374472d73ec9ce3d51ba643
    
    * fix
    
    Change-Id: Idc4bf7ec476cc9b6529619c2aa9afd6a980b819c
    
    * add checkCommitLogOffsetOnRecover in MessageStoreConfig
    
    Change-Id: Iac9afbb8b3ffb03fa15890decaf502afbfa44cf9
    
    ---------
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../main/java/org/apache/rocketmq/store/CommitLog.java | 18 +++++++++++++++---
 .../rocketmq/store/config/MessageStoreConfig.java      | 11 +++++++++++
 2 files changed, 26 insertions(+), 3 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 286f31cd4a..3b92f1a745 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -738,6 +738,7 @@ public class CommitLog implements Swappable {
         // recover by the minimum time stamp
         boolean checkCRCOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
         boolean checkDupInfo = 
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
+        boolean checkCommitLogOffsetOnRecover = 
this.defaultMessageStore.getMessageStoreConfig().isCheckCommitLogOffsetOnRecover();
         int maxRecoverNum = 
this.defaultMessageStore.getMessageStoreConfig().getCommitLogRecoverMaxNum();
         if (maxRecoverNum <= 0) {
             maxRecoverNum = 10;
@@ -792,8 +793,18 @@ public class CommitLog implements Swappable {
             while (true) {
                 DispatchRequest dispatchRequest = 
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
                 int size = dispatchRequest.getMsgSize();
-
                 if (dispatchRequest.isSuccess()) {
+                    // Check commitlog offset validity if enabled
+                    if (checkCommitLogOffsetOnRecover) {
+                        if (dispatchRequest.getCommitLogOffset() < 
mappedFile.getFileFromOffset()
+                            || dispatchRequest.getCommitLogOffset() > 
mappedFile.getFileFromOffset() + mappedFile.getFileSize()) {
+                            log.warn("found illegal commitlog offset {} in {}, 
current pos={}, it will be truncated.",
+                                dispatchRequest.getCommitLogOffset(), 
mappedFile.getFileName(), processOffset + mappedFileOffset);
+                            log.info("recover physics file end, {} pos={}", 
mappedFile.getFileName(), byteBuffer.position());
+
+                            break;
+                        }
+                    }
                     // Normal data
                     if (size > 0) {
                         lastValidMsgPhyOffset = processOffset + 
mappedFileOffset;
@@ -925,7 +936,8 @@ public class CommitLog implements Swappable {
         return isMappedFileMatchedRecover(phyOffset, storeTimestamp, 
recoverNormally);
     }
 
-    private boolean isMappedFileMatchedRecover(long phyOffset, long 
storeTimestamp, boolean recoverNormally) throws RocksDBException {
+    private boolean isMappedFileMatchedRecover(long phyOffset, long 
storeTimestamp,
+        boolean recoverNormally) throws RocksDBException {
         boolean result = 
this.defaultMessageStore.getQueueStore().isMappedFileMatchedRecover(phyOffset, 
storeTimestamp, recoverNormally);
         if (null != this.defaultMessageStore.getTransMessageRocksDBStore() && 
defaultMessageStore.getMessageStoreConfig().isTransRocksDBEnable() && 
!defaultMessageStore.getMessageStoreConfig().isTransWriteOriginTransHalfEnable())
 {
             result = result && 
this.defaultMessageStore.getTransMessageRocksDBStore().isMappedFileMatchedRecover(phyOffset);
@@ -2386,7 +2398,7 @@ public class CommitLog implements Swappable {
                     long costTime = this.systemClock.now() - 
beginClockTimestamp;
                     log.info("[{}] scanFilesInPageCache-cost {} ms.", costTime 
> 30 * 1000 ? "NOTIFYME" : "OK", costTime);
                 } catch (Throwable e) {
-                    log.warn("{} service has e: ", this.getServiceName() , e);
+                    log.warn("{} service has e: ", this.getServiceName(), e);
                 }
             }
             log.info("{} service end", this.getServiceName());
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 65dba5390d..d7f17efd64 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -194,6 +194,9 @@ public class MessageStoreConfig {
     // This ensures no on-the-wire or on-disk corruption to the messages 
occurred.
     // This check adds some overhead,so it may be disabled in cases seeking 
extreme performance.
     private boolean checkCRCOnRecover = true;
+    // Whether to check the commitlog offset validity during abnormal recovery.
+    // This helps detect and truncate old file data that may pass CRC checks 
but contains invalid offsets.
+    private boolean checkCommitLogOffsetOnRecover = false;
     // How many pages are to be flushed when flush CommitLog
     private int flushCommitLogLeastPages = 4;
     // How many pages are to be committed when commit data to file
@@ -796,6 +799,14 @@ public class MessageStoreConfig {
         this.checkCRCOnRecover = checkCRCOnRecover;
     }
 
+    public boolean isCheckCommitLogOffsetOnRecover() {
+        return checkCommitLogOffsetOnRecover;
+    }
+
+    public void setCheckCommitLogOffsetOnRecover(boolean 
checkCommitLogOffsetOnRecover) {
+        this.checkCommitLogOffsetOnRecover = checkCommitLogOffsetOnRecover;
+    }
+
     public boolean isForceVerifyPropCRC() {
         return forceVerifyPropCRC;
     }

Reply via email to