This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 4ae02942e5 [ISSUE #9693] Add writeWithoutMmap configuration to prevent
JVM crash when device becomes read-only (#9694)
4ae02942e5 is described below
commit 4ae02942e58a519829a4b50c6a4d50e6c631cab2
Author: guyinyou <[email protected]>
AuthorDate: Sun Sep 14 14:43:06 2025 +0800
[ISSUE #9693] Add writeWithoutMmap configuration to prevent JVM crash when
device becomes read-only (#9694)
* add "writeWithoutMmap"
* fix npe
---------
Co-authored-by: guyinyou <[email protected]>
---
.../rocketmq/store/AllocateMappedFileService.java | 9 +-
.../java/org/apache/rocketmq/store/CommitLog.java | 3 +-
.../org/apache/rocketmq/store/ConsumeQueue.java | 10 +-
.../org/apache/rocketmq/store/ConsumeQueueExt.java | 36 ++++
.../org/apache/rocketmq/store/MappedFileQueue.java | 17 +-
.../rocketmq/store/MultiPathMappedFileQueue.java | 3 +-
.../rocketmq/store/config/MessageStoreConfig.java | 15 ++
.../rocketmq/store/logfile/DefaultMappedFile.java | 200 ++++++++++++++---
.../rocketmq/store/queue/BatchConsumeQueue.java | 11 +-
.../logfile/DefaultMappedFileConcurrencyTest.java | 191 +++++++++++++++++
.../DefaultMappedFileErrorHandlingTest.java | 210 ++++++++++++++++++
.../logfile/DefaultMappedFilePerformanceTest.java | 236 +++++++++++++++++++++
.../DefaultMappedFileWriteWithoutMmapTest.java | 148 +++++++++++++
13 files changed, 1052 insertions(+), 37 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index d9cd602a65..a56fa46157 100644
---
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -172,16 +172,17 @@ public class AllocateMappedFileService extends
ServiceThread {
long beginTime = System.currentTimeMillis();
MappedFile mappedFile;
+ boolean writeWithoutMmap =
messageStore.getMessageStoreConfig().isWriteWithoutMmap();
if (messageStore.isTransientStorePoolEnable()) {
try {
mappedFile =
ServiceLoader.load(MappedFile.class).iterator().next();
mappedFile.init(req.getFilePath(), req.getFileSize(),
messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
- mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize(), messageStore.getTransientStorePool());
+ mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
}
} else {
- mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize());
+ mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize(), writeWithoutMmap);
}
long elapsedTime =
UtilAll.computeElapsedTimeMilliseconds(beginTime);
@@ -195,7 +196,9 @@ public class AllocateMappedFileService extends
ServiceThread {
if (mappedFile.getFileSize() >=
this.messageStore.getMessageStoreConfig()
.getMappedFileSizeCommitLog()
&&
-
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
+
this.messageStore.getMessageStoreConfig().isWarmMapedFileEnable()
+ &&
+
!this.messageStore.getMessageStoreConfig().isWriteWithoutMmap()) {
mappedFile.warmMappedFile(this.messageStore.getMessageStoreConfig().getFlushDiskType(),
this.messageStore.getMessageStoreConfig().getFlushLeastPagesWhenWarmMapedFile());
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index a4bdb7851d..3b26afcc09 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -118,7 +118,8 @@ public class CommitLog implements Swappable {
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- messageStore.getAllocateMappedFileService());
+ messageStore.getAllocateMappedFileService(),
+ messageStore.getMessageStoreConfig().isWriteWithoutMmap());
}
this.defaultMessageStore = messageStore;
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 2850299b7d..02f90cef1d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -98,7 +98,12 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
+ File.separator + topic
+ File.separator + queueId;
- this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize,
null);
+ boolean writeWithoutMmap = false;
+ if (messageStore.getMessageStoreConfig() != null) {
+ writeWithoutMmap =
messageStore.getMessageStoreConfig().isWriteWithoutMmap();
+ }
+
+ this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize,
null, writeWithoutMmap);
this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
@@ -108,7 +113,8 @@ public class ConsumeQueue implements ConsumeQueueInterface,
FileQueueLifeCycle {
queueId,
StorePathConfigHelper.getStorePathConsumeQueueExt(messageStore.getMessageStoreConfig().getStorePathRootDir()),
messageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
-
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
+
messageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt(),
+ writeWithoutMmap
);
}
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
index 3f266378df..641f672bba 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueueExt.java
@@ -90,6 +90,42 @@ public class ConsumeQueueExt {
}
}
+ /**
+ * Constructor with writeWithoutMmap support.
+ *
+ * @param topic topic
+ * @param queueId id of queue
+ * @param storePath root dir of files to store.
+ * @param mappedFileSize file size
+ * @param bitMapLength bit map length.
+ * @param writeWithoutMmap whether to use RandomAccessFile instead of
MappedByteBuffer
+ */
+ public ConsumeQueueExt(final String topic,
+ final int queueId,
+ final String storePath,
+ final int mappedFileSize,
+ final int bitMapLength,
+ final boolean writeWithoutMmap) {
+
+ this.storePath = storePath;
+ this.mappedFileSize = mappedFileSize;
+
+ this.topic = topic;
+ this.queueId = queueId;
+
+ String queueDir = this.storePath
+ + File.separator + topic
+ + File.separator + queueId;
+
+ this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize,
null, writeWithoutMmap);
+
+ if (bitMapLength > 0) {
+ this.tempContainer = ByteBuffer.allocate(
+ bitMapLength / Byte.SIZE
+ );
+ }
+ }
+
public long getTotalSize() {
return this.mappedFileQueue.getTotalFileSize();
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index e32c16a82a..320e842154 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -53,6 +53,11 @@ public class MappedFileQueue implements Swappable {
protected long committedWhere = 0;
protected volatile long storeTimestamp = 0;
+
+ /**
+ * Configuration flag to use RandomAccessFile instead of MappedByteBuffer
for writing
+ */
+ protected boolean writeWithoutMmap = false;
public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
@@ -61,6 +66,14 @@ public class MappedFileQueue implements Swappable {
this.allocateMappedFileService = allocateMappedFileService;
}
+ public MappedFileQueue(final String storePath, int mappedFileSize,
+ AllocateMappedFileService allocateMappedFileService, boolean
writeWithoutMmap) {
+ this.storePath = storePath;
+ this.mappedFileSize = mappedFileSize;
+ this.allocateMappedFileService = allocateMappedFileService;
+ this.writeWithoutMmap = writeWithoutMmap;
+ }
+
public void checkSelf() {
List<MappedFile> mappedFiles = new ArrayList<>(this.mappedFiles);
if (!mappedFiles.isEmpty()) {
@@ -266,7 +279,7 @@ public class MappedFileQueue implements Swappable {
}
try {
- MappedFile mappedFile = new DefaultMappedFile(file.getPath(),
mappedFileSize);
+ MappedFile mappedFile = new DefaultMappedFile(file.getPath(),
mappedFileSize, writeWithoutMmap);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -356,7 +369,7 @@ public class MappedFileQueue implements Swappable {
nextNextFilePath, this.mappedFileSize);
} else {
try {
- mappedFile = new DefaultMappedFile(nextFilePath,
this.mappedFileSize);
+ mappedFile = new DefaultMappedFile(nextFilePath,
this.mappedFileSize, this.writeWithoutMmap);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
index 8ff050dfe3..72ec8820a6 100644
---
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -39,7 +39,8 @@ public class MultiPathMappedFileQueue extends MappedFileQueue
{
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int
mappedFileSize,
AllocateMappedFileService
allocateMappedFileService,
Supplier<Set<String>>
fullStorePathsSupplier) {
- super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize,
allocateMappedFileService);
+ super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize,
allocateMappedFileService,
+ messageStoreConfig.isWriteWithoutMmap());
this.config = messageStoreConfig;
this.fullStorePathsSupplier = fullStorePathsSupplier;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index a142eca86f..e4c3a4045f 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -238,6 +238,13 @@ public class MessageStoreConfig {
private int transientStorePoolSize = 5;
private boolean fastFailIfNoBufferInStorePool = false;
+ /**
+ * When true, write via RandomAccessFile instead of MappedByteBuffer; reads still use a read-only mapping.
+ * Enable this to prevent a JVM crash when the storage device becomes read-only (ISSUE #9693).
+ */
+ @ImportantField
+ private boolean writeWithoutMmap = false;
+
// DLedger message store config
private boolean enableDLegerCommitLog = false;
private String dLegerGroup;
@@ -1147,6 +1154,14 @@ public class MessageStoreConfig {
this.transientStorePoolEnable = transientStorePoolEnable;
}
+ public boolean isWriteWithoutMmap() {
+ return writeWithoutMmap;
+ }
+
+ public void setWriteWithoutMmap(final boolean writeWithoutMmap) {
+ this.writeWithoutMmap = writeWithoutMmap;
+ }
+
public int getTransientStorePoolSize() {
return transientStorePoolSize;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index c490d093a1..b2d89108b4 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -33,9 +33,11 @@ import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Iterator;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import org.apache.commons.lang3.SystemUtils;
import org.apache.rocketmq.common.UtilAll;
@@ -79,11 +81,19 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected volatile int flushedPosition;
protected int fileSize;
protected FileChannel fileChannel;
+ /**
+ * RandomAccessFile for writing when writeWithoutMmap is enabled
+ */
+ protected RandomAccessFile randomAccessFile = null;
/**
* Message will put to here first, and then reput to FileChannel if
writeBuffer is not null.
*/
protected ByteBuffer writeBuffer = null;
protected TransientStorePool transientStorePool = null;
+ /**
+ * Configuration flag to use RandomAccessFile instead of MappedByteBuffer
for writing
+ */
+ protected boolean writeWithoutMmap = false;
protected String fileName;
protected long fileFromOffset;
protected File file;
@@ -108,6 +118,28 @@ public class DefaultMappedFile extends AbstractMappedFile {
*/
private long stopTimestamp = -1;
+ private static int maxSharedNum = 16;
+ private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
+
+ static class SharedByteBuffer {
+ private final ReentrantLock lock;
+ private final ByteBuffer buffer;
+
+ public SharedByteBuffer(int size) {
+ this.lock = new ReentrantLock();
+ this.buffer = ByteBuffer.allocate(size);
+ }
+
+ public void release() {
+ this.lock.unlock();
+ }
+
+ public ByteBuffer acquire() {
+ this.lock.lock();
+ return buffer;
+ }
+ }
+
static {
WROTE_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class, "wrotePosition");
COMMITTED_POSITION_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(DefaultMappedFile.class,
"committedPosition");
@@ -124,6 +156,17 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
}
IS_LOADED_METHOD = isLoaded0method;
+
+ SHARED_BYTE_BUFFER = new SharedByteBuffer[maxSharedNum];
+ for (int i = 0; i < maxSharedNum; i++) {
+ SHARED_BYTE_BUFFER[i] = new SharedByteBuffer(4 * 1024 * 1024);
+ }
+ }
+
+ private static SharedByteBuffer borrowSharedByteBuffer() {
+ int idx = ThreadLocalRandom.current().nextInt(maxSharedNum);
+ SharedByteBuffer buffer = SHARED_BYTE_BUFFER[idx];
+ return buffer;
}
public DefaultMappedFile() {
@@ -138,6 +181,18 @@ public class DefaultMappedFile extends AbstractMappedFile {
init(fileName, fileSize, transientStorePool);
}
+ public DefaultMappedFile(final String fileName, final int fileSize,
+ final boolean writeWithoutMmap) throws IOException {
+ this.writeWithoutMmap = writeWithoutMmap;
+ init(fileName, fileSize);
+ }
+
+ public DefaultMappedFile(final String fileName, final int fileSize,
+ final TransientStorePool transientStorePool, final boolean
writeWithoutMmap) throws IOException {
+ this.writeWithoutMmap = writeWithoutMmap;
+ init(fileName, fileSize, transientStorePool);
+ }
+
public static int getTotalMappedFiles() {
return TOTAL_MAPPED_FILES.get();
}
@@ -150,8 +205,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
public void init(final String fileName, final int fileSize,
final TransientStorePool transientStorePool) throws IOException {
init(fileName, fileSize);
- this.writeBuffer = transientStorePool.borrowBuffer();
- this.transientStorePool = transientStorePool;
+ if (transientStorePool != null) {
+ this.writeBuffer = transientStorePool.borrowBuffer();
+ this.transientStorePool = transientStorePool;
+ }
}
private void init(final String fileName, final int fileSize) throws
IOException {
@@ -165,7 +222,17 @@ public class DefaultMappedFile extends AbstractMappedFile {
try {
this.fileChannel = new RandomAccessFile(this.file,
"rw").getChannel();
- this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE,
0, fileSize);
+
+ if (writeWithoutMmap) {
+ // Use RandomAccessFile for writing instead of MappedByteBuffer
+ this.randomAccessFile = new RandomAccessFile(this.file, "rw");
+ // Still create MappedByteBuffer for reading operations
+ this.mappedByteBuffer =
this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
+ } else {
+ // Use MappedByteBuffer for both reading and writing (default
behavior)
+ this.mappedByteBuffer =
this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
+ }
+
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
@@ -179,6 +246,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
+ if (!ok && this.randomAccessFile != null) {
+ this.randomAccessFile.close();
+ }
}
}
@@ -243,10 +313,35 @@ public class DefaultMappedFile extends AbstractMappedFile
{
assert cb != null;
int currentPos = WROTE_POSITION_UPDATER.get(this);
+ long fileFromOffset = this.getFileFromOffset();
+
if (currentPos < this.fileSize) {
- ByteBuffer byteBuffer = appendMessageBuffer().slice();
- byteBuffer.position(currentPos);
- AppendMessageResult result = cb.doAppend(byteBuffer,
this.fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+ SharedByteBuffer sharedByteBuffer = null;
+ ByteBuffer byteBuffer;
+ if (writeWithoutMmap && randomAccessFile != null) {
+ sharedByteBuffer = borrowSharedByteBuffer();
+ byteBuffer = sharedByteBuffer.acquire();
+ byteBuffer.position(0).limit(byteBuffer.capacity());
+ fileFromOffset += currentPos;
+ } else {
+ byteBuffer = appendMessageBuffer().slice();
+ byteBuffer.position(currentPos);
+ }
+
+ AppendMessageResult result = cb.doAppend(byteBuffer,
fileFromOffset, this.fileSize - currentPos, byteBufferMsg);
+
+ if (sharedByteBuffer != null) {
+ try {
+ randomAccessFile.seek(currentPos);
+ randomAccessFile.write(byteBuffer.array(), 0,
result.getWroteBytes());
+ } catch (Throwable t) {
+ log.error("Failed to write to mappedFile {}",
this.fileName, t);
+ return new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ } finally {
+ sharedByteBuffer.release();
+ }
+ }
+
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
@@ -273,22 +368,46 @@ public class DefaultMappedFile extends AbstractMappedFile
{
assert cb != null;
int currentPos = WROTE_POSITION_UPDATER.get(this);
+ long fileFromOffset = this.getFileFromOffset();
if (currentPos < this.fileSize) {
- ByteBuffer byteBuffer = appendMessageBuffer().slice();
- byteBuffer.position(currentPos);
+ SharedByteBuffer sharedByteBuffer = null;
+ ByteBuffer byteBuffer;
+ if (writeWithoutMmap && randomAccessFile != null) {
+ sharedByteBuffer = borrowSharedByteBuffer();
+ byteBuffer = sharedByteBuffer.acquire();
+ byteBuffer.position(0).limit(byteBuffer.capacity());
+ fileFromOffset += currentPos;
+ } else {
+ byteBuffer = appendMessageBuffer().slice();
+ byteBuffer.position(currentPos);
+ }
+
AppendMessageResult result;
if (messageExt instanceof MessageExtBatch && !((MessageExtBatch)
messageExt).isInnerBatch()) {
// traditional batch message
- result = cb.doAppend(this.getFileFromOffset(), byteBuffer,
this.fileSize - currentPos,
+ result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize
- currentPos,
(MessageExtBatch) messageExt, putMessageContext);
} else if (messageExt instanceof MessageExtBrokerInner) {
// traditional single message or newly introduced inner-batch
message
- result = cb.doAppend(this.getFileFromOffset(), byteBuffer,
this.fileSize - currentPos,
+ result = cb.doAppend(fileFromOffset, byteBuffer, this.fileSize
- currentPos,
(MessageExtBrokerInner) messageExt, putMessageContext);
} else {
return new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
+
+ if (sharedByteBuffer != null) {
+ try {
+ randomAccessFile.seek(currentPos);
+ randomAccessFile.write(byteBuffer.array(), 0,
result.getWroteBytes());
+ } catch (Throwable t) {
+ log.error("Failed to write to mappedFile {}",
this.fileName, t);
+ return new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+ } finally {
+ sharedByteBuffer.release();
+ }
+ }
+
WROTE_POSITION_UPDATER.addAndGet(this, result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
@@ -319,15 +438,25 @@ public class DefaultMappedFile extends AbstractMappedFile
{
if ((currentPos + remaining) <= this.fileSize) {
try {
- this.fileChannel.position(currentPos);
- while (data.hasRemaining()) {
- this.fileChannel.write(data);
+ if (writeWithoutMmap && randomAccessFile != null) {
+ // Use RandomAccessFile for writing
+ randomAccessFile.seek(currentPos);
+ byte[] buffer = new byte[remaining];
+ data.get(buffer);
+ randomAccessFile.write(buffer);
+ } else {
+ // Use FileChannel for writing (default behavior)
+ this.fileChannel.position(currentPos);
+ while (data.hasRemaining()) {
+ this.fileChannel.write(data);
+ }
}
+ WROTE_POSITION_UPDATER.addAndGet(this, remaining);
+ return true;
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.",
e);
+ return false;
}
- WROTE_POSITION_UPDATER.addAndGet(this, remaining);
- return true;
}
return false;
}
@@ -344,14 +473,22 @@ public class DefaultMappedFile extends AbstractMappedFile
{
if ((currentPos + length) <= this.fileSize) {
try {
- ByteBuffer buf = this.mappedByteBuffer.slice();
- buf.position(currentPos);
- buf.put(data, offset, length);
+ if (writeWithoutMmap && randomAccessFile != null) {
+ // Use RandomAccessFile for writing
+ randomAccessFile.seek(currentPos);
+ randomAccessFile.write(data, offset, length);
+ } else {
+ // Use MappedByteBuffer for writing (default behavior)
+ ByteBuffer buf = this.mappedByteBuffer.slice();
+ buf.position(currentPos);
+ buf.put(data, offset, length);
+ }
+ WROTE_POSITION_UPDATER.addAndGet(this, length);
+ return true;
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.",
e);
+ return false;
}
- WROTE_POSITION_UPDATER.addAndGet(this, length);
- return true;
}
return false;
@@ -365,11 +502,12 @@ public class DefaultMappedFile extends AbstractMappedFile
{
try {
this.fileChannel.position(currentPos);
this.fileChannel.write(ByteBuffer.wrap(data, 0, data.length));
+ WROTE_POSITION_UPDATER.addAndGet(this, data.length);
+ return true;
} catch (Throwable e) {
log.error("Error occurred when append message to mappedFile.",
e);
+ return false;
}
- WROTE_POSITION_UPDATER.addAndGet(this, data.length);
- return true;
}
return false;
@@ -387,11 +525,16 @@ public class DefaultMappedFile extends AbstractMappedFile
{
try {
this.mappedByteBufferAccessCountSinceLastSwap++;
- //We only append data to fileChannel or mappedByteBuffer,
never both.
- if (writeBuffer != null || this.fileChannel.position() !=
0) {
- this.fileChannel.force(false);
+ if (writeWithoutMmap && randomAccessFile != null) {
+ // Use RandomAccessFile for flushing
+ randomAccessFile.getChannel().force(false);
} else {
- this.mappedByteBuffer.force();
+ //We only append data to fileChannel or
mappedByteBuffer, never both.
+ if (writeBuffer != null || this.fileChannel.position()
!= 0) {
+ this.fileChannel.force(false);
+ } else {
+ this.mappedByteBuffer.force();
+ }
}
this.lastFlushTime = System.currentTimeMillis();
} catch (Throwable e) {
@@ -574,6 +717,11 @@ public class DefaultMappedFile extends AbstractMappedFile {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
+ if (this.randomAccessFile != null) {
+ this.randomAccessFile.close();
+ log.info("close random access file " + this.fileName + "
OK");
+ }
+
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " +
this.fileName
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 21181faf3b..3f1dc237d6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -105,12 +105,19 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
this.topic = topic;
this.queueId = queueId;
+ boolean writeWithoutMmap = false;
+ if (messageStore.getMessageStoreConfig() != null) {
+ writeWithoutMmap =
messageStore.getMessageStoreConfig().isWriteWithoutMmap();
+ }
+
if (StringUtils.isBlank(subfolder)) {
String queueDir = this.storePath + File.separator + topic +
File.separator + queueId;
- this.mappedFileQueue = new MappedFileQueue(queueDir,
mappedFileSize, null);
+ this.mappedFileQueue = new MappedFileQueue(queueDir,
mappedFileSize, null,
+ writeWithoutMmap);
} else {
String queueDir = this.storePath + File.separator + topic +
File.separator + queueId + File.separator + subfolder;
- this.mappedFileQueue = new MappedFileQueue(queueDir,
mappedFileSize, null);
+ this.mappedFileQueue = new MappedFileQueue(queueDir,
mappedFileSize, null,
+ writeWithoutMmap);
}
this.byteBufferItem = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
diff --git
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
new file mode 100644
index 0000000000..06f94727d6
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileConcurrencyTest.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.rocketmq.common.UtilAll;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultMappedFileConcurrencyTest {
+
+ private String storePath;
+ private String fileName;
+ private int fileSize = 1024 * 1024; // 1MB
+ private static final int THREAD_COUNT = 10;
+ private static final int OPERATIONS_PER_THREAD = 100;
+
+ @Before
+ public void setUp() throws Exception {
+ storePath = System.getProperty("user.home") + File.separator +
"unitteststore" + System.currentTimeMillis();
+ fileName = storePath + File.separator + "00000000000000000000";
+ UtilAll.ensureDirOK(storePath);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ UtilAll.deleteFile(new File(storePath));
+ }
+
+ @Test
+ public void testConcurrentWriteWithoutMmap() throws Exception {
+ DefaultMappedFile mappedFile = new DefaultMappedFile(fileName,
fileSize, true);
+
+ try {
+ ExecutorService executor =
Executors.newFixedThreadPool(THREAD_COUNT);
+ CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger errorCount = new AtomicInteger(0);
+
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ final int threadId = i;
+ executor.submit(() -> {
+ try {
+ for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
+ String data =
String.format("Thread-%d-Operation-%d", threadId, j);
+ byte[] bytes = data.getBytes();
+
+ boolean result = mappedFile.appendMessage(bytes);
+ if (result) {
+ successCount.incrementAndGet();
+ } else {
+ errorCount.incrementAndGet();
+ }
+ }
+ } catch (Exception e) {
+ errorCount.incrementAndGet();
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ latch.await();
+ executor.shutdown();
+
+ // Success count: successCount.get()
+ // Error count: errorCount.get()
+ // Final wrote position: mappedFile.getWrotePosition()
+
+ // All operations should succeed
+ Assert.assertEquals("All write operations should succeed",
+ THREAD_COUNT * OPERATIONS_PER_THREAD, successCount.get());
+ Assert.assertEquals("No errors should occur", 0, errorCount.get());
+
+ } finally {
+ mappedFile.destroy(0);
+ }
+ }
+
+ @Test
+ public void testConcurrentWriteWithMmap() throws Exception {
+ DefaultMappedFile mappedFile = new DefaultMappedFile(fileName,
fileSize, false);
+
+ try {
+ ExecutorService executor =
Executors.newFixedThreadPool(THREAD_COUNT);
+ CountDownLatch latch = new CountDownLatch(THREAD_COUNT);
+ AtomicInteger successCount = new AtomicInteger(0);
+ AtomicInteger errorCount = new AtomicInteger(0);
+
+ for (int i = 0; i < THREAD_COUNT; i++) {
+ final int threadId = i;
+ executor.submit(() -> {
+ try {
+ for (int j = 0; j < OPERATIONS_PER_THREAD; j++) {
+ String data =
String.format("Thread-%d-Operation-%d", threadId, j);
+ byte[] bytes = data.getBytes();
+
+ boolean result = mappedFile.appendMessage(bytes);
+ if (result) {
+ successCount.incrementAndGet();
+ } else {
+ errorCount.incrementAndGet();
+ }
+ }
+ } catch (Exception e) {
+ errorCount.incrementAndGet();
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ latch.await();
+ executor.shutdown();
+
+ // Success count: successCount.get()
+ // Error count: errorCount.get()
+ // Final wrote position: mappedFile.getWrotePosition()
+
+ // All operations should succeed
+ Assert.assertEquals("All write operations should succeed",
+ THREAD_COUNT * OPERATIONS_PER_THREAD, successCount.get());
+ Assert.assertEquals("No errors should occur", 0, errorCount.get());
+
+ } finally {
+ mappedFile.destroy(0);
+ }
+ }
+
+ @Test
+ public void testConcurrentFlush() throws Exception {
+ DefaultMappedFile mappedFile = new DefaultMappedFile(fileName,
fileSize, true);
+
+ try {
+ // Write some data first
+ for (int i = 0; i < 100; i++) {
+ String data = "Test data " + i;
+ mappedFile.appendMessage(data.getBytes());
+ }
+
+ ExecutorService executor = Executors.newFixedThreadPool(5);
+ CountDownLatch latch = new CountDownLatch(5);
+ AtomicInteger flushCount = new AtomicInteger(0);
+
+ for (int i = 0; i < 5; i++) {
+ executor.submit(() -> {
+ try {
+ int flushed = mappedFile.flush(0);
+ if (flushed > 0) {
+ flushCount.incrementAndGet();
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ } finally {
+ latch.countDown();
+ }
+ });
+ }
+
+ latch.await();
+ executor.shutdown();
+
+ Assert.assertTrue("At least one flush should succeed",
flushCount.get() > 0);
+
+ } finally {
+ mappedFile.destroy(0);
+ }
+ }
+}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
new file mode 100644
index 0000000000..649e8071cc
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileErrorHandlingTest.java
@@ -0,0 +1,210 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.common.message.MessageExtBrokerInner;
+import org.apache.rocketmq.common.message.MessageExtBatch;
+import org.apache.rocketmq.store.AppendMessageCallback;
+import org.apache.rocketmq.store.AppendMessageResult;
+import org.apache.rocketmq.store.AppendMessageStatus;
+import org.apache.rocketmq.store.CompactionAppendMsgCallback;
+import org.apache.rocketmq.store.PutMessageContext;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultMappedFileErrorHandlingTest {
+
+    /** Per-test store directory; assigned in {@link #setUp()}. */
+    private String storePath;
+    /** Path of the mapped file that each test creates inside the store directory. */
+    private String fileName;
+    private int fileSize = 1024 * 1024; // 1MB
+
+    /**
+     * Prepares a fresh store directory under the user's home directory.
+     *
+     * <p>The directory name carries a random UUID rather than a millisecond timestamp, so
+     * test classes that start within the same millisecond cannot end up sharing one
+     * directory and deleting each other's files.
+     */
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("user.home") + File.separator
+            + "unitteststore" + java.util.UUID.randomUUID();
+        fileName = storePath + File.separator + "00000000000000000000";
+        UtilAll.ensureDirOK(storePath);
+    }
+
+    /** Hands the store directory prepared by {@link #setUp()} to {@code UtilAll.deleteFile}. */
+    @After
+    public void tearDown() throws Exception {
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    /**
+     * Appends a message through a callback whose two {@code doAppend} overloads both report
+     * {@link AppendMessageStatus#UNKNOWN_ERROR}, and expects that status to come back from
+     * {@code appendMessage}.
+     */
+    @Test
+    public void testAppendMessageCallbackErrorHandling() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+        try {
+            // Callback that reports an error for single and batch messages alike
+            AppendMessageCallback failingCallback = new AppendMessageCallback() {
+                @Override
+                public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer,
+                    int maxBlank, MessageExtBrokerInner msg, PutMessageContext putMessageContext) {
+                    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+                }
+
+                @Override
+                public AppendMessageResult doAppend(long fileFromOffset, ByteBuffer byteBuffer,
+                    int maxBlank, MessageExtBatch messageExtBatch, PutMessageContext putMessageContext) {
+                    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+                }
+            };
+
+            // Minimal message: only the body is populated
+            MessageExtBrokerInner message = new MessageExtBrokerInner();
+            message.setBody("test message".getBytes());
+            PutMessageContext context = new PutMessageContext("test-topic");
+
+            AppendMessageResult appendResult = mappedFile.appendMessage(message, failingCallback, context);
+            AppendMessageStatus actualStatus = appendResult.getStatus();
+            Assert.assertEquals("Should return error status", AppendMessageStatus.UNKNOWN_ERROR, actualStatus);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends a buffer through a {@link CompactionAppendMsgCallback} that always reports
+     * {@link AppendMessageStatus#UNKNOWN_ERROR}, and expects that status to come back from
+     * {@code appendMessage}.
+     */
+    @Test
+    public void testCompactionAppendMsgCallbackErrorHandling() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+        try {
+            // Callback that reports an error regardless of its arguments
+            CompactionAppendMsgCallback failingCallback = new CompactionAppendMsgCallback() {
+                @Override
+                public AppendMessageResult doAppend(ByteBuffer bbDest, long fileFromOffset,
+                    int maxBlank, ByteBuffer bbSrc) {
+                    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
+                }
+            };
+
+            ByteBuffer payload = ByteBuffer.wrap("test data".getBytes());
+            AppendMessageResult appendResult = mappedFile.appendMessage(payload, failingCallback);
+            AppendMessageStatus actualStatus = appendResult.getStatus();
+
+            Assert.assertEquals("Should return error status", AppendMessageStatus.UNKNOWN_ERROR, actualStatus);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends a small payload to a file opened with {@code writeWithoutMmap = true} and
+     * checks that the append succeeds and advances the write position.
+     *
+     * <p>NOTE(review): despite the method name, nothing here sets the file's
+     * RandomAccessFile to null; only the regular append path is exercised. A real fallback
+     * test would first have to null that field, e.g. via reflection.
+     */
+    @Test
+    public void testWriteWithoutMmapWithNullRandomAccessFile() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            byte[] testData = "test data".getBytes();
+            boolean result = mappedFile.appendMessage(testData);
+
+            Assert.assertTrue("Append should succeed", result);
+            // A successful append must also move the write position by the payload size
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Writes a single payload that is 100 bytes smaller than the file and checks that the
+     * append succeeds and the write position equals the payload length.
+     */
+    @Test
+    public void testLargeDataWrite() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+        try {
+            // Payload close to the file size limit, filled with a repeating byte pattern
+            final int payloadSize = fileSize - 100;
+            byte[] largeData = new byte[payloadSize];
+            int index = 0;
+            while (index < payloadSize) {
+                largeData[index] = (byte) (index % 256);
+                index++;
+            }
+
+            boolean appended = mappedFile.appendMessage(largeData);
+            Assert.assertTrue("Should successfully write large data", appended);
+
+            int wrotePosition = mappedFile.getWrotePosition();
+            Assert.assertEquals("Wrote position should match data size", payloadSize, wrotePosition);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Fills the file up to its last 10 bytes, then checks that a 20-byte append, which no
+     * longer fits, is rejected.
+     */
+    @Test
+    public void testWriteBeyondFileSize() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+        try {
+            // First write leaves exactly 10 bytes of free space
+            byte[] filler = new byte[fileSize - 10];
+            boolean fillerAppended = mappedFile.appendMessage(filler);
+            Assert.assertTrue("Should successfully write data", fillerAppended);
+
+            // Second write needs 20 bytes, more than the 10 that remain
+            byte[] overflowData = new byte[20];
+            boolean overflowAppended = mappedFile.appendMessage(overflowData);
+            Assert.assertFalse("Should fail to write beyond file size", overflowAppended);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Writes a small payload and checks that {@code flush(0)} then reports a flushed
+     * position greater than zero.
+     *
+     * <p>NOTE(review): no failure is injected here, so only the successful flush path is
+     * covered despite the method name.
+     */
+    @Test
+    public void testFlushErrorHandling() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            // Check the write itself, so a rejected append is not misreported as a flush failure
+            byte[] testData = "test data for flush".getBytes();
+            Assert.assertTrue("Write should succeed", mappedFile.appendMessage(testData));
+
+            int flushedPosition = mappedFile.flush(0);
+            Assert.assertTrue("Flush should succeed", flushedPosition > 0);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Exercises {@code appendMessage(byte[], int, int)}: a range covering the whole array
+     * is expected to be accepted, a range starting beyond the end of the array is expected
+     * to be rejected.
+     */
+    @Test
+    public void testAppendMessageWithOffset() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+        try {
+            byte[] testData = "Hello, RocketMQ!".getBytes();
+
+            // Offset 0 with the full length is a valid range
+            boolean validRangeAppended = mappedFile.appendMessage(testData, 0, testData.length);
+            Assert.assertTrue("Should successfully append with valid offset", validRangeAppended);
+
+            // An offset beyond the array length cannot address any byte of the array
+            int invalidOffset = testData.length + 1;
+            boolean invalidRangeAppended = mappedFile.appendMessage(testData, invalidOffset, 1);
+            Assert.assertFalse("Should fail with invalid offset", invalidRangeAppended);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
new file mode 100644
index 0000000000..b958487add
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFilePerformanceTest.java
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.common.UtilAll;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultMappedFilePerformanceTest {
+
+    /** Per-test store directory; assigned in {@link #setUp()}. */
+    private String storePath;
+    /** Path of the mapped file that each test creates inside the store directory. */
+    private String fileName;
+    private int fileSize = 10 * 1024 * 1024; // 10MB
+    private static final int WRITE_COUNT = 10000;
+    private static final int DATA_SIZE = 1024; // 1KB per write
+
+    /**
+     * Prepares a fresh store directory under the user's home directory.
+     *
+     * <p>The directory name carries a random UUID rather than a millisecond timestamp, so
+     * test classes that start within the same millisecond cannot end up sharing one
+     * directory and deleting each other's files.
+     */
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("user.home") + File.separator
+            + "unitteststore" + java.util.UUID.randomUUID();
+        fileName = storePath + File.separator + "00000000000000000000";
+        UtilAll.ensureDirOK(storePath);
+    }
+
+    /** Hands the store directory prepared by {@link #setUp()} to {@code UtilAll.deleteFile}. */
+    @After
+    public void tearDown() throws Exception {
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    /**
+     * Appends {@code WRITE_COUNT} payloads of {@code DATA_SIZE} bytes to a file opened with
+     * {@code writeWithoutMmap = true} and checks that every append succeeds and the final
+     * write position equals the total number of bytes written.
+     *
+     * <p>No timing is recorded or asserted; the previously computed but unused duration
+     * was removed, so this is a functional bulk-write check only.
+     */
+    @Test
+    public void testWriteWithoutMmapPerformance() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            // Repeating 0..255 byte pattern
+            byte[] testData = new byte[DATA_SIZE];
+            for (int i = 0; i < testData.length; i++) {
+                testData[i] = (byte) (i % 256);
+            }
+
+            for (int i = 0; i < WRITE_COUNT; i++) {
+                boolean result = mappedFile.appendMessage(testData);
+                Assert.assertTrue("Write should succeed", result);
+            }
+
+            Assert.assertEquals("Wrote position should match expected",
+                WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends {@code WRITE_COUNT} payloads of {@code DATA_SIZE} bytes to a file opened with
+     * {@code writeWithoutMmap = false} and checks that every append succeeds and the final
+     * write position equals the total number of bytes written.
+     *
+     * <p>No timing is recorded or asserted; the previously computed but unused duration
+     * was removed, so this is a functional bulk-write check only.
+     */
+    @Test
+    public void testWriteWithMmapPerformance() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, false);
+
+        try {
+            // Repeating 0..255 byte pattern
+            byte[] testData = new byte[DATA_SIZE];
+            for (int i = 0; i < testData.length; i++) {
+                testData[i] = (byte) (i % 256);
+            }
+
+            for (int i = 0; i < WRITE_COUNT; i++) {
+                boolean result = mappedFile.appendMessage(testData);
+                Assert.assertTrue("Write should succeed", result);
+            }
+
+            Assert.assertEquals("Wrote position should match expected",
+                WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends 1000 payloads of {@code DATA_SIZE} bytes to a file opened with
+     * {@code writeWithoutMmap = true}, then checks that {@code flush(0)} reports a flushed
+     * position greater than zero.
+     *
+     * <p>No timing is recorded or asserted; the previously computed but unused duration
+     * was removed.
+     */
+    @Test
+    public void testFlushPerformance() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            // Write some data first so that there is something to flush
+            byte[] testData = new byte[DATA_SIZE];
+            for (int i = 0; i < testData.length; i++) {
+                testData[i] = (byte) (i % 256);
+            }
+
+            for (int i = 0; i < 1000; i++) {
+                // Check each write, so a rejected append is not misreported as a flush failure
+                Assert.assertTrue("Write should succeed", mappedFile.appendMessage(testData));
+            }
+
+            int flushedPosition = mappedFile.flush(0);
+            Assert.assertTrue("Flush should succeed", flushedPosition > 0);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends the same {@code DATA_SIZE}-byte {@link ByteBuffer} {@code WRITE_COUNT} times
+     * to a file opened with {@code writeWithoutMmap = true} and checks that every append
+     * succeeds and the final write position equals the total number of bytes written.
+     *
+     * <p>No timing is recorded or asserted; the previously computed but unused duration
+     * was removed.
+     */
+    @Test
+    public void testByteBufferWritePerformance() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            ByteBuffer testBuffer = ByteBuffer.allocate(DATA_SIZE);
+            for (int i = 0; i < DATA_SIZE; i++) {
+                testBuffer.put((byte) (i % 256));
+            }
+
+            for (int i = 0; i < WRITE_COUNT; i++) {
+                // Reset the position to 0 so the buffer is handed over from its start each time
+                testBuffer.rewind();
+                boolean result = mappedFile.appendMessage(testBuffer);
+                Assert.assertTrue("Write should succeed", result);
+            }
+
+            Assert.assertEquals("Wrote position should match expected",
+                WRITE_COUNT * DATA_SIZE, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Interleaves the four append variants ({@code byte[]}, {@code byte[]} with offset and
+     * length, {@link ByteBuffer}, and {@code appendMessageUsingFileChannel}) on a file
+     * opened with {@code writeWithoutMmap = true}.
+     *
+     * <p>Each round performs one write per variant. Every write must succeed and the final
+     * write position must equal the number of writes times {@code DATA_SIZE}. The expected
+     * position is derived from the number of rounds actually run, so it stays correct even
+     * if {@code WRITE_COUNT} is not a multiple of four. No timing is recorded or asserted.
+     */
+    @Test
+    public void testMixedWriteOperations() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            byte[] testData = new byte[DATA_SIZE];
+            for (int i = 0; i < testData.length; i++) {
+                testData[i] = (byte) (i % 256);
+            }
+
+            final int writesPerRound = 4;
+            final int rounds = WRITE_COUNT / writesPerRound;
+
+            // Mix of different write operations
+            for (int i = 0; i < rounds; i++) {
+                // appendMessage(byte[])
+                boolean result1 = mappedFile.appendMessage(testData);
+                Assert.assertTrue("Write should succeed", result1);
+
+                // appendMessage(byte[], offset, length)
+                boolean result2 = mappedFile.appendMessage(testData, 0, testData.length);
+                Assert.assertTrue("Write should succeed", result2);
+
+                // appendMessage(ByteBuffer)
+                ByteBuffer buffer = ByteBuffer.wrap(testData);
+                boolean result3 = mappedFile.appendMessage(buffer);
+                Assert.assertTrue("Write should succeed", result3);
+
+                // appendMessageUsingFileChannel(byte[])
+                boolean result4 = mappedFile.appendMessageUsingFileChannel(testData);
+                Assert.assertTrue("Write should succeed", result4);
+            }
+
+            Assert.assertEquals("Wrote position should match expected",
+                rounds * writesPerRound * DATA_SIZE, mappedFile.getWrotePosition());
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+}
diff --git
a/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
new file mode 100644
index 0000000000..79bca016e4
--- /dev/null
+++
b/store/src/test/java/org/apache/rocketmq/store/logfile/DefaultMappedFileWriteWithoutMmapTest.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.store.logfile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import org.apache.rocketmq.common.UtilAll;
+import org.apache.rocketmq.store.TransientStorePool;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DefaultMappedFileWriteWithoutMmapTest {
+
+    /** Per-test store directory; assigned in {@link #setUp()}. */
+    private String storePath;
+    /** Path of the mapped file that each test creates inside the store directory. */
+    private String fileName;
+    private int fileSize = 1024 * 1024; // 1MB
+
+    /**
+     * Prepares a fresh store directory under the user's home directory.
+     *
+     * <p>The directory name carries a random UUID rather than a millisecond timestamp, so
+     * test classes that start within the same millisecond cannot end up sharing one
+     * directory and deleting each other's files.
+     */
+    @Before
+    public void setUp() throws Exception {
+        storePath = System.getProperty("user.home") + File.separator
+            + "unitteststore" + java.util.UUID.randomUUID();
+        fileName = storePath + File.separator + "00000000000000000000";
+        UtilAll.ensureDirOK(storePath);
+    }
+
+    /** Hands the store directory prepared by {@link #setUp()} to {@code UtilAll.deleteFile}. */
+    @After
+    public void tearDown() throws Exception {
+        UtilAll.deleteFile(new File(storePath));
+    }
+
+    /**
+     * With {@code writeWithoutMmap = true}: appends a byte array and then a
+     * {@link ByteBuffer}, checking the write position after each append, and finally checks
+     * that {@code flush(0)} reports a position greater than zero.
+     */
+    @Test
+    public void testWriteWithoutMmapEnabled() throws IOException {
+        // Test with writeWithoutMmap = true
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+
+        try {
+            // Test appendMessage with byte array
+            byte[] testData = "Hello, RocketMQ!".getBytes();
+            boolean result = mappedFile.appendMessage(testData);
+            Assert.assertTrue("Should successfully append message", result);
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length, mappedFile.getWrotePosition());
+
+            // Test appendMessage with ByteBuffer. Keep the encoded bytes so the expected
+            // position is counted in bytes rather than in String characters.
+            byte[] bufferData = "Test ByteBuffer".getBytes();
+            ByteBuffer buffer = ByteBuffer.wrap(bufferData);
+            result = mappedFile.appendMessage(buffer);
+            Assert.assertTrue("Should successfully append ByteBuffer", result);
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length + bufferData.length, mappedFile.getWrotePosition());
+
+            // Test flush
+            int flushedPosition = mappedFile.flush(0);
+            Assert.assertTrue("Flush should succeed", flushedPosition > 0);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * With {@code writeWithoutMmap = false}: appends a byte array and then a
+     * {@link ByteBuffer}, checking the write position after each append, and finally checks
+     * that {@code flush(0)} reports a position greater than zero.
+     */
+    @Test
+    public void testWriteWithoutMmapDisabled() throws IOException {
+        // Test with writeWithoutMmap = false (default behavior)
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, false);
+
+        try {
+            // Test appendMessage with byte array
+            byte[] testData = "Hello, RocketMQ!".getBytes();
+            boolean result = mappedFile.appendMessage(testData);
+            Assert.assertTrue("Should successfully append message", result);
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length, mappedFile.getWrotePosition());
+
+            // Test appendMessage with ByteBuffer. Keep the encoded bytes so the expected
+            // position is counted in bytes rather than in String characters.
+            byte[] bufferData = "Test ByteBuffer".getBytes();
+            ByteBuffer buffer = ByteBuffer.wrap(bufferData);
+            result = mappedFile.appendMessage(buffer);
+            Assert.assertTrue("Should successfully append ByteBuffer", result);
+            Assert.assertEquals("Wrote position should be updated",
+                testData.length + bufferData.length, mappedFile.getWrotePosition());
+
+            // Test flush
+            int flushedPosition = mappedFile.flush(0);
+            Assert.assertTrue("Flush should succeed", flushedPosition > 0);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Builds the mapped file through the constructor that also takes a
+     * {@link TransientStorePool}, with {@code writeWithoutMmap = true}, and checks that a
+     * byte-array append succeeds and moves the write position by the payload length.
+     */
+    @Test
+    public void testWriteWithoutMmapWithTransientStorePool() throws IOException {
+        TransientStorePool pool = new TransientStorePool(5, fileSize);
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, pool, true);
+        try {
+            byte[] payload = "Hello, RocketMQ with TransientStorePool!".getBytes();
+
+            boolean appended = mappedFile.appendMessage(payload);
+            Assert.assertTrue("Should successfully append message", appended);
+
+            int wrotePosition = mappedFile.getWrotePosition();
+            Assert.assertEquals("Wrote position should be updated", payload.length, wrotePosition);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends a whole byte array through {@code appendMessage(byte[], int, int)} with
+     * {@code writeWithoutMmap = true} and checks that the append succeeds and the write
+     * position equals the array length.
+     */
+    @Test
+    public void testAppendMessageWithOffset() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+        try {
+            byte[] payload = "Hello, RocketMQ with offset!".getBytes();
+
+            // Offset 0 and the full length select the entire array
+            boolean appended = mappedFile.appendMessage(payload, 0, payload.length);
+            Assert.assertTrue("Should successfully append message with offset", appended);
+
+            int wrotePosition = mappedFile.getWrotePosition();
+            Assert.assertEquals("Wrote position should be updated", payload.length, wrotePosition);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+
+    /**
+     * Appends a byte array through {@code appendMessageUsingFileChannel} with
+     * {@code writeWithoutMmap = true} and checks that the append succeeds and the write
+     * position equals the array length.
+     */
+    @Test
+    public void testAppendMessageUsingFileChannel() throws IOException {
+        DefaultMappedFile mappedFile = new DefaultMappedFile(fileName, fileSize, true);
+        try {
+            byte[] payload = "Hello, RocketMQ using FileChannel!".getBytes();
+
+            boolean appended = mappedFile.appendMessageUsingFileChannel(payload);
+            Assert.assertTrue("Should successfully append message using FileChannel", appended);
+
+            int wrotePosition = mappedFile.getWrotePosition();
+            Assert.assertEquals("Wrote position should be updated", payload.length, wrotePosition);
+        } finally {
+            mappedFile.destroy(0);
+        }
+    }
+}