This is an automated email from the ASF dual-hosted git repository.
lizhimin pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new cee19630ad [ISSUE #9241] RocksDBConsumeQueueStore do not need to
update StoreCheckpoint (#9242)
cee19630ad is described below
commit cee19630ad1afd036bca54c84f40f1c0d752f800
Author: qianye <[email protected]>
AuthorDate: Mon Mar 17 17:30:48 2025 +0800
[ISSUE #9241] RocksDBConsumeQueueStore do not need to update
StoreCheckpoint (#9242)
---
.../rocketmq/store/dledger/DLedgerCommitLog.java | 91 ++++++++++++----------
.../store/queue/RocksDBConsumeQueueStore.java | 9 ---
2 files changed, 48 insertions(+), 52 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
index 5f4ef08374..29be9e7c61 100644
--- a/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
+++ b/store/src/main/java/org/apache/rocketmq/store/dledger/DLedgerCommitLog.java
@@ -16,13 +16,26 @@
*/
package org.apache.rocketmq.store.dledger;
+import io.openmessaging.storage.dledger.AppendFuture;
+import io.openmessaging.storage.dledger.BatchAppendFuture;
+import io.openmessaging.storage.dledger.DLedgerConfig;
+import io.openmessaging.storage.dledger.DLedgerServer;
+import io.openmessaging.storage.dledger.entry.DLedgerEntry;
+import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
+import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
+import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
+import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
+import io.openmessaging.storage.dledger.store.file.MmapFile;
+import io.openmessaging.storage.dledger.store.file.MmapFileList;
+import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
+import io.openmessaging.storage.dledger.utils.DLedgerUtils;
import java.net.Inet6Address;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExtBatch;
@@ -43,21 +56,6 @@ import org.apache.rocketmq.store.config.MessageStoreConfig;
import org.apache.rocketmq.store.logfile.MappedFile;
import org.rocksdb.RocksDBException;
-import io.openmessaging.storage.dledger.AppendFuture;
-import io.openmessaging.storage.dledger.BatchAppendFuture;
-import io.openmessaging.storage.dledger.DLedgerConfig;
-import io.openmessaging.storage.dledger.DLedgerServer;
-import io.openmessaging.storage.dledger.entry.DLedgerEntry;
-import io.openmessaging.storage.dledger.protocol.AppendEntryRequest;
-import io.openmessaging.storage.dledger.protocol.AppendEntryResponse;
-import io.openmessaging.storage.dledger.protocol.BatchAppendEntryRequest;
-import io.openmessaging.storage.dledger.protocol.DLedgerResponseCode;
-import io.openmessaging.storage.dledger.store.file.DLedgerMmapFileStore;
-import io.openmessaging.storage.dledger.store.file.MmapFile;
-import io.openmessaging.storage.dledger.store.file.MmapFileList;
-import io.openmessaging.storage.dledger.store.file.SelectMmapBufferResult;
-import io.openmessaging.storage.dledger.utils.DLedgerUtils;
-
/**
* Store all metadata downtime for recovery, data protection reliability
*/
@@ -428,7 +426,7 @@ public class DLedgerCommitLog extends CommitLog {
log.info("Will set the initial commitlog offset={} for dledger", dividedCommitlogOffset);
}
- private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) {
+ private boolean isMmapFileMatchedRecover(final MmapFile mmapFile) throws RocksDBException {
ByteBuffer byteBuffer = mmapFile.sliceByteBuffer();
int magicCode = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_MAGIC_CODE_POSITION);
@@ -436,39 +434,46 @@ public class DLedgerCommitLog extends CommitLog {
return false;
}
- int storeTimestampPosition;
- int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.SYSFLAG_POSITION);
- if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
- storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+ if (this.defaultMessageStore.getMessageStoreConfig().isEnableRocksDBStore()) {
+ final long maxPhyOffsetInConsumeQueue = this.defaultMessageStore.getQueueStore().getMaxPhyOffsetInConsumeQueue();
+ long phyOffset = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + MessageDecoder.MESSAGE_PHYSIC_OFFSET_POSITION);
+ if (phyOffset <= maxPhyOffsetInConsumeQueue) {
+ log.info("find check. beginPhyOffset: {}, maxPhyOffsetInConsumeQueue: {}", phyOffset, maxPhyOffsetInConsumeQueue);
+ return true;
+ }
} else {
- // v6 address is 12 byte larger than v4
- storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
- }
+ int storeTimestampPosition;
+ int sysFlag = byteBuffer.getInt(DLedgerEntry.BODY_OFFSET + MessageDecoder.SYSFLAG_POSITION);
+ if ((sysFlag & MessageSysFlag.BORNHOST_V6_FLAG) == 0) {
+ storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION;
+ } else {
+ // v6 address is 12 byte larger than v4
+ storeTimestampPosition = MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSITION + 12;
+ }
- long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + storeTimestampPosition);
- if (storeTimestamp == 0) {
- return false;
- }
+ long storeTimestamp = byteBuffer.getLong(DLedgerEntry.BODY_OFFSET + storeTimestampPosition);
+ if (storeTimestamp == 0) {
+ return false;
+ }
- if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
+ if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()
&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {
- if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
- log.info("dledger find check timestamp, {} {}",
- storeTimestamp,
- UtilAll.timeMillisToHumanString(storeTimestamp));
- return true;
- }
- } else {
- if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
- log.info("dledger find check timestamp, {} {}",
- storeTimestamp,
- UtilAll.timeMillisToHumanString(storeTimestamp));
- return true;
+ if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {
+ log.info("dledger find check timestamp, {} {}",
+ storeTimestamp,
+ UtilAll.timeMillisToHumanString(storeTimestamp));
+ return true;
+ }
+ } else {
+ if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {
+ log.info("dledger find check timestamp, {} {}",
+ storeTimestamp,
+ UtilAll.timeMillisToHumanString(storeTimestamp));
+ return true;
+ }
}
}
-
return false;
-
}
@Override
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
index 7e3aa70d02..94ed0c926a 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/RocksDBConsumeQueueStore.java
@@ -29,7 +29,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
-
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import org.apache.commons.io.FileUtils;
@@ -46,7 +45,6 @@ import org.apache.rocketmq.logging.org.slf4j.Logger;
import org.apache.rocketmq.logging.org.slf4j.LoggerFactory;
import org.apache.rocketmq.store.DefaultMessageStore;
import org.apache.rocketmq.store.DispatchRequest;
-import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.StorePathConfigHelper;
import org.apache.rocketmq.store.exception.ConsumeQueueException;
import org.apache.rocketmq.store.exception.StoreException;
@@ -265,13 +263,6 @@ public class RocksDBConsumeQueueStore extends AbstractConsumeQueueStore {
this.rocksDBStorage.batchPut(writeBatch);
this.rocksDBConsumeQueueOffsetTable.putHeapMaxCqOffset(tempTopicQueueMaxOffsetMap);
-
- long storeTimeStamp = requests.get(size - 1).getStoreTimestamp();
- if (this.messageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE
- || this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
- this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(storeTimeStamp);
- }
- this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(storeTimeStamp);
notifyMessageArriveAndClear(requests);
return true;
} catch (Exception e) {