This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new 26480ddc87 [ISSUE #10073] Fix StoreCheckpoint logicsMsgTimestamp to
reflect flushed consume queue storetime (#10074)
26480ddc87 is described below
commit 26480ddc8749ae0296b660873bd83ab47278c10b
Author: guyinyou <[email protected]>
AuthorDate: Thu Feb 5 22:34:12 2026 +0800
[ISSUE #10073] Fix StoreCheckpoint logicsMsgTimestamp to reflect flushed
consume queue storetime (#10074)
* Fix StoreCheckpoint: set logicsMsgTimestamp only after CQ flush, use
logicsMsgTempTimestamp for in-memory storetime
Change-Id: I6085bf6efaef84168ece31d080481717465f2b13
* rename to tmpLogicsMsgTimestamp
Change-Id: Ia65ca06751f765bdc2bf053c58e08789f4b2fb22
---------
Co-authored-by: guyinyou <[email protected]>
---
store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java | 2 +-
.../src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java | 9 +++++++++
.../java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java | 2 +-
.../java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java | 6 ++++--
4 files changed, 15 insertions(+), 4 deletions(-)
diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 2a77ede32a..1d16165c04 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -709,7 +709,7 @@ public class ConsumeQueue implements ConsumeQueueInterface {
this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
-
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+
this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
if
(MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(),
request)) {
multiDispatchLmqQueue(request, maxRetries);
}
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
index b4518f18f8..3a8027267c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
@@ -33,6 +33,7 @@ public class StoreCheckpoint {
private final RandomAccessFile randomAccessFile;
private final FileChannel fileChannel;
private final MappedByteBuffer mappedByteBuffer;
+ private volatile long tmpLogicsMsgTimestamp = 0;
private volatile long physicMsgTimestamp = 0;
private volatile long logicsMsgTimestamp = 0;
private volatile long indexMsgTimestamp = 0;
@@ -112,6 +113,14 @@ public class StoreCheckpoint {
this.logicsMsgTimestamp = logicsMsgTimestamp;
}
+ public long getTmpLogicsMsgTimestamp() {
+ return tmpLogicsMsgTimestamp;
+ }
+
+ public void setTmpLogicsMsgTimestamp(long tmpLogicsMsgTimestamp) {
+ this.tmpLogicsMsgTimestamp = tmpLogicsMsgTimestamp;
+ }
+
public long getConfirmPhyOffset() {
return confirmPhyOffset;
}
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 7ad29ff538..7bfb09928f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -536,7 +536,7 @@ public class BatchConsumeQueue implements
ConsumeQueueInterface {
if (BrokerRole.SLAVE ==
this.messageStore.getMessageStoreConfig().getBrokerRole()) {
this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
}
-
this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+
this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
return;
} else {
// XXX: warn and notify me
diff --git
a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index d5d096becd..992bfb668c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -648,15 +648,17 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
if (currentTimeMillis >= (this.lastFlushTimestamp +
flushConsumeQueueThoroughInterval)) {
this.lastFlushTimestamp = currentTimeMillis;
flushConsumeQueueLeastPages = 0;
- logicsMsgTimestamp =
messageStore.getStoreCheckpoint().getLogicsMsgTimestamp();
+ logicsMsgTimestamp =
messageStore.getStoreCheckpoint().getTmpLogicsMsgTimestamp();
}
+ boolean flushOK = true;
for (ConcurrentMap<Integer, ConsumeQueueInterface> maps :
consumeQueueTable.values()) {
for (ConsumeQueueInterface cq : maps.values()) {
boolean result = false;
for (int i = 0; i < retryTimes && !result; i++) {
result = flush(cq, flushConsumeQueueLeastPages);
}
+ flushOK &= result;
}
}
@@ -664,7 +666,7 @@ public class ConsumeQueueStore extends
AbstractConsumeQueueStore {
messageStore.getCompactionStore().flush(flushConsumeQueueLeastPages);
}
- if (0 == flushConsumeQueueLeastPages) {
+ if (flushOK && 0 == flushConsumeQueueLeastPages) {
if (logicsMsgTimestamp > 0) {
messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
}