This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new 26480ddc87 [ISSUE #10073] Fix StoreCheckpoint logicsMsgTimestamp to reflect flushed consume queue storetime (#10074)
26480ddc87 is described below

commit 26480ddc8749ae0296b660873bd83ab47278c10b
Author: guyinyou <[email protected]>
AuthorDate: Thu Feb 5 22:34:12 2026 +0800

    [ISSUE #10073] Fix StoreCheckpoint logicsMsgTimestamp to reflect flushed consume queue storetime (#10074)
    
    * Fix StoreCheckpoint: set logicsMsgTimestamp only after CQ flush, use logicsMsgTempTimestamp for in-memory storetime
    
    Change-Id: I6085bf6efaef84168ece31d080481717465f2b13
    
    * rename to tmpLogicsMsgTimestamp
    
    Change-Id: Ia65ca06751f765bdc2bf053c58e08789f4b2fb22
    
    ---------
    
    Co-authored-by: guyinyou <[email protected]>
---
 store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java  | 2 +-
 .../src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java | 9 +++++++++
 .../java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java  | 2 +-
 .../java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java  | 6 ++++--
 4 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
index 2a77ede32a..1d16165c04 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ConsumeQueue.java
@@ -709,7 +709,7 @@ public class ConsumeQueue implements ConsumeQueueInterface {
                     this.messageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) {
                     this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                 }
-                this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+                this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
                 if (MultiDispatchUtils.checkMultiDispatchQueue(this.messageStore.getMessageStoreConfig(), request)) {
                     multiDispatchLmqQueue(request, maxRetries);
                 }
diff --git a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
index b4518f18f8..3a8027267c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
+++ b/store/src/main/java/org/apache/rocketmq/store/StoreCheckpoint.java
@@ -33,6 +33,7 @@ public class StoreCheckpoint {
     private final RandomAccessFile randomAccessFile;
     private final FileChannel fileChannel;
     private final MappedByteBuffer mappedByteBuffer;
+    private volatile long tmpLogicsMsgTimestamp = 0;
     private volatile long physicMsgTimestamp = 0;
     private volatile long logicsMsgTimestamp = 0;
     private volatile long indexMsgTimestamp = 0;
@@ -112,6 +113,14 @@ public class StoreCheckpoint {
         this.logicsMsgTimestamp = logicsMsgTimestamp;
     }
 
+    public long getTmpLogicsMsgTimestamp() {
+        return tmpLogicsMsgTimestamp;
+    }
+
+    public void setTmpLogicsMsgTimestamp(long tmpLogicsMsgTimestamp) {
+        this.tmpLogicsMsgTimestamp = tmpLogicsMsgTimestamp;
+    }
+
     public long getConfirmPhyOffset() {
         return confirmPhyOffset;
     }
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
index 7ad29ff538..7bfb09928f 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/BatchConsumeQueue.java
@@ -536,7 +536,7 @@ public class BatchConsumeQueue implements ConsumeQueueInterface {
                 if (BrokerRole.SLAVE == this.messageStore.getMessageStoreConfig().getBrokerRole()) {
                     this.messageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
                 }
-                this.messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
+                this.messageStore.getStoreCheckpoint().setTmpLogicsMsgTimestamp(request.getStoreTimestamp());
                 return;
             } else {
                 // XXX: warn and notify me
diff --git a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
index d5d096becd..992bfb668c 100644
--- a/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
+++ b/store/src/main/java/org/apache/rocketmq/store/queue/ConsumeQueueStore.java
@@ -648,15 +648,17 @@ public class ConsumeQueueStore extends AbstractConsumeQueueStore {
             if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {
                 this.lastFlushTimestamp = currentTimeMillis;
                 flushConsumeQueueLeastPages = 0;
-                logicsMsgTimestamp = messageStore.getStoreCheckpoint().getLogicsMsgTimestamp();
+                logicsMsgTimestamp = messageStore.getStoreCheckpoint().getTmpLogicsMsgTimestamp();
             }
 
+            boolean flushOK = true;
             for (ConcurrentMap<Integer, ConsumeQueueInterface> maps : consumeQueueTable.values()) {
                 for (ConsumeQueueInterface cq : maps.values()) {
                     boolean result = false;
                     for (int i = 0; i < retryTimes && !result; i++) {
                         result = flush(cq, flushConsumeQueueLeastPages);
                     }
+                    flushOK &= result;
                 }
             }
 
@@ -664,7 +666,7 @@ public class ConsumeQueueStore extends AbstractConsumeQueueStore {
                 messageStore.getCompactionStore().flush(flushConsumeQueueLeastPages);
             }
 
-            if (0 == flushConsumeQueueLeastPages) {
+            if (flushOK && 0 == flushConsumeQueueLeastPages) {
                 if (logicsMsgTimestamp > 0) {
                     messageStore.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);
                 }

Reply via email to