This is an automated email from the ASF dual-hosted git repository.

jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git


The following commit(s) were added to refs/heads/develop by this push:
     new aa9ee1d059 [ISSUE #9689] Fix the issue that master transfer epoch was not updated in time
aa9ee1d059 is described below

commit aa9ee1d059dc0ad9112a2e397feb7b7e9aa21c0f
Author: Liu Shengzhong <[email protected]>
AuthorDate: Mon Sep 22 08:31:29 2025 +0800

    [ISSUE #9689] Fix the issue that master transfer epoch was not updated in time
---
 .../org/apache/rocketmq/remoting/protocol/EpochEntry.java  |  3 ++-
 .../store/ha/autoswitch/AutoSwitchHAConnection.java        | 14 ++++++++++++++
 2 files changed, 16 insertions(+), 1 deletion(-)

diff --git a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java
index 4ff81760ad..9e8f1aab42 100644
--- a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java
+++ b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java
@@ -21,9 +21,10 @@ import java.util.Objects;
 
 public class EpochEntry extends RemotingSerializable {
 
+    public static final long LAST_EPOCH_END_OFFSET = Long.MAX_VALUE;
     private int epoch;
     private long startOffset;
-    private long endOffset = Long.MAX_VALUE;
+    private long endOffset = LAST_EPOCH_END_OFFSET;
 
     public EpochEntry(EpochEntry entry) {
         this.epoch = entry.getEpoch();
diff --git a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 440cd3c7a5..cc55937aeb 100644
--- a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++ b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -593,6 +593,20 @@ public class AutoSwitchHAConnection implements HAConnection {
                     return;
                 }
 
+                // Check and update currentTransferEpochEndOffset
+                if (AutoSwitchHAConnection.this.currentTransferEpochEndOffset 
== -1) {
+                    EpochEntry currentEpochEntry = 
AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+                    if (currentEpochEntry != null) {
+                        if (currentEpochEntry.getEndOffset() != 
EpochEntry.LAST_EPOCH_END_OFFSET) {
+                            LOGGER.info("Update currentTransferEpochEndOffset 
from -1 to {}", currentEpochEntry.getEndOffset());
+                            
AutoSwitchHAConnection.this.currentTransferEpochEndOffset = 
currentEpochEntry.getEndOffset();
+                        }
+                    } else {
+                        // we should never reach here
+                        LOGGER.warn("[BUG]Can't find currentTransferEpoch [{}] 
from epoch cache", currentTransferEpoch);
+                    }
+                }
+
                 // We must ensure that the transmitted logs are within the 
same epoch
                 // If currentEpochEndOffset == -1, means that 
currentTransferEpoch = last epoch, so the endOffset = Long.max
                 final long currentEpochEndOffset = 
AutoSwitchHAConnection.this.currentTransferEpochEndOffset;

Reply via email to