This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 47c07a9724 [ISSUE #9716] refactor: replace RandomAccessFile with
FileChannel (#9715)
47c07a9724 is described below
commit 47c07a97242bd979c4ce97c51efc350f3007882a
Author: rongtong <[email protected]>
AuthorDate: Fri Sep 19 15:43:01 2025 +0800
[ISSUE #9716] refactor: replace RandomAccessFile with FileChannel (#9715)
* refactor: replace RandomAccessFile with FileChannel for better I/O
performance
- Remove RandomAccessFile field and related logic completely
- Use FileChannel for all write operations when writeWithoutMmap is enabled
- Change SharedByteBuffer to use direct memory allocation
(ByteBuffer.allocateDirect)
- Add RunningFlags support for better error handling
- Improve constructor design with better parameter handling
- Fix SharedByteBuffer write operation to ensure correct byte count
This change improves I/O performance by:
1. Eliminating the overhead of RandomAccessFile
2. Using direct memory allocation for better memory management
3. Providing more consistent I/O operations through FileChannel
4. Better error handling with RunningFlags integration
* writeWithoutMmap and transientStorePoolEnable cannot be used together. If
both are enabled, only transientStorePoolEnable will take effect.
* Fix config comment
---
.../apache/rocketmq/store/DefaultMessageStore.java | 3 +-
.../rocketmq/store/config/MessageStoreConfig.java | 7 ++-
.../rocketmq/store/logfile/DefaultMappedFile.java | 72 ++++++++--------------
3 files changed, 33 insertions(+), 49 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
index 4a8ecbfbf2..41b2e3da3e 100644
--- a/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/DefaultMessageStore.java
@@ -3025,7 +3025,8 @@ public class DefaultMessageStore implements MessageStore {
*/
public boolean isTransientStorePoolEnable() {
return this.messageStoreConfig.isTransientStorePoolEnable() &&
- (this.brokerConfig.isEnableControllerMode() ||
this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE);
+ (this.brokerConfig.isEnableControllerMode() ||
this.messageStoreConfig.getBrokerRole() != BrokerRole.SLAVE)
+ && !messageStoreConfig.isWriteWithoutMmap();
}
public long getReputFromOffset() {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index a48138b60d..2e72f9e6f2 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -239,8 +239,11 @@ public class MessageStoreConfig {
private boolean fastFailIfNoBufferInStorePool = false;
/**
- * When true, use RandomAccessFile for writing instead of MappedByteBuffer.
- * This can be useful for certain scenarios where mmap is not desired.
+ * When true, use RandomAccessFile for writing instead of
MappedByteBuffer. This can be useful for certain scenarios
+ * where mmap is not desired.
+ *
+ * The configurations writeWithoutMmap and transientStorePoolEnable are
mutually exclusive. When both are set to
+ * true, only writeWithoutMmap will be effective.
*/
@ImportantField
private boolean writeWithoutMmap = false;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index c566d9956d..147eb3d708 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -82,10 +82,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
protected volatile int flushedPosition;
protected int fileSize;
protected FileChannel fileChannel;
- /**
- * RandomAccessFile for writing when writeWithoutMmap is enabled
- */
- protected RandomAccessFile randomAccessFile = null;
+
/**
* Message will put to here first, and then reput to FileChannel if
writeBuffer is not null.
*/
@@ -130,7 +127,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
public SharedByteBuffer(int size) {
this.lock = new ReentrantLock();
- this.buffer = ByteBuffer.allocate(size);
+ this.buffer = ByteBuffer.allocateDirect(size);
}
public void release() {
@@ -237,8 +234,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
UtilAll.ensureDirOK(this.file.getParent());
try {
- this.randomAccessFile = new RandomAccessFile(this.file, "rw");
- this.fileChannel = this.randomAccessFile.getChannel();
+ this.fileChannel = new RandomAccessFile(this.file,
"rw").getChannel();
if (writeWithoutMmap) {
// Still create MappedByteBuffer for reading operations
@@ -261,9 +257,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (!ok && this.fileChannel != null) {
this.fileChannel.close();
}
- if (!ok && this.randomAccessFile != null) {
- this.randomAccessFile.close();
- }
}
}
@@ -333,7 +326,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (currentPos < this.fileSize) {
SharedByteBuffer sharedByteBuffer = null;
ByteBuffer byteBuffer;
- if (writeWithoutMmap && randomAccessFile != null) {
+ if (writeWithoutMmap) {
sharedByteBuffer = borrowSharedByteBuffer();
byteBuffer = sharedByteBuffer.acquire();
byteBuffer.position(0).limit(byteBuffer.capacity());
@@ -347,8 +340,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (sharedByteBuffer != null) {
try {
- randomAccessFile.seek(currentPos);
- randomAccessFile.write(byteBuffer.array(), 0,
result.getWroteBytes());
+ this.fileChannel.position(currentPos);
+ byteBuffer.position(0).limit(result.getWroteBytes());
+ this.fileChannel.write(byteBuffer);
} catch (Throwable t) {
log.error("Failed to write to mappedFile {}",
this.fileName, t);
return new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -388,7 +382,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (currentPos < this.fileSize) {
SharedByteBuffer sharedByteBuffer = null;
ByteBuffer byteBuffer;
- if (writeWithoutMmap && randomAccessFile != null) {
+ if (writeWithoutMmap) {
sharedByteBuffer = borrowSharedByteBuffer();
byteBuffer = sharedByteBuffer.acquire();
byteBuffer.position(0).limit(byteBuffer.capacity());
@@ -413,8 +407,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (sharedByteBuffer != null) {
try {
- randomAccessFile.seek(currentPos);
- randomAccessFile.write(byteBuffer.array(), 0,
result.getWroteBytes());
+ this.fileChannel.position(currentPos);
+ byteBuffer.position(0).limit(result.getWroteBytes());
+ this.fileChannel.write(byteBuffer);
} catch (Throwable t) {
log.error("Failed to write to mappedFile {}",
this.fileName, t);
return new
AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
@@ -452,12 +447,13 @@ public class DefaultMappedFile extends AbstractMappedFile
{
if ((currentPos + remaining) <= this.fileSize) {
try {
- if (writeWithoutMmap && randomAccessFile != null) {
- // Use RandomAccessFile for writing
- randomAccessFile.seek(currentPos);
+ if (writeWithoutMmap) {
+ // Use FileChannel for writing
+ this.fileChannel.position(currentPos);
byte[] buffer = new byte[remaining];
data.get(buffer);
- randomAccessFile.write(buffer);
+ ByteBuffer writeBuffer = ByteBuffer.wrap(buffer);
+ this.fileChannel.write(writeBuffer);
} else {
// Use FileChannel for writing (default behavior)
this.fileChannel.position(currentPos);
@@ -487,10 +483,11 @@ public class DefaultMappedFile extends AbstractMappedFile
{
if ((currentPos + length) <= this.fileSize) {
try {
- if (writeWithoutMmap && randomAccessFile != null) {
- // Use RandomAccessFile for writing
- randomAccessFile.seek(currentPos);
- randomAccessFile.write(data, offset, length);
+ if (writeWithoutMmap) {
+ // Use FileChannel for writing
+ this.fileChannel.position(currentPos);
+ ByteBuffer writeBuffer = ByteBuffer.wrap(data, offset,
length);
+ this.fileChannel.write(writeBuffer);
} else {
// Use MappedByteBuffer for writing (default behavior)
ByteBuffer buf = this.mappedByteBuffer.slice();
@@ -542,17 +539,13 @@ public class DefaultMappedFile extends AbstractMappedFile
{
try {
this.mappedByteBufferAccessCountSinceLastSwap++;
- if (writeWithoutMmap && randomAccessFile != null) {
- // Use RandomAccessFile for flushing
- randomAccessFile.getChannel().force(false);
+ //We only append data to fileChannel or mappedByteBuffer,
never both.
+ if (writeWithoutMmap || writeBuffer != null ||
this.fileChannel.position() != 0) {
+ this.fileChannel.force(false);
} else {
- //We only append data to fileChannel or
mappedByteBuffer, never both.
- if (writeBuffer != null || this.fileChannel.position()
!= 0) {
- this.fileChannel.force(false);
- } else {
- this.mappedByteBuffer.force();
- }
+ this.mappedByteBuffer.force();
}
+
this.lastFlushTime = System.currentTimeMillis();
FLUSHED_POSITION_UPDATER.set(this, value);
} catch (Throwable e) {
@@ -750,14 +743,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
} catch (Throwable e) {
log.warn("close file channel {" + this.fileName + "} failed when
cleanup", e);
}
- try {
- if (this.randomAccessFile != null) {
- this.randomAccessFile.close();
- }
- } catch (Throwable e) {
- log.info("close random access file " + this.fileName + " failed",
e);
- }
-
}
@Override
@@ -770,11 +755,6 @@ public class DefaultMappedFile extends AbstractMappedFile {
this.fileChannel.close();
log.info("close file channel " + this.fileName + " OK");
- if (this.randomAccessFile != null) {
- this.randomAccessFile.close();
- log.info("close random access file " + this.fileName + "
OK");
- }
-
long beginTime = System.currentTimeMillis();
boolean result = this.file.delete();
log.info("delete file[REF:" + this.getRefCount() + "] " +
this.fileName