This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 4ae02942e5 [ISSUE #9693] Add writeWithoutMmap configuration to prevent JVM crash when device becomes read-only (#9694)
4ae02942e5 is described below

commit 4ae02942e58a519829a4b50c6a4d50e6c631cab2
Author: guyinyou <[email protected]>
AuthorDate: Sun Sep 14 14:43:06 2025 +0800

    [ISSUE #9693] Add writeWithoutMmap configuration to prevent JVM crash when device becomes read-only (#9694)
    
    * add "writeWithoutMmap"
    
    * fix npe
    
    ---------
    
    Co-authored-by: guyinyou <[email protected]>
---
 .../rocketmq/store/AllocateMappedFileService.java  |   9 +-
 .../java/org/apache/rocketmq/store/CommitLog.java  |   3 +-
 .../org/apache/rocketmq/store/ConsumeQueue.java    |  10 +-
 .../org/apache/rocketmq/store/ConsumeQueueExt.java |  36 ++++
 .../org/apache/rocketmq/store/MappedFileQueue.java |  17 +-
 .../rocketmq/store/MultiPathMappedFileQueue.java   |   3 +-
 .../rocketmq/store/config/MessageStoreConfig.java  |  15 ++
 .../rocketmq/store/logfile/DefaultMappedFile.java  | 200 ++++++++++++++---
 .../rocketmq/store/queue/BatchConsumeQueue.java    |  11 +-
 .../logfile/DefaultMappedFileConcurrencyTest.java  | 191 +++++++++++++++++
 .../DefaultMappedFileErrorHandlingTest.java        | 210 ++++++++++++++++++
 .../logfile/DefaultMappedFilePerformanceTest.java  | 236 +++++++++++++++++++++
 .../DefaultMappedFileWriteWithoutMmapTest.java     | 148 +++++++++++++
 13 files changed, 1052 insertions(+), 37 deletions(-)

diff --git 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index d9cd602a65..a56fa46157 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -172,16 +172,17 @@ public class AllocateMappedFileService extends 
ServiceThread {
                 long beginTime = System.currentTimeMillis();
 
                 MappedFile mappedFile;
+                boolean writeWithoutMmap = 
messageStore.getMessageStoreConfig().isWriteWithoutMmap();
                 if (messageStore.isTransientStorePoolEnable()) {
                     try {
                         mappedFile = 
ServiceLoader.load(MappedFile.class).iterator().next();
                         mappedFile.init(req.getFilePath(), req.getFileSize(), 
messageStore.getTransientStorePool());
                     } catch (RuntimeException e) {
                         log.warn("Use default implementation.");
-                        mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize(), messageStore.getTransientStorePool());
+                        mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
                     }
                 } else {
-                    mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize());
+                    mappedFile = new DefaultMappedFile(req.getFilePath(), 
req.getFileSize(), writeWithoutMmap);
                 }
 
                 long elapsedTime = 
UtilAll.computeElapsedTimeMilliseconds(beginTime);
@@ -195,7 +196,9 @@ public class AllocateMappedFileService extends 
ServiceThread {
                 if (mappedFile.getFileSize() >= 
this.messageStore.getMessageStoreConfig()
                     .getMappedFileSizeCommitLog()
                     &&
-                    
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+                    
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()
+                    &&
+                    
!this.messageStore.getMessageStoreConfig().isWriteWithoutMmap()) {
                     
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
                         
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
                 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java 
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index a4bdb7851d..3b26afcc09 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -118,7 +118,8 @@ public class CommitLog implements Swappable {
         } else {
             this.mappedFileQueue = new MappedFileQueue(storePath,
                 
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
-                messageStore.getAllocateMappedFileService());
+                messageStore.getAllocateMappedFileService(),
+                messageStore.getMessageStoreConfig().isWriteWithoutMmap());
         }
 
         this.defaultMessageStore = messageStore;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 2850299b7d..02f90cef1d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -98,7 +98,12 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
             + File.separator + topic
             + File.separator + queueId;
 
-        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, 
null);
+        boolean writeWithoutMmap = false;
+        if (messageStore.getMessageStoreConfig() != null) {
+            writeWithoutMmap = 
messageStore.getMessageStoreConfig().isWriteWithoutMmap();
+        }
+
+        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, 
null, writeWithoutMmap);
 
         this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
 
@@ -108,7 +113,8 @@ public class ConsumeQueue implements ConsumeQueueInterface, 
FileQueueLifeCycle {
                 queueId,
                 
StorePathConfigHelper.getStorePathConsumeQueueExt(messageStore.getMessageStoreConfig().getStorePathRootDir()),
                 
messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
-                
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
+                
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt(),
+                writeWithoutMmap
             );
         }
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java 
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
index 3f266378df..641f672bba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -90,6 +90,42 @@ public class ConsumeQueueExt {
         }
     }
 
+    /**
+     * Constructor with writeWithoutMmap support.
+     *
+     * @param topic topic
+     * @param queueId id of queue
+     * @param storePath root dir of files to store.
+     * @param mappedFileSize file size
+     * @param bitMapLength bit map length.
+     * @param writeWithoutMmap whether to use RandomAccessFile instead of 
MappedByteBuffer
+     */
+    public ConsumeQueueExt(final String topic,
+        final int queueId,
+        final String storePath,
+        final int mappedFileSize,
+        final int bitMapLength,
+        final boolean writeWithoutMmap) {
+
+        this.storePath = storePath;
+        this.mappedFileSize = mappedFileSize;
+
+        this.topic = topic;
+        this.queueId = queueId;
+
+        String queueDir = this.storePath
+            + File.separator + topic
+            + File.separator + queueId;
+
+        this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, 
null, writeWithoutMmap);
+
+        if (bitMapLength > 0) {
+            this.tempContainer = ByteBuffer.allocate(
+                bitMapLength / Byte.SIZE
+            );
+        }
+    }
+
     public long getTotalSize() {
         return this.mappedFileQueue.getTotalFileSize();
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index e32c16a82a..320e842154 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -53,6 +53,11 @@ public class MappedFileQueue implements Swappable {
     protected long committedWhere = 0;
 
     protected volatile long storeTimestamp = 0;
+    
+    /**
+     * Configuration flag to use RandomAccessFile instead of MappedByteBuffer 
for writing
+     */
+    protected boolean writeWithoutMmap = false;
 
     public MappedFileQueue(final String storePath, int mappedFileSize,
         AllocateMappedFileService allocateMappedFileService) {
@@ -61,6 +66,14 @@ public class MappedFileQueue implements Swappable {
         this.allocateMappedFileService = allocateMappedFileService;
     }
 
+    public MappedFileQueue(final String storePath, int mappedFileSize,
+        AllocateMappedFileService allocateMappedFileService, boolean 
writeWithoutMmap) {
+        this.storePath = storePath;
+        this.mappedFileSize = mappedFileSize;
+        this.allocateMappedFileService = allocateMappedFileService;
+        this.writeWithoutMmap = writeWithoutMmap;
+    }
+
     public void checkSelf() {
         List<MappedFile> mappedFiles = new ArrayList<>(this.mappedFiles);
         if (!mappedFiles.isEmpty()) {
@@ -266,7 +279,7 @@ public class MappedFileQueue implements Swappable {
             }
 
             try {
-                MappedFile mappedFile = new DefaultMappedFile(file.getPath(), 
mappedFileSize);
+                MappedFile mappedFile = new DefaultMappedFile(file.getPath(), 
mappedFileSize, writeWithoutMmap);
 
                 mappedFile.setWrotePosition(this.mappedFileSize);
                 mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -356,7 +369,7 @@ public class MappedFileQueue implements Swappable {
                     nextNextFilePath, this.mappedFileSize);
         } else {
             try {
-                mappedFile = new DefaultMappedFile(nextFilePath, 
this.mappedFileSize);
+                mappedFile = new DefaultMappedFile(nextFilePath, 
this.mappedFileSize, this.writeWithoutMmap);
             } catch (IOException e) {
                 log.error("create mappedFile exception", e);
             }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
index 8ff050dfe3..72ec8820a6 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -39,7 +39,8 @@ public class MultiPathMappedFileQueue extends MappedFileQueue 
{
     public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int 
mappedFileSize,
                                     AllocateMappedFileService 
allocateMappedFileService,
                                     Supplier<Set<String>> 
fullStorePathsSupplier) {
-        super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, 
allocateMappedFileService);
+        super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize, 
allocateMappedFileService, 
+              messageStoreConfig.isWriteWithoutMmap());
         this.config = messageStoreConfig;
         this.fullStorePathsSupplier = fullStorePathsSupplier;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index a142eca86f..e4c3a4045f 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -238,6 +238,13 @@ public class MessageStoreConfig {
     private int transientStorePoolSize = 5;
     private boolean fastFailIfNoBufferInStorePool = false;
 
+    /**
+     * When true, use RandomAccessFile for writing instead of MappedByteBuffer.
+     * This can be useful for certain scenarios where mmap is not desired.
+     */
+    @ImportantField
+    private boolean writeWithoutMmap = false;
+
     // DLedger message store config
     private boolean enableDLegerCommitLog = false;
     private String dLegerGroup;
@@ -1147,6 +1154,14 @@ public class MessageStoreConfig {
         this.transientStorePoolEnable = transientStorePoolEnable;
     }
 
+    public boolean isWriteWithoutMmap() {
+        return writeWithoutMmap;
+    }
+
+    public void setWriteWithoutMmap(final boolean writeWithoutMmap) {
+        this.writeWithoutMmap = writeWithoutMmap;
+    }
+
     public int getTransientStorePoolSize() {
         return transientStorePoolSize;
     }
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index c490d093a1..b2d89108b4 100644
--- 
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++ 
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -33,9 +33,11 @@ import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.StandardCopyOption;
 import java.util.Iterator;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
 import java.util.function.Consumer;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.rocketmq.common.UtilAll;
@@ -79,11 +81,19 @@ public class DefaultMappedFile extends AbstractMappedFile {
     protected volatile int flushedPosition;
     protected int fileSize;
     protected FileChannel fileChannel;
+    /**
+     * RandomAccessFile for writing when writeWithoutMmap is enabled
+     */
+    protected RandomAccessFile randomAccessFile = null;
     /**
      * Message will put to here first, and then reput to FileChannel if 
writeBuffer is not null.
      */
     protected ByteBuffer writeBuffer = null;
     protected TransientStorePool transientStorePool = null;
+    /**
+     * Configuration flag to use RandomAccessFile instead of MappedByteBuffer 
for writing
+     */
+    protected boolean writeWithoutMmap = false;
     protected String fileName;
     protected long fileFromOffset;
     protected File file;
@@ -108,6 +118,28 @@ public class DefaultMappedFile extends AbstractMappedFile {
      */
     private long stopTimestamp = -1;
 
+    private static int maxSharedNum = 16;
+    private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
+
+    static class SharedByteBuffer {
+        private final ReentrantLock lock;
+        private final ByteBuffer buffer;
+
+        public SharedByteBuffer(int size) {
+            this.lock = new ReentrantLock();
+            this.buffer = ByteBuffer.allocate(size);
+        }
+
+        public void release() {
+            this.lock.unlock();
+        }
+
+        public ByteBuffer acquire() {
+            this.lock.lock();
+            return buffer;
+        }
+    }
+
     static {
         WROTE_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
         COMMITTED_POSITION_UPDATER = 
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, 
"committedPosition");
@@ -124,6 +156,17 @@ public class DefaultMappedFile extends AbstractMappedFile {
             }
         }
         IS_LOADED_METHOD = isLoaded0method;
+
+        SHARED_BYTE_BUFFER = new SharedByteBuffer[maxSharedNum];
+        for (int i = 0; i < maxSharedNum; i++) {
+            SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(4 * 1024 * 1024);
+        }
+    }
+
+    private static SharedByteBuffer borrowSharedByteBuffer() {
+        int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
+        SharedByteBuffer buffer = SHARED_BYTE_BUFFER[idx];
+        return buffer;
     }
 
     public DefaultMappedFile() {
@@ -138,6 +181,18 @@ public class DefaultMappedFile extends AbstractMappedFile {
         init(fileName, fileSize, transientStorePool);
     }
 
+    public DefaultMappedFile(final String fileName, final int fileSize,
+        final boolean writeWithoutMmap) throws IOException {
+        this.writeWithoutMmap = writeWithoutMmap;
+        init(fileName, fileSize);
+    }
+
+    public DefaultMappedFile(final String fileName, final int fileSize,
+        final TransientStorePool transientStorePool, final boolean 
writeWithoutMmap) throws IOException {
+        this.writeWithoutMmap = writeWithoutMmap;
+        init(fileName, fileSize, transientStorePool);
+    }
+
     public static int getTotalMappedFiles() {
         return TOTAL_MAPPED_FILES.get();
     }
@@ -150,8 +205,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
     public void init(final String fileName, final int fileSize,
         final TransientStorePool transientStorePool) throws IOException {
         init(fileName, fileSize);
-        this.writeBuffer = transientStorePool.borrowBuffer();
-        this.transientStorePool = transientStorePool;
+        if (transientStorePool != null) {
+            this.writeBuffer = transientStorePool.borrowBuffer();
+            this.transientStorePool = transientStorePool;
+        }
     }
 
     private void init(final String fileName, final int fileSize) throws 
IOException {
@@ -165,7 +222,17 @@ public class DefaultMappedFile extends AbstractMappedFile {
 
         try {
             this.fileChannel = new RandomAccessFile(this.file, 
"rw").getChannel();
-            this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 
0, fileSize);
+
+            if (writeWithoutMmap) {
+                // Use RandomAccessFile for writing instead of MappedByteBuffer
+                this.randomAccessFile = new RandomAccessFile(this.file, "rw");
+                // Still create MappedByteBuffer for reading operations
+                this.mappedByteBuffer = 
this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
+            } else {
+                // Use MappedByteBuffer for both reading and writing (default 
behavior)
+                this.mappedByteBuffer = 
this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
+            }
+
             TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
             TOTAL_MAPPED_FILES.incrementAndGet();
             ok = true;
@@ -179,6 +246,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
             if (!ok && this.fileChannel != null) {
                 this.fileChannel.close();
             }
+            if (!ok && this.randomAccessFile != null) {
+                this.randomAccessFile.close();
+            }
         }
     }
 
@@ -243,10 +313,35 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
         assert cb != null;
 
         int currentPos = WROTE_POSITION_UPDATER.get(this);
+        long fileFromOffset = this.getFileFromOffset();
+
         if (currentPos < this.fileSize) {
-            ByteBuffer byteBuffer = appendMessageBuffer().slice();
-            byteBuffer.position(currentPos);
-            AppendMessageResult result = cb.doAppend(byteBuffer, 
this.fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+            SharedByteBuffer sharedByteBuffer = null;
+            ByteBuffer byteBuffer;
+            if (writeWithoutMmap && randomAccessFile != null) {
+                sharedByteBuffer = borrowSharedByteBuffer();
+                byteBuffer = sharedByteBuffer.acquire();
+                byteBuffer.position(0).limit(byteBuffer.capacity());
+                fileFromOffset += currentPos;
+            } else {
+                byteBuffer = appendMessageBuffer().slice();
+                byteBuffer.position(currentPos);
+            }
+
+            AppendMessageResult result = cb.doAppend(byteBuffer, 
fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+
+            if (sharedByteBuffer != null) {
+                try {
+                    randomAccessFile.seek(currentPos);
+                    randomAccessFile.write(byteBuffer.array(), 0, 
result.getWroteBytes());
+                } catch (Throwable t) {
+                    log.error("Failed to write to mappedFile {}", 
this.fileName, t);
+                    return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+                } finally {
+                    sharedByteBuffer.release();
+                }
+            }
+
             WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
             this.storeTimestamp = result.getStoreTimestamp();
             return result;
@@ -273,22 +368,46 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
         assert cb != null;
 
         int currentPos = WROTE_POSITION_UPDATER.get(this);
+        long fileFromOffset = this.getFileFromOffset();
 
         if (currentPos < this.fileSize) {
-            ByteBuffer byteBuffer = appendMessageBuffer().slice();
-            byteBuffer.position(currentPos);
+            SharedByteBuffer sharedByteBuffer = null;
+            ByteBuffer byteBuffer;
+            if (writeWithoutMmap && randomAccessFile != null) {
+                sharedByteBuffer = borrowSharedByteBuffer();
+                byteBuffer = sharedByteBuffer.acquire();
+                byteBuffer.position(0).limit(byteBuffer.capacity());
+                fileFromOffset += currentPos;
+            } else {
+                byteBuffer = appendMessageBuffer().slice();
+                byteBuffer.position(currentPos);
+            }
+
             AppendMessageResult result;
             if (messageExt instanceof MessageExtBatch && !((MessageExtBatch) 
messageExt).isInnerBatch()) {
                 // traditional batch message
-                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos,
+                result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize 
- currentPos,
                     (MessageExtBatch) messageExt, putMessageContext);
             } else if (messageExt instanceof MessageExtBrokerInner) {
                 // traditional single message or newly introduced inner-batch 
message
-                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, 
this.fileSize - currentPos,
+                result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize 
- currentPos,
                     (MessageExtBrokerInner) messageExt, putMessageContext);
             } else {
                 return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
             }
+
+            if (sharedByteBuffer != null) {
+                try {
+                    randomAccessFile.seek(currentPos);
+                    randomAccessFile.write(byteBuffer.array(), 0, 
result.getWroteBytes());
+                } catch (Throwable t) {
+                    log.error("Failed to write to mappedFile {}", 
this.fileName, t);
+                    return new 
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+                } finally {
+                    sharedByteBuffer.release();
+                }
+            }
+
             WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
             this.storeTimestamp = result.getStoreTimestamp();
             return result;
@@ -319,15 +438,25 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
 
         if ((currentPos + remaining) <= this.fileSize) {
             try {
-                this.fileChannel.position(currentPos);
-                while (data.hasRemaining()) {
-                    this.fileChannel.write(data);
+                if (writeWithoutMmap && randomAccessFile != null) {
+                    // Use RandomAccessFile for writing
+                    randomAccessFile.seek(currentPos);
+                    byte[] buffer = new byte[remaining];
+                    data.get(buffer);
+                    randomAccessFile.write(buffer);
+                } else {
+                    // Use FileChannel for writing (default behavior)
+                    this.fileChannel.position(currentPos);
+                    while (data.hasRemaining()) {
+                        this.fileChannel.write(data);
+                    }
                 }
+                WROTE_POSITION_UPDATER.addAndGet(this, remaining);
+                return true;
             } catch (Throwable e) {
                 log.error("Error occurred when append message to mappedFile.", 
e);
+                return false;
             }
-            WROTE_POSITION_UPDATER.addAndGet(this, remaining);
-            return true;
         }
         return false;
     }
@@ -344,14 +473,22 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
 
         if ((currentPos + length) <= this.fileSize) {
             try {
-                ByteBuffer buf = this.mappedByteBuffer.slice();
-                buf.position(currentPos);
-                buf.put(data, offset, length);
+                if (writeWithoutMmap && randomAccessFile != null) {
+                    // Use RandomAccessFile for writing
+                    randomAccessFile.seek(currentPos);
+                    randomAccessFile.write(data, offset, length);
+                } else {
+                    // Use MappedByteBuffer for writing (default behavior)
+                    ByteBuffer buf = this.mappedByteBuffer.slice();
+                    buf.position(currentPos);
+                    buf.put(data, offset, length);
+                }
+                WROTE_POSITION_UPDATER.addAndGet(this, length);
+                return true;
             } catch (Throwable e) {
                 log.error("Error occurred when append message to mappedFile.", 
e);
+                return false;
             }
-            WROTE_POSITION_UPDATER.addAndGet(this, length);
-            return true;
         }
 
         return false;
@@ -365,11 +502,12 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
             try {
                 this.fileChannel.position(currentPos);
                 this.fileChannel.write(ByteBuffer.wrap(data, 0, data.length));
+                WROTE_POSITION_UPDATER.addAndGet(this, data.length);
+                return true;
             } catch (Throwable e) {
                 log.error("Error occurred when append message to mappedFile.", 
e);
+                return false;
             }
-            WROTE_POSITION_UPDATER.addAndGet(this, data.length);
-            return true;
         }
 
         return false;
@@ -387,11 +525,16 @@ public class DefaultMappedFile extends AbstractMappedFile 
{
                 try {
                     this.mappedByteBufferAccessCountSinceLastSwap++;
 
-                    //We only append data to fileChannel or mappedByteBuffer, 
never both.
-                    if (writeBuffer != null || this.fileChannel.position() != 
0) {
-                        this.fileChannel.force(false);
+                    if (writeWithoutMmap && randomAccessFile != null) {
+                        // Use RandomAccessFile for flushing
+                        randomAccessFile.getChannel().force(false);
                     } else {
-                        this.mappedByteBuffer.force();
+                        //We only append data to fileChannel or 
mappedByteBuffer, never both.
+                        if (writeBuffer != null || this.fileChannel.position() 
!= 0) {
+                            this.fileChannel.force(false);
+                        } else {
+                            this.mappedByteBuffer.force();
+                        }
                     }
                     this.lastFlushTime = System.currentTimeMillis();
                 } catch (Throwable e) {
@@ -574,6 +717,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
                 this.fileChannel.close();
                 log.info("close file channel " + this.fileName + " OK");
 
+                if (this.randomAccessFile != null) {
+                    this.randomAccessFile.close();
+                    log.info("close random access file " + this.fileName + " 
OK");
+                }
+
                 long beginTime = System.currentTimeMillis();
                 boolean result = this.file.delete();
                 log.info("delete file[REF:" + this.getRefCount() + "] " + 
this.fileName
diff --git 
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java 
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 21181faf3b..3f1dc237d6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -105,12 +105,19 @@ public class BatchConsumeQueue implements 
ConsumeQueueInterface {
         this.topic = topic;
         this.queueId = queueId;
 
+        boolean writeWithoutMmap = false;
+        if (messageStore.getMessageStoreConfig() != null) {
+            writeWithoutMmap = 
messageStore.getMessageStoreConfig().isWriteWithoutMmap();
+        }
+
         if (StringUtils.isBlank(subfolder)) {
             String queueDir = this.storePath + File.separator + topic + 
File.separator + queueId;
-            this.mappedFileQueue = new MappedFileQueue(queueDir, 
mappedFileSize, null);
+            this.mappedFileQueue = new MappedFileQueue(queueDir, 
mappedFileSize, null,
+                writeWithoutMmap);
         } else {
             String queueDir = this.storePath + File.separator + topic + 
File.separator + queueId + File.separator + subfolder;
-            this.mappedFileQueue = new MappedFileQueue(queueDir, 
mappedFileSize, null);
+            this.mappedFileQueue = new MappedFileQueue(queueDir, 
mappedFileSize, null,
+                writeWithoutMmap);
         }
 
         this.byteBufferItem = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
new file mode 100644
index 0000000000..06f94727d6
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.UtilAll;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Concurrency tests for {@link DefaultMappedFile}: several threads append to, or
+ * flush, a single file, with the constructor's third argument (writeWithoutMmap)
+ * switched on and off.
+ */
+public class DefaultMappedFileConcurrencyTest {
+
+    private String storePath;
+    private String fileName;
+    private int fileSize = 1024 * 1024; // 1MB
+    private static final int THREAD_COUNT = 10;
+    private static final int OPERATIONS_PER_THREAD = 100;
+    private static final int FLUSH_THREAD_COUNT = 5;
+    // Upper bound on waiting for worker threads, so that a stuck append/flush
+    // fails the test instead of hanging the whole build.
+    private static final long AWAIT_TIMEOUT_SECONDS = 60;
+
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
+        fileName = storePath + File.separator + "00000000000000000000";
+        UtilAll.ensureDirOK(storePath);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    /** Concurrent appends with writeWithoutMmap enabled must all succeed. */
+    @Test
+    public void testConcurrentWriteWithoutMmap() throws Exception {
+        assertConcurrentAppendsSucceed(true);
+    }
+
+    /** Concurrent appends with writeWithoutMmap disabled must all succeed. */
+    @Test
+    public void testConcurrentWriteWithMmap() throws Exception {
+        assertConcurrentAppendsSucceed(false);
+    }
+
+    /** Flushing the same file from several threads must succeed at least once. */
+    @Test
+    public void testConcurrentFlush() throws Exception {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            // Write some data first so that there is something to flush
+            for (int i = 0; i < 100; i++) {
+                String data = "Test data " + i;
+                mappedFile.appendMessage(data.getBytes());
+            }
+
+            AtomicInteger flushCount = new AtomicInteger(0);
+
+            runOnWorkers(FLUSH_THREAD_COUNT, workerId -> {
+                try {
+                    if (mappedFile.flush(0) > 0) {
+                        flushCount.incrementAndGet();
+                    }
+                } catch (Exception e) {
+                    // A failing flush is reported but tolerated: the assertion
+                    // below only requires one successful flush.
+                    e.printStackTrace();
+                }
+            });
+
+            Assert.assertTrue("At least one flush should succeed", flushCount.get() > 0);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends {@link #OPERATIONS_PER_THREAD} small messages from each of
+     * {@link #THREAD_COUNT} threads and asserts that every append returned true.
+     *
+     * @param writeWithoutMmap value passed as the third constructor argument of
+     *                         {@link DefaultMappedFile}
+     */
+    private void assertConcurrentAppendsSucceed(boolean writeWithoutMmap) throws Exception {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, writeWithoutMmap);
+
+        try {
+            AtomicInteger successCount = new AtomicInteger(0);
+            AtomicInteger errorCount = new AtomicInteger(0);
+
+            runOnWorkers(THREAD_COUNT, threadId -> {
+                try {
+                    for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
+                        String data = String.format("Thread-%d-Operation-%d", threadId, j);
+                        if (mappedFile.appendMessage(data.getBytes())) {
+                            successCount.incrementAndGet();
+                        } else {
+                            errorCount.incrementAndGet();
+                        }
+                    }
+                } catch (Exception e) {
+                    errorCount.incrementAndGet();
+                    e.printStackTrace();
+                }
+            });
+
+            // All operations should succeed
+            Assert.assertEquals("All write operations should succeed",
+                THREAD_COUNT * OPERATIONS_PER_THREAD, successCount.get());
+            Assert.assertEquals("No errors should occur", 0, errorCount.get());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Runs {@code task} once on each of {@code threadCount} pool threads, handing it
+     * the worker index, and waits up to {@link #AWAIT_TIMEOUT_SECONDS} seconds for
+     * all of them to finish. The pool is always shut down, even when the wait is
+     * interrupted or times out.
+     */
+    private void runOnWorkers(int threadCount, java.util.function.IntConsumer task) throws InterruptedException {
+        ExecutorService executor = Executors.newFixedThreadPool(threadCount);
+        try {
+            CountDownLatch latch = new CountDownLatch(threadCount);
+            for (int i = 0; i < threadCount; i++) {
+                final int workerId = i;
+                executor.submit(() -> {
+                    try {
+                        task.accept(workerId);
+                    } finally {
+                        latch.countDown();
+                    }
+                });
+            }
+            Assert.assertTrue("Workers did not finish within " + AWAIT_TIMEOUT_SECONDS + "s",
+                latch.await(AWAIT_TIMEOUT_SECONDS, java.util.concurrent.TimeUnit.SECONDS));
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
new file mode 100644
index 0000000000..649e8071cc
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.AppendMessageCallback;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.CompactionAppendMsgCallback;
+import org.apache.rocketmq.store.PutMessageContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Append, boundary and flush checks for a {@link DefaultMappedFile} created with
+ * its third constructor argument (writeWithoutMmap) set to true. The file under
+ * test is created before and destroyed after every test method.
+ */
+public class DefaultMappedFileErrorHandlingTest {
+
+    private String storePath;
+    private String fileName;
+    private int fileSize = 1024 * 1024; // 1MB
+    private DefaultMappedFile mappedFile;
+
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
+        fileName = storePath + File.separator + "00000000000000000000";
+        UtilAll.ensureDirOK(storePath);
+        // Every test in this class runs against a file opened with writeWithoutMmap = true.
+        mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        try {
+            // JUnit runs @After even when @Before failed, so the file may be missing.
+            if (mappedFile != null) {
+                mappedFile.destroy(0);
+            }
+        } finally {
+            UtilAll.deleteFile(new File(storePath));
+        }
+    }
+
+    /** The status produced by an {@link AppendMessageCallback} is returned to the caller. */
+    @Test
+    public void testAppendMessageCallbackErrorHandling() throws IOException {
+        // Callback that reports an error for both single and batch messages
+        AppendMessageCallback errorCallback = new AppendMessageCallback() {
+            @Override
+            public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer,
+                int maxBlank, MessageExtBrokerInner msg,
+                PutMessageContext putMessageContext) {
+                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+            }
+
+            @Override
+            public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer,
+                int maxBlank, MessageExtBatch messageExtBatch,
+                PutMessageContext putMessageContext) {
+                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+            }
+        };
+
+        // Minimal message; only the body is populated
+        MessageExtBrokerInner msg = new MessageExtBrokerInner();
+        msg.setBody("test message".getBytes());
+
+        AppendMessageResult result = mappedFile.appendMessage(msg, errorCallback, new PutMessageContext("test-topic"));
+
+        Assert.assertEquals("Should return error status",
+            AppendMessageStatus.UNKNOWN_ERROR, result.getStatus());
+    }
+
+    /** The status produced by a {@link CompactionAppendMsgCallback} is returned to the caller. */
+    @Test
+    public void testCompactionAppendMsgCallbackErrorHandling() throws IOException {
+        // Callback that always reports an error
+        CompactionAppendMsgCallback errorCallback = new CompactionAppendMsgCallback() {
+            @Override
+            public AppendMessageResult doAppend(ByteBuffer bbDest, long fileFromOffset,
+                int maxBlank, ByteBuffer bbSrc) {
+                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+            }
+        };
+
+        ByteBuffer testBuffer = ByteBuffer.wrap("test data".getBytes());
+        AppendMessageResult result = mappedFile.appendMessage(testBuffer, errorCallback);
+
+        Assert.assertEquals("Should return error status",
+            AppendMessageStatus.UNKNOWN_ERROR, result.getStatus());
+    }
+
+    @Test
+    public void testWriteWithoutMmapWithNullRandomAccessFile() throws IOException {
+        // NOTE: despite the method name, nothing in this test forces the
+        // RandomAccessFile to be null; it only verifies that a plain append
+        // succeeds on a file opened with writeWithoutMmap = true.
+        byte[] testData = "test data".getBytes();
+        boolean result = mappedFile.appendMessage(testData);
+
+        Assert.assertTrue("Should still work with null RandomAccessFile", result);
+    }
+
+    /** A single append that nearly fills the file succeeds and advances the write position. */
+    @Test
+    public void testLargeDataWrite() throws IOException {
+        // Data that is close to the file size limit
+        byte[] largeData = new byte[fileSize - 100]; // Leave some space
+        for (int i = 0; i < largeData.length; i++) {
+            largeData[i] = (byte) (i % 256);
+        }
+
+        boolean result = mappedFile.appendMessage(largeData);
+        Assert.assertTrue("Should successfully write large data", result);
+        Assert.assertEquals("Wrote position should match data size",
+            largeData.length, mappedFile.getWrotePosition());
+    }
+
+    /** An append that does not fit into the remaining space is rejected. */
+    @Test
+    public void testWriteBeyondFileSize() throws IOException {
+        // Fill the file almost completely
+        byte[] data = new byte[fileSize - 10];
+        boolean result = mappedFile.appendMessage(data);
+        Assert.assertTrue("Should successfully write data", result);
+
+        // Try to write more data than remaining space
+        byte[] overflowData = new byte[20]; // More than remaining 10 bytes
+        result = mappedFile.appendMessage(overflowData);
+        Assert.assertFalse("Should fail to write beyond file size", result);
+    }
+
+    @Test
+    public void testFlushErrorHandling() throws IOException {
+        // NOTE: no failure is injected here; this verifies that flushing after a
+        // successful append returns a value greater than zero.
+        byte[] testData = "test data for flush".getBytes();
+        mappedFile.appendMessage(testData);
+
+        int flushedPosition = mappedFile.flush(0);
+        Assert.assertTrue("Flush should succeed", flushedPosition > 0);
+    }
+
+    /** The (data, offset, length) overload accepts a valid range and rejects an invalid one. */
+    @Test
+    public void testAppendMessageWithOffset() throws IOException {
+        byte[] testData = "Hello, RocketMQ!".getBytes();
+
+        // Test with valid offset
+        boolean result = mappedFile.appendMessage(testData, 0, testData.length);
+        Assert.assertTrue("Should successfully append with valid offset", result);
+
+        // Test with invalid offset (beyond array length)
+        result = mappedFile.appendMessage(testData, testData.length + 1, 1);
+        Assert.assertFalse("Should fail with invalid offset", result);
+    }
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
new file mode 100644
index 0000000000..b958487add
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.common.UtilAll;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Bulk-write tests for {@link DefaultMappedFile}. Despite the class name, no
+ * timing is measured or asserted: each test performs many appends (or a flush)
+ * and checks the returned results and the resulting write position.
+ */
+public class DefaultMappedFilePerformanceTest {
+
+    private String storePath;
+    private String fileName;
+    private int fileSize = 10 * 1024 * 1024; // 10MB
+    private static final int WRITE_COUNT = 10000;
+    private static final int DATA_SIZE = 1024; // 1KB per write
+
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("user.home") + File.separator + "unitteststore" + System.currentTimeMillis();
+        fileName = storePath + File.separator + "00000000000000000000";
+        UtilAll.ensureDirOK(storePath);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    /** {@link #WRITE_COUNT} byte[] appends with writeWithoutMmap enabled. */
+    @Test
+    public void testWriteWithoutMmapPerformance() throws IOException {
+        assertRepeatedAppends(true);
+    }
+
+    /** {@link #WRITE_COUNT} byte[] appends with writeWithoutMmap disabled. */
+    @Test
+    public void testWriteWithMmapPerformance() throws IOException {
+        assertRepeatedAppends(false);
+    }
+
+    /** A flush after 1000 appends returns a value greater than zero. */
+    @Test
+    public void testFlushPerformance() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            // Write some data first
+            byte[] testData = patternedData();
+            for (int i = 0; i < 1000; i++) {
+                mappedFile.appendMessage(testData);
+            }
+
+            int flushedPosition = mappedFile.flush(0);
+
+            Assert.assertTrue("Flush should succeed", flushedPosition > 0);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /** {@link #WRITE_COUNT} appends through the ByteBuffer overload. */
+    @Test
+    public void testByteBufferWritePerformance() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            ByteBuffer testBuffer = ByteBuffer.wrap(patternedData());
+
+            for (int i = 0; i < WRITE_COUNT; i++) {
+                // Reset the position so the whole buffer is offered on every append
+                testBuffer.rewind();
+                boolean result = mappedFile.appendMessage(testBuffer);
+                Assert.assertTrue("Write should succeed", result);
+            }
+
+            Assert.assertEquals("Wrote position should match expected",
+                WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /** Interleaves the four append entry points; each one writes {@link #DATA_SIZE} bytes. */
+    @Test
+    public void testMixedWriteOperations() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            byte[] testData = patternedData();
+
+            // Four appends per iteration, so WRITE_COUNT appends in total
+            for (int i = 0; i < WRITE_COUNT / 4; i++) {
+                // appendMessage(byte[])
+                boolean result1 = mappedFile.appendMessage(testData);
+                Assert.assertTrue("Write should succeed", result1);
+
+                // appendMessage(byte[], offset, length)
+                boolean result2 = mappedFile.appendMessage(testData, 0, testData.length);
+                Assert.assertTrue("Write should succeed", result2);
+
+                // appendMessage(ByteBuffer)
+                ByteBuffer buffer = ByteBuffer.wrap(testData);
+                boolean result3 = mappedFile.appendMessage(buffer);
+                Assert.assertTrue("Write should succeed", result3);
+
+                // appendMessageUsingFileChannel(byte[])
+                boolean result4 = mappedFile.appendMessageUsingFileChannel(testData);
+                Assert.assertTrue("Write should succeed", result4);
+            }
+
+            Assert.assertEquals("Wrote position should match expected",
+                WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends {@link #WRITE_COUNT} blocks of {@link #DATA_SIZE} bytes and asserts that
+     * every append succeeded and that the write position equals the total written.
+     *
+     * @param writeWithoutMmap value passed as the third constructor argument of
+     *                         {@link DefaultMappedFile}
+     */
+    private void assertRepeatedAppends(boolean writeWithoutMmap) throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, writeWithoutMmap);
+
+        try {
+            byte[] testData = patternedData();
+
+            for (int i = 0; i < WRITE_COUNT; i++) {
+                boolean result = mappedFile.appendMessage(testData);
+                Assert.assertTrue("Write should succeed", result);
+            }
+
+            Assert.assertEquals("Wrote position should match expected",
+                WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /** Returns {@link #DATA_SIZE} bytes holding the repeating pattern 0, 1, ..., 255. */
+    private static byte[] patternedData() {
+        byte[] data = new byte[DATA_SIZE];
+        for (int i = 0; i < data.length; i++) {
+            data[i] = (byte) (i % 256);
+        }
+        return data;
+    }
+}
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
new file mode 100644
index 0000000000..79bca016e4
--- /dev/null
+++ 
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.TransientStorePool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Exercises the append and flush entry points of {@link DefaultMappedFile}
+ * with the {@code writeWithoutMmap} constructor flag switched on and off.
+ *
+ * <p>Each test builds its mapped file inside a per-run directory under
+ * {@code user.home}; that directory is handed to {@code UtilAll.deleteFile}
+ * once the test has finished.
+ */
+public class DefaultMappedFileWriteWithoutMmapTest {
+
+    /** Size of the mapped file created by every test: 1 MB. */
+    private static final int FILE_SIZE = 1024 * 1024;
+
+    private String storePath;
+    private String fileName;
+
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("user.home") + File.separator
+            + "unitteststore" + System.currentTimeMillis();
+        fileName = storePath + File.separator + "00000000000000000000";
+        UtilAll.ensureDirOK(storePath);
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    @Test
+    public void testWriteWithoutMmapEnabled() throws IOException {
+        // Test with writeWithoutMmap = true
+        assertAppendAndFlush(true);
+    }
+
+    @Test
+    public void testWriteWithoutMmapDisabled() throws IOException {
+        // Test with writeWithoutMmap = false (default behavior)
+        assertAppendAndFlush(false);
+    }
+
+    @Test
+    public void testWriteWithoutMmapWithTransientStorePool()
+        throws IOException {
+        // Test with writeWithoutMmap = true and TransientStorePool
+        TransientStorePool transientStorePool =
+            new TransientStorePool(5, FILE_SIZE);
+        DefaultMappedFile mappedFile = new DefaultMappedFile(
+            fileName, FILE_SIZE, transientStorePool, true);
+
+        try {
+            byte[] testData =
+                utf8("Hello, RocketMQ with TransientStorePool!");
+            boolean result = mappedFile.appendMessage(testData);
+            Assert.assertTrue("Should successfully append message", result);
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    @Test
+    public void testAppendMessageWithOffset() throws IOException {
+        DefaultMappedFile mappedFile =
+            new DefaultMappedFile(fileName, FILE_SIZE, true);
+
+        try {
+            byte[] testData = utf8("Hello, RocketMQ with offset!");
+            boolean result =
+                mappedFile.appendMessage(testData, 0, testData.length);
+            Assert.assertTrue(
+                "Should successfully append message with offset", result);
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    @Test
+    public void testAppendMessageUsingFileChannel() throws IOException {
+        DefaultMappedFile mappedFile =
+            new DefaultMappedFile(fileName, FILE_SIZE, true);
+
+        try {
+            byte[] testData = utf8("Hello, RocketMQ using FileChannel!");
+            boolean result =
+                mappedFile.appendMessageUsingFileChannel(testData);
+            Assert.assertTrue(
+                "Should successfully append message using FileChannel",
+                result);
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends a byte array and then a {@link ByteBuffer} to a new mapped
+     * file, checks the wrote position after each append, and finally checks
+     * that {@code flush(0)} reports a positive position.
+     *
+     * @param writeWithoutMmap value passed to the {@link DefaultMappedFile}
+     *     constructor
+     * @throws IOException if the mapped file cannot be created
+     */
+    private void assertAppendAndFlush(boolean writeWithoutMmap)
+        throws IOException {
+        DefaultMappedFile mappedFile =
+            new DefaultMappedFile(fileName, FILE_SIZE, writeWithoutMmap);
+
+        try {
+            // Test appendMessage with byte array
+            byte[] first = utf8("Hello, RocketMQ!");
+            Assert.assertTrue("Should successfully append message",
+                mappedFile.appendMessage(first));
+            Assert.assertEquals("Wrote position should be updated",
+                first.length, mappedFile.getWrotePosition());
+
+            // Test appendMessage with ByteBuffer. The expected position is
+            // derived from the encoded bytes, not from String.length(),
+            // which counts chars rather than bytes.
+            byte[] second = utf8("Test ByteBuffer");
+            Assert.assertTrue("Should successfully append ByteBuffer",
+                mappedFile.appendMessage(ByteBuffer.wrap(second)));
+            Assert.assertEquals("Wrote position should be updated",
+                first.length + second.length, mappedFile.getWrotePosition());
+
+            // Test flush
+            int flushedPosition = mappedFile.flush(0);
+            Assert.assertTrue("Flush should succeed", flushedPosition > 0);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Encodes {@code text} as UTF-8 so that payload sizes do not depend on
+     * the platform default charset.
+     */
+    private static byte[] utf8(String text) {
+        return text.getBytes(StandardCharsets.UTF_8);
+    }
+}

Reply via email to