This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new f5d2cd82177 Pipe IT: Stabilize leader stop test (#17809)
f5d2cd82177 is described below

commit f5d2cd821775de2e80795f6f48a568c0e6ce132e
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 2 18:53:19 2026 +0800

    Pipe IT: Stabilize leader stop test (#17809)
    
    * Pipe IT: Stabilize table model leader stop test
    
    * Pipe IT: Stabilize tree model leader stop test
---
 .../manual/enhanced/IoTDBPipeClusterIT.java        | 28 ++++++++++++++++------
 .../auto/enhanced/IoTDBPipeClusterIT.java          | 28 ++++++++++++++++------
 2 files changed, 42 insertions(+), 14 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 244a1ff0c83..27297c06471 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -313,6 +313,9 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
           TableModelUtils.insertData("test", "test", 100, 200, senderEnv);
 
           TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
+          // Avoid electing a stale follower after stopping the current test1 leader.
+          flushTableDataRegionReplicasAfterReplicationComplete(
+              senderEnv, Collections.singletonList("test1"));
 
           final int leaderIndex = restartTableDataRegionLeader(client, "test1");
           if (leaderIndex == -1) { // ensure the leader is stopped
@@ -324,7 +327,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
           TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);
 
           TableModelUtils.assertData("test", "test", 0, 300, receiverEnv, handleFailure);
-          waitForTableDataRegionReplicationComplete(Arrays.asList("test", "test1"));
+          flushTableDataRegionReplicasAfterReplicationComplete(
+              senderEnv, Arrays.asList("test", "test1"));
+          flushTableDataRegionReplicasAfterReplicationComplete(
+              receiverEnv, Collections.singletonList("test"));
         }
 
         try {
@@ -428,14 +434,22 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
     return -1;
   }
 
-  private void waitForTableDataRegionReplicationComplete(final List<String> databases) {
+  private void flushTableDataRegionReplicasAfterReplicationComplete(
+      final BaseEnv env, final List<String> databases) {
+    waitForTableDataRegionReplicationComplete(env, databases);
+    TestUtils.executeNonQueryWithRetry(env, "flush");
+    waitForTableDataRegionReplicationComplete(env, databases);
+  }
+
+  private void waitForTableDataRegionReplicationComplete(
+      final BaseEnv env, final List<String> databases) {
     await()
         .pollInterval(500, TimeUnit.MILLISECONDS)
         .atMost(2, TimeUnit.MINUTES)
         .untilAsserted(
             () -> {
               try (final SyncConfigNodeIServiceClient client =
-                  (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+                  (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
                 final List<TRegionInfo> leaderRegionInfoList =
                     showTableDataRegionLeaders(databases, client);
                 Assert.assertFalse(
@@ -444,14 +458,14 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
 
                 for (final TRegionInfo regionInfo : leaderRegionInfoList) {
                   final DataNodeWrapper leaderNode =
-                      findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+                      findDataNodeWrapperByPort(env, regionInfo.getClientRpcPort());
                   final String metricsUrl =
                       "http://";
                           + leaderNode.getIp()
                           + ":"
                           + leaderNode.getMetricPort()
                           + "/metrics";
-                  final String metricsContent = senderEnv.getUrlContent(metricsUrl, null);
+                  final String metricsContent = env.getUrlContent(metricsUrl, null);
                   Assert.assertNotNull(
                       "Failed to fetch metrics from leader DataNode at " + metricsUrl,
                       metricsContent);
@@ -480,8 +494,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
     return result;
   }
 
-  private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
-    for (final DataNodeWrapper dataNodeWrapper : senderEnv.getDataNodeWrapperList()) {
+  private DataNodeWrapper findDataNodeWrapperByPort(final BaseEnv env, final int port) {
+    for (final DataNodeWrapper dataNodeWrapper : env.getDataNodeWrapperList()) {
       if (dataNodeWrapper.getPort() == port) {
         return dataNodeWrapper;
       }
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index 48c2f7a0308..22b6d07f44a 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -38,6 +38,7 @@ import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2DualTreeAutoEnhanced;
+import org.apache.iotdb.itbase.env.BaseEnv;
 import org.apache.iotdb.pipe.it.dual.tablemodel.TableModelUtils;
 import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -328,6 +329,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
           senderEnv,
           Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)", "flush"),
           null);
+      flushTreeDataRegionReplicasAfterReplicationComplete(
+          senderEnv, Collections.singletonList("root.db"));
 
       final int leaderIndex = restartTreeDataRegionLeader(client, "root.db");
       if (leaderIndex == -1) { // ensure the leader is stopped
@@ -344,7 +347,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
           "select count(*) from root.db.d1",
           "count(root.db.d1.s1),",
           Collections.singleton("2,"));
-      waitForTreeDataRegionReplicationComplete(Collections.singletonList("root.db"));
+      flushTreeDataRegionReplicasAfterReplicationComplete(
+          senderEnv, Collections.singletonList("root.db"));
+      flushTreeDataRegionReplicasAfterReplicationComplete(
+          receiverEnv, Collections.singletonList("root.db"));
     }
 
     try {
@@ -441,14 +447,22 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
     return -1;
   }
 
-  private void waitForTreeDataRegionReplicationComplete(final List<String> databases) {
+  private void flushTreeDataRegionReplicasAfterReplicationComplete(
+      final BaseEnv env, final List<String> databases) {
+    waitForTreeDataRegionReplicationComplete(env, databases);
+    TestUtils.executeNonQueryWithRetry(env, "flush");
+    waitForTreeDataRegionReplicationComplete(env, databases);
+  }
+
+  private void waitForTreeDataRegionReplicationComplete(
+      final BaseEnv env, final List<String> databases) {
     await()
         .pollInterval(500, TimeUnit.MILLISECONDS)
         .atMost(2, TimeUnit.MINUTES)
         .untilAsserted(
             () -> {
               try (final SyncConfigNodeIServiceClient client =
-                  (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
+                  (SyncConfigNodeIServiceClient) env.getLeaderConfigNodeConnection()) {
                 final List<TRegionInfo> leaderRegionInfoList =
                     showTreeDataRegionLeaders(databases, client);
                 Assert.assertFalse(
@@ -457,14 +471,14 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
 
                 for (final TRegionInfo regionInfo : leaderRegionInfoList) {
                   final DataNodeWrapper leaderNode =
-                      findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+                      findDataNodeWrapperByPort(env, regionInfo.getClientRpcPort());
                   final String metricsUrl =
                       "http://";
                           + leaderNode.getIp()
                           + ":"
                           + leaderNode.getMetricPort()
                           + "/metrics";
-                  final String metricsContent = senderEnv.getUrlContent(metricsUrl, null);
+                  final String metricsContent = env.getUrlContent(metricsUrl, null);
                   Assert.assertNotNull(
                       "Failed to fetch metrics from leader DataNode at " + metricsUrl,
                       metricsContent);
@@ -492,8 +506,8 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
     return result;
   }
 
-  private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
-    for (final DataNodeWrapper dataNodeWrapper : senderEnv.getDataNodeWrapperList()) {
+  private DataNodeWrapper findDataNodeWrapperByPort(final BaseEnv env, final int port) {
+    for (final DataNodeWrapper dataNodeWrapper : env.getDataNodeWrapperList()) {
       if (dataNodeWrapper.getPort() == port) {
         return dataNodeWrapper;
       }

Reply via email to