This is an automated email from the ASF dual-hosted git repository.

RongtongJin pushed a commit to branch codex/fix-store-flaky-tests
in repository https://gitbox.apache.org/repos/asf/rocketmq.git

commit b43a61f3df66c5d92c637c0fdb73220dc894eb96
Author: RongtongJin <[email protected]>
AuthorDate: Fri May 29 16:11:32 2026 +0800

    Fix flaky store HA and DLedger tests
---
 .../store/dledger/DLedgerCommitlogTest.java        | 18 +++---
 .../store/dledger/DLedgerMultiPathTest.java        |  2 +-
 .../store/dledger/MessageStoreTestBase.java        |  8 +++
 .../store/ha/autoswitch/AutoSwitchHATest.java      | 66 +++++++++++++---------
 4 files changed, 57 insertions(+), 37 deletions(-)

diff --git 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
index 7b09a6aa2f..90536a71c3 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerCommitlogTest.java
@@ -63,7 +63,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
     @Test
     public void testTruncateCQ() throws Exception {
         String base = createBaseDir();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         String topic = UUID.randomUUID().toString();
         {
@@ -122,7 +122,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
     @Test
     public void testRecover() throws Exception {
         String base = createBaseDir();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         String topic = UUID.randomUUID().toString();
         {
@@ -162,7 +162,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
     @Test
     public void testDLedgerAbnormallyRecover() throws Exception {
         String base = createBaseDir();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         String topic = UUID.randomUUID().toString();
 
@@ -199,7 +199,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
     @Test
     public void testPutAndGetMessage() throws Exception {
         String base = createBaseDir();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         DefaultMessageStore messageStore = createDledgerMessageStore(base, 
group, "n0", peers, null, false, 0);
         DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) 
messageStore.getCommitLog();
@@ -242,7 +242,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
     @Test
     public void testBatchPutAndGetMessage() throws Exception {
         String base = createBaseDir();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         DefaultMessageStore messageStore = createDledgerMessageStore(base, 
group, "n0", peers, null, false, 0);
         DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) 
messageStore.getCommitLog();
@@ -289,7 +289,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
     public void testAsyncPutAndGetMessage() throws Exception {
         Assume.assumeFalse(MixAll.isWindows());
         String base = createBaseDir();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         DefaultMessageStore messageStore = createDledgerMessageStore(base, 
group, "n0", peers, null, false, 0);
         DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) 
messageStore.getCommitLog();
@@ -333,7 +333,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
     @Test
     public void testAsyncBatchPutAndGetMessage() throws Exception {
         String base = createBaseDir();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         DefaultMessageStore messageStore = createDledgerMessageStore(base, 
group, "n0", peers, null, false, 0);
         DLedgerCommitLog dLedgerCommitLog = (DLedgerCommitLog) 
messageStore.getCommitLog();
@@ -380,7 +380,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
 
     @Test
     public void testCommittedPos() throws Exception {
-        String peers = String.format("n0-localhost:%d;n1-localhost:%d", 
nextPort(), nextPort());
+        String peers = String.format("n0-localhost:%d;n1-localhost:%d", 
nextAvailablePort(), nextAvailablePort());
         String group = UUID.randomUUID().toString();
         DefaultMessageStore leaderStore = 
createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
 
@@ -409,7 +409,7 @@ public class DLedgerCommitlogTest extends 
MessageStoreTestBase {
 
     @Test
     public void testIPv6HostMsgCommittedPos() throws Exception {
-        String peers = String.format("n0-localhost:%d;n1-localhost:%d", 
nextPort(), nextPort());
+        String peers = String.format("n0-localhost:%d;n1-localhost:%d", 
nextAvailablePort(), nextAvailablePort());
         String group = UUID.randomUUID().toString();
         DefaultMessageStore leaderStore = 
createDledgerMessageStore(createBaseDir(), group, "n0", peers, "n0", false, 0);
 
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
index 9de4e4820e..315921d3dd 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/dledger/DLedgerMultiPathTest.java
@@ -43,7 +43,7 @@ public class DLedgerMultiPathTest extends 
MessageStoreTestBase {
         Assume.assumeFalse(MixAll.isWindows());
         String base = createBaseDir();
         String topic = UUID.randomUUID().toString();
-        String peers = String.format("n0-localhost:%d", nextPort());
+        String peers = "n0-localhost:0";
         String group = UUID.randomUUID().toString();
         String multiStorePath =
             base + "/multi/a/" + MessageStoreConfig.MULTI_PATH_SPLITTER +
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
 
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
index c4d9f0727b..c8370b9ddb 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/dledger/MessageStoreTestBase.java
@@ -20,6 +20,8 @@ import com.google.common.util.concurrent.RateLimiter;
 import io.openmessaging.storage.dledger.DLedgerConfig;
 import io.openmessaging.storage.dledger.DLedgerServer;
 import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
 import java.net.UnknownHostException;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentHashMap;
@@ -41,6 +43,12 @@ import org.junit.Assert;
 
 public class MessageStoreTestBase extends StoreTestBase {
 
+    protected static int nextAvailablePort() throws IOException {
+        try (ServerSocket serverSocket = new ServerSocket(0)) {
+            return serverSocket.getLocalPort();
+        }
+    }
+
     protected DefaultMessageStore createDledgerMessageStore(String base, 
String group, String selfId, String peers, String leaderId, boolean 
createAbort, int deleteFileNum) throws Exception {
         System.setProperty("dledger.disk.ratio.check", "0.95");
         System.setProperty("dledger.disk.ratio.clean", "0.95");
diff --git 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
index 519af44159..2d75948622 100644
--- 
a/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
+++ 
b/store/src/test/java/org/apache/rocketmq/store/ha/autoswitch/AutoSwitchHATest.java
@@ -100,8 +100,8 @@ public class AutoSwitchHATest {
         storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + "#1" + File.separator + "EpochFileCache");
         storeConfig1.setTotalReplicas(3);
         storeConfig1.setInSyncReplicas(2);
+        storeConfig1.setHaListenPort(0);
         buildMessageStoreConfig(storeConfig1, mappedFileSize);
-        this.store1HaAddress = "127.0.0.1:10912";
 
         storeConfig2 = new MessageStoreConfig();
         storeConfig2.setBrokerRole(BrokerRole.SLAVE);
@@ -109,11 +109,10 @@ public class AutoSwitchHATest {
         storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + 
brokerName + "#2");
         storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + 
brokerName + "#2" + File.separator + "commitlog");
         storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + "#2" + File.separator + "EpochFileCache");
-        storeConfig2.setHaListenPort(10943);
+        storeConfig2.setHaListenPort(0);
         storeConfig2.setTotalReplicas(3);
         storeConfig2.setInSyncReplicas(2);
         buildMessageStoreConfig(storeConfig2, mappedFileSize);
-        this.store2HaAddress = "127.0.0.1:10943";
 
         messageStore1 = buildMessageStore(storeConfig1, 1L);
         messageStore2 = buildMessageStore(storeConfig2, 2L);
@@ -124,7 +123,7 @@ public class AutoSwitchHATest {
         storeConfig3.setStorePathRootDir(storePathRootDir + File.separator + 
brokerName + "#3");
         storeConfig3.setStorePathCommitLog(storePathRootDir + File.separator + 
brokerName + "#3" + File.separator + "commitlog");
         storeConfig3.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + "#3" + File.separator + "EpochFileCache");
-        storeConfig3.setHaListenPort(10980);
+        storeConfig3.setHaListenPort(0);
         storeConfig3.setTotalReplicas(3);
         storeConfig3.setInSyncReplicas(2);
         buildMessageStoreConfig(storeConfig3, mappedFileSize);
@@ -136,6 +135,8 @@ public class AutoSwitchHATest {
         messageStore1.start();
         messageStore2.start();
         messageStore3.start();
+        this.store1HaAddress = haAddress(storeConfig1);
+        this.store2HaAddress = haAddress(storeConfig2);
 
 //        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).("127.0.0.1:8000");
 //        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
@@ -154,18 +155,17 @@ public class AutoSwitchHATest {
         storeConfig1.setStorePathCommitLog(storePathRootDir + File.separator + 
brokerName + "#1" + File.separator + "commitlog");
         storeConfig1.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + "#1" + File.separator + "EpochFileCache");
         storeConfig1.setAllAckInSyncStateSet(allAckInSyncStateSet);
+        storeConfig1.setHaListenPort(0);
         buildMessageStoreConfig(storeConfig1, mappedFileSize);
-        this.store1HaAddress = "127.0.0.1:10912";
 
         storeConfig2 = new MessageStoreConfig();
         storeConfig2.setBrokerRole(BrokerRole.SLAVE);
         storeConfig2.setStorePathRootDir(storePathRootDir + File.separator + 
brokerName + "#2");
         storeConfig2.setStorePathCommitLog(storePathRootDir + File.separator + 
brokerName + "#2" + File.separator + "commitlog");
         storeConfig2.setStorePathEpochFile(storePathRootDir + File.separator + 
brokerName + "#2" + File.separator + "EpochFileCache");
-        storeConfig2.setHaListenPort(10943);
+        storeConfig2.setHaListenPort(0);
         storeConfig2.setAllAckInSyncStateSet(allAckInSyncStateSet);
         buildMessageStoreConfig(storeConfig2, mappedFileSize);
-        this.store2HaAddress = "127.0.0.1:10943";
 
         messageStore1 = buildMessageStore(storeConfig1, 1L);
         messageStore2 = buildMessageStore(storeConfig2, 2L);
@@ -174,6 +174,8 @@ public class AutoSwitchHATest {
         assertTrue(messageStore2.load());
         messageStore1.start();
         messageStore2.start();
+        this.store1HaAddress = haAddress(storeConfig1);
+        this.store2HaAddress = haAddress(storeConfig2);
 
 //        ((AutoSwitchHAService) 
this.messageStore1.getHaService()).setLocalAddress("127.0.0.1:8000");
 //        ((AutoSwitchHAService) 
this.messageStore2.getHaService()).setLocalAddress("127.0.0.1:8001");
@@ -190,6 +192,8 @@ public class AutoSwitchHATest {
         flag &= slave.getHaService().changeToSlave("", epoch, slaveId);
         slave.getHaService().updateHaMasterAddress(masterHaAddress);
         flag &= master.getHaService().changeToMaster(epoch);
+        AutoSwitchHAService masterHaService = (AutoSwitchHAService) 
master.getHaService();
+        await().atMost(10, TimeUnit.SECONDS).until(() -> 
masterHaService.getConnectionCount().get() > 0);
         // Put message on master
         for (int i = 0; i < totalPutMessageNums; i++) {
             PutMessageResult result = master.putMessage(buildMessage());
@@ -219,7 +223,7 @@ public class AutoSwitchHATest {
         final long confirmOffset = this.messageStore1.getConfirmOffset();
 
         // Step2, shutdown store2
-        this.messageStore2.shutdown();
+        this.messageStore2 = shutdownStore(this.messageStore2);
 
         // Put message, which should succeed because slave is removed from 
syncStateSet, only master remains
         final PutMessageResult putMessageResult = 
this.messageStore1.putMessage(buildMessage());
@@ -229,7 +233,7 @@ public class AutoSwitchHATest {
         assertTrue(this.messageStore1.getConfirmOffset() >= confirmOffset);
 
         // Step3, shutdown store1, start store2, change store2 to master, 
epoch = 2
-        this.messageStore1.shutdown();
+        this.messageStore1 = shutdownStore(this.messageStore1);
 
         storeConfig2.setBrokerRole(BrokerRole.SYNC_MASTER);
         messageStore2 = buildMessageStore(storeConfig2, 2L);
@@ -293,8 +297,7 @@ public class AutoSwitchHATest {
         assertTrue(result.contains(2L));
 
         // Now, shutdown store2
-        this.messageStore2.shutdown();
-        this.messageStore2.destroy();
+        this.messageStore2 = destroyStore(this.messageStore2);
 
         // Wait for connection to be removed and syncStateSet to be updated by 
removeConnection
         await().atMost(10, TimeUnit.SECONDS).until(() -> {
@@ -447,8 +450,7 @@ public class AutoSwitchHATest {
         checkMessage(this.messageStore2, 20, 0);
 
         // Step2: restart broker3
-        messageStore3.shutdown();
-        messageStore3.destroy();
+        messageStore3 = destroyStore(messageStore3);
 
         storeConfig3.setSyncFromLastFile(true);
         messageStore3 = buildMessageStore(storeConfig3, 3L);
@@ -457,7 +459,7 @@ public class AutoSwitchHATest {
 
         // Step2: add new broker3, link to broker1. because broker3 request 
sync from lastFile, so it only synced 10 msg from offset 10;
         messageStore3.getHaService().changeToSlave("", 2, 3L);
-        messageStore3.getHaService().updateHaMasterAddress("127.0.0.1:10912");
+        messageStore3.getHaService().updateHaMasterAddress(store1HaAddress);
 
         checkMessage(messageStore3, 10, 10);
     }
@@ -498,7 +500,7 @@ public class AutoSwitchHATest {
 
         long tmpConfirmOffset = this.messageStore2.getConfirmOffset();
         long setConfirmOffset = this.messageStore2.getConfirmOffset() - 
this.messageStore2.getConfirmOffset() / 2;
-        messageStore2.shutdown();
+        messageStore2 = shutdownStore(messageStore2);
         StoreCheckpoint storeCheckpoint = new 
StoreCheckpoint(storeConfig2.getStorePathRootDir() + File.separator + 
"checkpoint");
         assertEquals(tmpConfirmOffset, storeCheckpoint.getConfirmPhyOffset());
         storeCheckpoint.setConfirmPhyOffset(setConfirmOffset);
@@ -514,22 +516,28 @@ public class AutoSwitchHATest {
 
     @After
     public void destroy() throws Exception {
-        if (this.messageStore2 != null) {
-            messageStore2.shutdown();
-            messageStore2.destroy();
-        }
-        if (this.messageStore1 != null) {
-            messageStore1.shutdown();
-            messageStore1.destroy();
-        }
-        if (this.messageStore3 != null) {
-            messageStore3.shutdown();
-            messageStore3.destroy();
-        }
+        this.messageStore2 = destroyStore(this.messageStore2);
+        this.messageStore1 = destroyStore(this.messageStore1);
+        this.messageStore3 = destroyStore(this.messageStore3);
         File file = new File(storePathRootParentDir);
         UtilAll.deleteFile(file);
     }
 
+    private DefaultMessageStore shutdownStore(DefaultMessageStore 
messageStore) {
+        if (messageStore != null) {
+            messageStore.shutdown();
+        }
+        return null;
+    }
+
+    private DefaultMessageStore destroyStore(DefaultMessageStore messageStore) 
{
+        if (messageStore != null) {
+            messageStore.shutdown();
+            messageStore.destroy();
+        }
+        return null;
+    }
+
     private DefaultMessageStore buildMessageStore(MessageStoreConfig 
messageStoreConfig,
         long brokerId) throws Exception {
         BrokerConfig brokerConfig = new BrokerConfig();
@@ -538,6 +546,10 @@ public class AutoSwitchHATest {
         return new DefaultMessageStore(messageStoreConfig, brokerStatsManager, 
null, brokerConfig, new ConcurrentHashMap<>());
     }
 
+    private String haAddress(MessageStoreConfig messageStoreConfig) {
+        return "127.0.0.1:" + messageStoreConfig.getHaListenPort();
+    }
+
     private void buildMessageStoreConfig(MessageStoreConfig 
messageStoreConfig, int mappedFileSize) {
         messageStoreConfig.setMappedFileSizeCommitLog(mappedFileSize);
         messageStoreConfig.setMappedFileSizeConsumeQueue(1024 * 1024);

Reply via email to