This is an automated email from the ASF dual-hosted git repository.
yx9o pushed a commit to branch develop
in repository https://gitbox.apache.org/repos/asf/rocketmq.git
The following commit(s) were added to refs/heads/develop by this push:
new b5bc1ff5d6 Fix flaky HATest semi-sync replication (#10495)
b5bc1ff5d6 is described below
commit b5bc1ff5d687ab3d1cd95fe0d37260c6d3f0d49d
Author: rongtong <[email protected]>
AuthorDate: Sun Jun 21 09:37:25 2026 +0800
Fix flaky HATest semi-sync replication (#10495)
---
.../src/test/java/org/apache/rocketmq/store/HATest.java | 17 +++++++++++++++++
1 file changed, 17 insertions(+)
diff --git a/store/src/test/java/org/apache/rocketmq/store/HATest.java
b/store/src/test/java/org/apache/rocketmq/store/HATest.java
index 5623adb64f..8bdb82000b 100644
--- a/store/src/test/java/org/apache/rocketmq/store/HATest.java
+++ b/store/src/test/java/org/apache/rocketmq/store/HATest.java
@@ -25,6 +25,7 @@ import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -38,6 +39,7 @@ import
org.apache.rocketmq.common.message.MessageExtBrokerInner;
import org.apache.rocketmq.store.config.BrokerRole;
import org.apache.rocketmq.store.config.FlushDiskType;
import org.apache.rocketmq.store.config.MessageStoreConfig;
+import org.apache.rocketmq.store.ha.HAConnection;
import org.apache.rocketmq.store.ha.HAConnectionState;
import org.apache.rocketmq.store.stats.BrokerStatsManager;
import org.junit.After;
@@ -104,6 +106,7 @@ public class HATest {
slaveMessageStore.start();
slaveMessageStore.updateHaMasterAddress("127.0.0.1:" +
masterMessageStoreConfig.getHaListenPort());
await().atMost(6, SECONDS).until(() ->
slaveMessageStore.getHaService().getHAClient().getCurrentState() ==
HAConnectionState.TRANSFER);
+ await().atMost(6, SECONDS).until(this::isSlaveReadyForReplication);
}
@Test
@@ -281,6 +284,20 @@ public class HATest {
return msg;
}
+ private boolean isSlaveReadyForReplication() {
+ if (slaveMessageStore.getHaService().getHAClient().getCurrentState()
!= HAConnectionState.TRANSFER) {
+ return false;
+ }
+
+ long slaveMaxOffset = slaveMessageStore.getMaxPhyOffset();
+ List<HAConnection> connections =
messageStore.getHaService().getConnectionList();
+ synchronized (connections) {
+ return connections.stream().anyMatch(connection ->
+ connection.getCurrentState() == HAConnectionState.TRANSFER
+ && connection.getSlaveAckOffset() >= slaveMaxOffset);
+ }
+ }
+
private boolean isCommitLogAvailable(DefaultMessageStore store) {
try {
Field serviceField =
store.getClass().getDeclaredField("reputMessageService");