This is an automated email from the ASF dual-hosted git repository.
ltamber pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 86e8ed630b [ISSUE #9666] Add accelerated startup recovery feature for
RocksDB store with SYNC_FLUSH (#9667)
86e8ed630b is described below
commit 86e8ed630bd20c8ea2c9ab662af9b97389c41a20
Author: rongtong <[email protected]>
AuthorDate: Tue Sep 9 14:11:24 2025 +0800
[ISSUE #9666] Add accelerated startup recovery feature for RocksDB store
with SYNC_FLUSH (#9667)
* feat: Add accelerated startup recovery feature
Add accelerated startup recovery functionality when using RocksDB store
with SYNC_FLUSH configuration:
- Add enableAcceleratedRecovery configuration option in MessageStoreConfig
- Implement accelerated recovery logic in CommitLog for both normal and
abnormal recovery
- Add protective fallback mechanism to handle edge cases
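- Improve isMappedFileMatchedRecover robustness: validate the file's first message with checkMessageAndReturnSize (honoring checkCRCOnRecover) instead of only checking its magic code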
- Add comprehensive unit tests for the accelerated recovery feature (later removed in this same squashed change; see below)
This feature significantly reduces startup time when recovering from
RocksDB-based storage
with synchronous flushing enabled, while maintaining data consistency and
safety.
* Fix the issue of accelerated startup failure
* refactor: Remove problematic unit test
Remove AcceleratedRecoveryTest.java as the test implementation was not
appropriate for the accelerated startup recovery feature.
* Delete useless code
---
.../java/org/apache/rocketmq/store/CommitLog.java | 54 +++++++++++++++++-----
.../rocketmq/store/config/MessageStoreConfig.java | 18 +++++++-
2 files changed, 60 insertions(+), 12 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
index 2e3dbbadc3..a4bdb7851d 100644
--- a/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/CommitLog.java
@@ -344,6 +344,16 @@ public class CommitLog implements Swappable {
long processOffset = mappedFile.getFileFromOffset();
long mappedFileOffset = 0;
long lastValidMsgPhyOffset = this.getConfirmOffset();
+
+ if
(defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
+ &&
defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
+ mappedFileOffset = dispatchFromPhyOffset -
mappedFile.getFileFromOffset();
+ if (mappedFileOffset > 0) {
+ log.info("recover using acceleration, recovery offset is
{}", dispatchFromPhyOffset);
+ lastValidMsgPhyOffset = dispatchFromPhyOffset;
+ byteBuffer.position((int) mappedFileOffset);
+ }
+ }
while (true) {
DispatchRequest dispatchRequest =
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
int size = dispatchRequest.getMsgSize();
@@ -728,9 +738,29 @@ public class CommitLog implements Swappable {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
long processOffset = mappedFile.getFileFromOffset();
- long mappedFileOffset = 0;
- long lastValidMsgPhyOffset = processOffset;
- long lastConfirmValidMsgPhyOffset = processOffset;
+ long mappedFileOffset;
+ long lastValidMsgPhyOffset;
+ long lastConfirmValidMsgPhyOffset;
+
+ if
(defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()
+ &&
defaultMessageStore.getMessageStoreConfig().isEnableAcceleratedRecovery()) {
+ mappedFileOffset = maxPhyOffsetOfConsumeQueue -
mappedFile.getFileFromOffset();
+ // Protective measures, falling back to non-accelerated mode,
which is extremely unlikely to occur
+ if (mappedFileOffset < 0) {
+ mappedFileOffset = 0;
+ lastValidMsgPhyOffset = processOffset;
+ lastConfirmValidMsgPhyOffset = processOffset;
+ } else {
+ log.info("recover using acceleration, recovery offset is
{}", maxPhyOffsetOfConsumeQueue);
+ lastValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
+ lastConfirmValidMsgPhyOffset = maxPhyOffsetOfConsumeQueue;
+ byteBuffer.position((int) mappedFileOffset);
+ }
+ } else {
+ mappedFileOffset = 0;
+ lastValidMsgPhyOffset = processOffset;
+ lastConfirmValidMsgPhyOffset = processOffset;
+ }
// abnormal recover require dispatching
boolean doDispatch = true;
while (true) {
@@ -840,19 +870,21 @@ public class CommitLog implements Swappable {
boolean recoverNormally) throws RocksDBException {
ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();
- int magicCode =
byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
- if (magicCode != MessageDecoder.MESSAGE_MAGIC_CODE && magicCode !=
MessageDecoder.MESSAGE_MAGIC_CODE_V2) {
+ boolean checkCRCOnRecover =
this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();
+ boolean checkDupInfo =
this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable();
+
+ DispatchRequest dispatchRequest =
this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover, checkDupInfo);
+
+ if (!dispatchRequest.isSuccess()) {
return false;
}
- int sysFlag = byteBuffer.getInt(MessageDecoder.SYSFLAG_POSITION);
- int bornHostLength = (sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0
? 8 : 20;
- int msgStoreTimePos = 4 + 4 + 4 + 4 + 4 + 8 + 8 + 4 + 8 +
bornHostLength;
- long storeTimestamp = byteBuffer.getLong(msgStoreTimePos);
- if (0 == storeTimestamp) {
+ long storeTimestamp = dispatchRequest.getStoreTimestamp();
+ long phyOffset = dispatchRequest.getCommitLogOffset();
+
+ if (0 == dispatchRequest.getStoreTimestamp()) {
return false;
}
- long phyOffset =
byteBuffer.getLong(MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
if
(this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable() &&
this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
diff --git
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
index 60f6a90381..a142eca86f 100644
---
a/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
+++
b/store/src/main/java/org/apache/rocketmq/store/config/MessageStoreConfig.java
@@ -464,6 +464,13 @@ public class MessageStoreConfig {
private long rocksdbWalFileRollingThreshold = SizeUnit.GB;
+ /**
+ * Note: For correctness, this switch should be enabled only if the
previous startup was configured with SYNC_FLUSH
+ * and the storeType was defaultRocksDB. This switch is not recommended
for normal use cases (including master-slave
+ * or controller mode).
+ */
+ private boolean enableAcceleratedRecovery = false;
+
public String getRocksdbCompressionType() {
return rocksdbCompressionType;
}
@@ -2008,7 +2015,16 @@ public class MessageStoreConfig {
return enableLogConsumeQueueRepeatedlyBuildWhenRecover;
}
- public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(boolean
enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
+ public void setEnableLogConsumeQueueRepeatedlyBuildWhenRecover(
+ boolean enableLogConsumeQueueRepeatedlyBuildWhenRecover) {
this.enableLogConsumeQueueRepeatedlyBuildWhenRecover =
enableLogConsumeQueueRepeatedlyBuildWhenRecover;
}
+
+ public boolean isEnableAcceleratedRecovery() {
+ return enableAcceleratedRecovery;
+ }
+
+ public void setEnableAcceleratedRecovery(boolean
enableAcceleratedRecovery) {
+ this.enableAcceleratedRecovery = enableAcceleratedRecovery;
+ }
}