This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 93f60db5f6 [ISSUE #9713] Improve data consistency in DefaultMappedFile flush method
93f60db5f6 is described below

commit 93f60db5f6e6350237b40e9173f2b4894c803135
Author: guyinyou <[email protected]>
AuthorDate: Tue Sep 16 21:34:50 2025 +0800

    [ISSUE #9713] Improve data consistency in DefaultMappedFile flush method
    
    - Move FLUSHED_POSITION_UPDATER.set() inside the try block
    - Check isWriteable() before hold() so the early return no longer skips release()
    - Prevent false-positive flush success when the actual flush operation fails
    - Ensure data consistency and prevent potential data loss on system crash
    
    Previously, the flushed position was updated even when the flush operation
    failed, which could lead to data loss because the system would incorrectly
    assume data had been persisted to disk when it was still only in memory.
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../org/apache/rocketmq/store/logfile/DefaultMappedFile.java  | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index f2383993d4..889eb25b0f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -123,6 +123,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
     private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
 
     protected RunningFlags runningFlags;
+
     static class SharedByteBuffer {
         private final ReentrantLock lock;
         private final ByteBuffer buffer;
@@ -532,14 +533,13 @@ public class DefaultMappedFile extends AbstractMappedFile {
      */
     @Override
     public int flush(final int flushLeastPages) {
+        if (!isWriteable()) {
+            return this.getFlushedPosition();
+        }
         if (this.isAbleToFlush(flushLeastPages)) {
             if (this.hold()) {
                 int value = getReadPosition();
 
-                if (!isWriteable()) {
-                    return this.getFlushedPosition();
-                }
-
                 try {
                     this.mappedByteBufferAccessCountSinceLastSwap++;
 
@@ -555,14 +555,13 @@ public class DefaultMappedFile extends AbstractMappedFile {
                         }
                     }
                     this.lastFlushTime = System.currentTimeMillis();
+                    FLUSHED_POSITION_UPDATER.set(this, value);
                 } catch (Throwable e) {
                     if (e instanceof IOException) {
                         getAndMakeNotWriteable();
                     }
                     log.error("Error occurred when force data to disk.", e);
                 }
-
-                FLUSHED_POSITION_UPDATER.set(this, value);
                 this.release();
             } else {
                 log.warn("in flush, hold failed, flush offset = " + 
FLUSHED_POSITION_UPDATER.get(this));

Reply via email to