This is an automated email from the ASF dual-hosted git repository.
jinrongtong pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new aa9ee1d059 [ISSUE #9689] Fix the issue that master transfer epoch was
not updated in time
aa9ee1d059 is described below
commit aa9ee1d059dc0ad9112a2e397feb7b7e9aa21c0f
Author: Liu Shengzhong <[email protected]>
AuthorDate: Mon Sep 22 08:31:29 2025 +0800
[ISSUE #9689] Fix the issue that master transfer epoch was not updated in
time
---
.../org/apache/rocketmq/remoting/protocol/EpochEntry.java | 3 ++-
.../store/ha/autoswitch/AutoSwitchHAConnection.java | 14 ++++++++++++++
2 files changed, 16 insertions(+), 1 deletion(-)
diff --git
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java
index 4ff81760ad..9e8f1aab42 100644
---
a/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java
+++
b/remoting/src/main/java/org/apache/rocketmq/remoting/protocol/EpochEntry.java
@@ -21,9 +21,10 @@ import java.util.Objects;
public class EpochEntry extends RemotingSerializable {
+ public static final long LAST_EPOCH_END_OFFSET = Long.MAX_VALUE;
private int epoch;
private long startOffset;
- private long endOffset = Long.MAX_VALUE;
+ private long endOffset = LAST_EPOCH_END_OFFSET;
public EpochEntry(EpochEntry entry) {
this.epoch = entry.getEpoch();
diff --git
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
index 440cd3c7a5..cc55937aeb 100644
---
a/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
+++
b/store/src/main/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHAConnection.java
@@ -593,6 +593,20 @@ public class AutoSwitchHAConnection implements
HAConnection {
return;
}
+ // Check and update currentTransferEpochEndOffset
+ if (AutoSwitchHAConnection.this.currentTransferEpochEndOffset
== -1) {
+ EpochEntry currentEpochEntry =
AutoSwitchHAConnection.this.epochCache.getEntry(AutoSwitchHAConnection.this.currentTransferEpoch);
+ if (currentEpochEntry != null) {
+ if (currentEpochEntry.getEndOffset() !=
EpochEntry.LAST_EPOCH_END_OFFSET) {
+ LOGGER.info("Update currentTransferEpochEndOffset
from -1 to {}", currentEpochEntry.getEndOffset());
+
AutoSwitchHAConnection.this.currentTransferEpochEndOffset =
currentEpochEntry.getEndOffset();
+ }
+ } else {
+ // we should never reach here
+ LOGGER.warn("[BUG]Can't find currentTransferEpoch [{}]
from epoch cache", currentTransferEpoch);
+ }
+ }
+
// We must ensure that the transmitted logs are within the
same epoch
// If currentEpochEndOffset == -1, means that
currentTransferEpoch = last epoch, so the endOffset = Long.max
final long currentEpochEndOffset =
AutoSwitchHAConnection.this.currentTransferEpochEndOffset;