This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 1d9b02ca46 [ISSUE #9707] Integrate RunningFlags with MappedFile system
for better error handling and state management (#9708)
1d9b02ca46 is described below
commit 1d9b02ca468c2b892ffba27b012be19b81c9e0e4
Author: guyinyou <[email protected]>
AuthorDate: Tue Sep 16 16:58:42 2025 +0800
[ISSUE #9707] Integrate RunningFlags with MappedFile system for better
error handling and state management (#9708)
* Add RunningFlags support to MappedFileQueue
- Integrate RunningFlags throughout MappedFileQueue hierarchy
- Add writeable state checking and error handling in DefaultMappedFile
- Update MappedFile interface and constructors to support RunningFlags
- Implement proper error state management during flush operations
* fix ut
* fix ut
---------
Co-authored-by: guyinyou <[email protected]>
---
.../rocketmq/store/AllocateMappedFileService.java | 6 +--
.../java/org/apache/rocketmq/store/CommitLog.java | 3 +-
.../org/apache/rocketmq/store/MappedFileQueue.java | 23 +++++---
.../rocketmq/store/MultiPathMappedFileQueue.java | 9 +++-
.../rocketmq/store/logfile/DefaultMappedFile.java | 61 +++++++++++++++++-----
.../apache/rocketmq/store/logfile/MappedFile.java | 3 +-
6 files changed, 79 insertions(+), 26 deletions(-)
diff --git
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
index a56fa46157..970e9b05ee 100644
---
a/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
+++
b/store/src/main/java/org/apache/rocketmq/store/AllocateMappedFileService.java
@@ -176,13 +176,13 @@ public class AllocateMappedFileService extends
ServiceThread {
if (messageStore.isTransientStorePoolEnable()) {
try {
mappedFile =
ServiceLoader.load(MappedFile.class).iterator().next();
- mappedFile.init(req.getFilePath(), req.getFileSize(),
messageStore.getTransientStorePool());
+ mappedFile.init(req.getFilePath(), req.getFileSize(),
messageStore.getRunningFlags(), messageStore.getTransientStorePool());
} catch (RuntimeException e) {
log.warn("Use default implementation.");
- mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize(), messageStore.getTransientStorePool(), writeWithoutMmap);
+ mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize(), messageStore.getRunningFlags(),
messageStore.getTransientStorePool(), writeWithoutMmap);
}
} else {
- mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize(), writeWithoutMmap);
+ mappedFile = new DefaultMappedFile(req.getFilePath(),
req.getFileSize(), messageStore.getRunningFlags(), writeWithoutMmap);
}
long elapsedTime =
UtilAll.computeElapsedTimeMilliseconds(beginTime);
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 3b26afcc09..38894abc81 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -114,11 +114,12 @@ public class CommitLog implements Swappable {
if (storePath.contains(MixAll.MULTI_PATH_SPLITTER)) {
this.mappedFileQueue = new
MultiPathMappedFileQueue(messageStore.getMessageStoreConfig(),
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
- messageStore.getAllocateMappedFileService(),
this::getFullStorePaths);
+ messageStore.getAllocateMappedFileService(),
this::getFullStorePaths, messageStore.getRunningFlags());
} else {
this.mappedFileQueue = new MappedFileQueue(storePath,
messageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(),
messageStore.getAllocateMappedFileService(),
+ messageStore.getRunningFlags(),
messageStore.getMessageStoreConfig().isWriteWithoutMmap());
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
index 320e842154..70cc65f8f6 100644
--- a/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/MappedFileQueue.java
@@ -53,7 +53,9 @@ public class MappedFileQueue implements Swappable {
protected long committedWhere = 0;
protected volatile long storeTimestamp = 0;
-
+
+ protected RunningFlags runningFlags;
+
/**
* Configuration flag to use RandomAccessFile instead of MappedByteBuffer
for writing
*/
@@ -61,16 +63,25 @@ public class MappedFileQueue implements Swappable {
public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService) {
- this.storePath = storePath;
- this.mappedFileSize = mappedFileSize;
- this.allocateMappedFileService = allocateMappedFileService;
+ this(storePath, mappedFileSize, allocateMappedFileService, null,
false);
+ }
+
+ public MappedFileQueue(final String storePath, int mappedFileSize,
+ AllocateMappedFileService allocateMappedFileService, RunningFlags
runningFlags) {
+ this(storePath, mappedFileSize, allocateMappedFileService,
runningFlags, false);
}
public MappedFileQueue(final String storePath, int mappedFileSize,
AllocateMappedFileService allocateMappedFileService, boolean
writeWithoutMmap) {
+ this(storePath, mappedFileSize, allocateMappedFileService, null,
writeWithoutMmap);
+ }
+
+ public MappedFileQueue(final String storePath, int mappedFileSize,
+ AllocateMappedFileService allocateMappedFileService, RunningFlags
runningFlags, boolean writeWithoutMmap) {
this.storePath = storePath;
this.mappedFileSize = mappedFileSize;
this.allocateMappedFileService = allocateMappedFileService;
+ this.runningFlags = runningFlags;
this.writeWithoutMmap = writeWithoutMmap;
}
@@ -279,7 +290,7 @@ public class MappedFileQueue implements Swappable {
}
try {
- MappedFile mappedFile = new DefaultMappedFile(file.getPath(),
mappedFileSize, writeWithoutMmap);
+ MappedFile mappedFile = new DefaultMappedFile(file.getPath(),
mappedFileSize, runningFlags, writeWithoutMmap);
mappedFile.setWrotePosition(this.mappedFileSize);
mappedFile.setFlushedPosition(this.mappedFileSize);
@@ -369,7 +380,7 @@ public class MappedFileQueue implements Swappable {
nextNextFilePath, this.mappedFileSize);
} else {
try {
- mappedFile = new DefaultMappedFile(nextFilePath,
this.mappedFileSize, this.writeWithoutMmap);
+ mappedFile = new DefaultMappedFile(nextFilePath,
this.mappedFileSize, runningFlags, this.writeWithoutMmap);
} catch (IOException e) {
log.error("create mappedFile exception", e);
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
index 72ec8820a6..fcae4948c6 100644
---
a/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
+++
b/store/src/main/java/org/apache/rocketmq/store/MultiPathMappedFileQueue.java
@@ -36,10 +36,15 @@ public class MultiPathMappedFileQueue extends
MappedFileQueue {
private final MessageStoreConfig config;
private final Supplier<Set<String>> fullStorePathsSupplier;
+ public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int
mappedFileSize,
+ AllocateMappedFileService allocateMappedFileService,
+ Supplier<Set<String>> fullStorePathsSupplier) {
+ this(messageStoreConfig, mappedFileSize, allocateMappedFileService,
fullStorePathsSupplier, null);
+ }
public MultiPathMappedFileQueue(MessageStoreConfig messageStoreConfig, int
mappedFileSize,
AllocateMappedFileService
allocateMappedFileService,
- Supplier<Set<String>>
fullStorePathsSupplier) {
- super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize,
allocateMappedFileService,
+ Supplier<Set<String>>
fullStorePathsSupplier, RunningFlags runningFlags) {
+ super(messageStoreConfig.getStorePathCommitLog(), mappedFileSize,
allocateMappedFileService, runningFlags,
messageStoreConfig.isWriteWithoutMmap());
this.config = messageStoreConfig;
this.fullStorePathsSupplier = fullStorePathsSupplier;
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
index b2d89108b4..f2383993d4 100644
---
a/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
+++
b/store/src/main/java/org/apache/rocketmq/store/logfile/DefaultMappedFile.java
@@ -53,6 +53,7 @@ import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.AppendMessageStatus;
import org.apache.rocketmq.store.CompactionAppendMsgCallback;
import org.apache.rocketmq.store.PutMessageContext;
+import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
@@ -121,6 +122,7 @@ public class DefaultMappedFile extends AbstractMappedFile {
private static int maxSharedNum = 16;
private static final SharedByteBuffer[] SHARED_BYTE_BUFFER;
+ protected RunningFlags runningFlags;
static class SharedByteBuffer {
private final ReentrantLock lock;
private final ByteBuffer buffer;
@@ -173,24 +175,36 @@ public class DefaultMappedFile extends AbstractMappedFile
{
}
public DefaultMappedFile(final String fileName, final int fileSize) throws
IOException {
- init(fileName, fileSize);
+ this(fileName, fileSize, null);
}
- public DefaultMappedFile(final String fileName, final int fileSize,
+ public DefaultMappedFile(final String fileName, final int fileSize,
boolean writeWithoutMmap) throws IOException {
+ this(fileName, fileSize, null, null, writeWithoutMmap);
+ }
+
+ public DefaultMappedFile(final String fileName, final int fileSize,
RunningFlags runningFlags) throws IOException {
+ this(fileName, fileSize, runningFlags, null, false);
+ }
+
+ public DefaultMappedFile(final String fileName, final int fileSize, final
RunningFlags runningFlags,
final TransientStorePool transientStorePool) throws IOException {
- init(fileName, fileSize, transientStorePool);
+ this(fileName, fileSize, runningFlags, transientStorePool, false);
}
- public DefaultMappedFile(final String fileName, final int fileSize,
+ public DefaultMappedFile(final String fileName, final int fileSize, final
RunningFlags runningFlags,
final boolean writeWithoutMmap) throws IOException {
- this.writeWithoutMmap = writeWithoutMmap;
- init(fileName, fileSize);
+ this(fileName, fileSize, runningFlags, null, writeWithoutMmap);
}
public DefaultMappedFile(final String fileName, final int fileSize,
+ final TransientStorePool transientStorePool, final boolean
writeWithoutMmap) throws IOException {
+ this(fileName, fileSize, null, transientStorePool, writeWithoutMmap);
+ }
+
+ public DefaultMappedFile(final String fileName, final int fileSize, final
RunningFlags runningFlags,
final TransientStorePool transientStorePool, final boolean
writeWithoutMmap) throws IOException {
this.writeWithoutMmap = writeWithoutMmap;
- init(fileName, fileSize, transientStorePool);
+ init(fileName, fileSize, runningFlags, transientStorePool);
}
public static int getTotalMappedFiles() {
@@ -202,30 +216,30 @@ public class DefaultMappedFile extends AbstractMappedFile
{
}
@Override
- public void init(final String fileName, final int fileSize,
+ public void init(final String fileName, final int fileSize, final
RunningFlags runningFlags,
final TransientStorePool transientStorePool) throws IOException {
- init(fileName, fileSize);
+ init(fileName, fileSize, runningFlags);
if (transientStorePool != null) {
this.writeBuffer = transientStorePool.borrowBuffer();
this.transientStorePool = transientStorePool;
}
}
- private void init(final String fileName, final int fileSize) throws
IOException {
+ private void init(final String fileName, final int fileSize, final
RunningFlags runningFlags) throws IOException {
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
+ this.runningFlags = runningFlags;
boolean ok = false;
UtilAll.ensureDirOK(this.file.getParent());
try {
- this.fileChannel = new RandomAccessFile(this.file,
"rw").getChannel();
+ this.randomAccessFile = new RandomAccessFile(this.file, "rw");
+ this.fileChannel = this.randomAccessFile.getChannel();
if (writeWithoutMmap) {
- // Use RandomAccessFile for writing instead of MappedByteBuffer
- this.randomAccessFile = new RandomAccessFile(this.file, "rw");
// Still create MappedByteBuffer for reading operations
this.mappedByteBuffer =
this.fileChannel.map(MapMode.READ_ONLY, 0, fileSize);
} else {
@@ -522,6 +536,10 @@ public class DefaultMappedFile extends AbstractMappedFile {
if (this.hold()) {
int value = getReadPosition();
+ if (!isWriteable()) {
+ return this.getFlushedPosition();
+ }
+
try {
this.mappedByteBufferAccessCountSinceLastSwap++;
@@ -538,6 +556,9 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
this.lastFlushTime = System.currentTimeMillis();
} catch (Throwable e) {
+ if (e instanceof IOException) {
+ getAndMakeNotWriteable();
+ }
log.error("Error occurred when force data to disk.", e);
}
@@ -597,6 +618,20 @@ public class DefaultMappedFile extends AbstractMappedFile {
}
}
+ public boolean getAndMakeNotWriteable() {
+ if (runningFlags == null) {
+ return false;
+ }
+ return runningFlags.getAndMakeNotWriteable();
+ }
+
+ public boolean isWriteable() {
+ if (runningFlags == null) {
+ return true;
+ }
+ return runningFlags.isWriteable();
+ }
+
private boolean isAbleToFlush(final int flushLeastPages) {
int flush = FLUSHED_POSITION_UPDATER.get(this);
int write = getReadPosition();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
index d1f11959aa..0985ff1edc 100644
--- a/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
+++ b/store/src/main/java/org/apache/rocketmq/store/logfile/MappedFile.java
@@ -28,6 +28,7 @@ import org.apache.rocketmq.store.AppendMessageCallback;
import org.apache.rocketmq.store.AppendMessageResult;
import org.apache.rocketmq.store.CompactionAppendMsgCallback;
import org.apache.rocketmq.store.PutMessageContext;
+import org.apache.rocketmq.store.RunningFlags;
import org.apache.rocketmq.store.SelectMappedBufferResult;
import org.apache.rocketmq.store.TransientStorePool;
import org.apache.rocketmq.store.config.FlushDiskType;
@@ -368,7 +369,7 @@ public interface MappedFile {
* @param transientStorePool transient store pool
* @throws IOException
*/
- void init(String fileName, int fileSize, TransientStorePool
transientStorePool) throws IOException;
+ void init(String fileName, int fileSize, RunningFlags runningFlags,
TransientStorePool transientStorePool) throws IOException;
Iterator<SelectMappedBufferResult> iterator(int pos);