This is an automated email from the ASF dual-hosted git repository.

Caideyipi pushed a commit to branch grasia
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 43acfe72cda21084e1cff83b16456cf8dd1edb1e
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 20 11:36:41 2026 +0800

    Update IoTDBPipeClusterIT.java
---
 .../manual/enhanced/IoTDBPipeClusterIT.java        | 178 +++++++++++++++++----
 1 file changed, 143 insertions(+), 35 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index ad283d4a02c..3a0aaec2aed 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.pipe.it.dual.tablemodel.manual.enhanced;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -51,6 +53,8 @@ import org.junit.runner.RunWith;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -59,12 +63,15 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2DualTableManualEnhanced.class})
 public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
 
+  private static final double SYNC_LAG_DELTA = 0.001;
+
   @Override
   @Before
   public void setUp() {
@@ -299,41 +306,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
 
           TableModelUtils.insertData("test1", "test1", 100, 200, senderEnv);
 
-          final AtomicInteger leaderPort = new AtomicInteger(-1);
-          final TShowRegionResp showRegionResp =
-              client.showRegion(new TShowRegionReq().setIsTableModel(true));
-          showRegionResp
-              .getRegionInfoList()
-              .forEach(
-                  regionInfo -> {
-                    if 
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
-                      leaderPort.set(regionInfo.getClientRpcPort());
-                    }
-                  });
-
-          int leaderIndex = -1;
-          for (int i = 0; i < 3; ++i) {
-            if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) 
{
-              leaderIndex = i;
-              try {
-                senderEnv.shutdownDataNode(i);
-              } catch (final Throwable e) {
-                e.printStackTrace();
-                return;
-              }
-              try {
-                TimeUnit.SECONDS.sleep(1);
-              } catch (final InterruptedException ignored) {
-              }
-              try {
-                senderEnv.startDataNode(i);
-                ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
-              } catch (final Throwable e) {
-                e.printStackTrace();
-                return;
-              }
-            }
-          }
+          final int leaderIndex = restartTableDataRegionLeader(client, 
"test1");
           if (leaderIndex == -1) { // ensure the leader is stopped
             fail();
           }
@@ -343,6 +316,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
           TableModelUtils.insertData("test1", "test1", 200, 300, senderEnv);
 
           TableModelUtils.assertData("test", "test", 0, 300, receiverEnv, 
handleFailure);
+          waitForTableDataRegionReplicationComplete(Arrays.asList("test", 
"test1"));
         }
 
         try {
@@ -398,6 +372,140 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeTableModelDualManualIT {
     }
   }
 
+  private int restartTableDataRegionLeader(
+      final SyncConfigNodeIServiceClient client, final String database) throws 
TException {
+    final List<TRegionInfo> leaderRegionInfoList =
+        showTableDataRegionLeaders(Collections.singletonList(database), 
client);
+    if (leaderRegionInfoList.isEmpty()) {
+      return -1;
+    }
+
+    final TRegionInfo targetRegionInfo =
+        leaderRegionInfoList.stream()
+            .min(Comparator.comparingInt(regionInfo -> 
regionInfo.getConsensusGroupId().getId()))
+            .orElse(null);
+    if (targetRegionInfo == null) {
+      return -1;
+    }
+
+    final int leaderPort = targetRegionInfo.getClientRpcPort();
+    for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) {
+      if (senderEnv.getDataNodeWrapper(i).getPort() != leaderPort) {
+        continue;
+      }
+
+      try {
+        senderEnv.shutdownDataNode(i);
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(1);
+      } catch (final InterruptedException ignored) {
+        Thread.currentThread().interrupt();
+        return -1;
+      }
+
+      try {
+        senderEnv.startDataNode(i);
+        ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      }
+      return i;
+    }
+    return -1;
+  }
+
+  private void waitForTableDataRegionReplicationComplete(final List<String> 
databases) {
+    await()
+        .pollInterval(500, TimeUnit.MILLISECONDS)
+        .atMost(2, TimeUnit.MINUTES)
+        .untilAsserted(
+            () -> {
+              try (final SyncConfigNodeIServiceClient client =
+                  (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+                final List<TRegionInfo> leaderRegionInfoList =
+                    showTableDataRegionLeaders(databases, client);
+                Assert.assertFalse(
+                    "No table DataRegion leader found for databases " + 
databases,
+                    leaderRegionInfoList.isEmpty());
+
+                for (final TRegionInfo regionInfo : leaderRegionInfoList) {
+                  final DataNodeWrapper leaderNode =
+                      findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+                  final String metricsUrl =
+                      "http://";
+                          + leaderNode.getIp()
+                          + ":"
+                          + leaderNode.getMetricPort()
+                          + "/metrics";
+                  final String metricsContent = 
senderEnv.getUrlContent(metricsUrl, null);
+                  Assert.assertNotNull(
+                      "Failed to fetch metrics from leader DataNode at " + 
metricsUrl,
+                      metricsContent);
+                  assertSyncLagIsZero(metricsContent, 
buildDataRegionTag(regionInfo), metricsUrl);
+                }
+              }
+            });
+  }
+
+  private List<TRegionInfo> showTableDataRegionLeaders(
+      final List<String> databases, final SyncConfigNodeIServiceClient client) 
throws TException {
+    final TShowRegionResp showRegionResp =
+        client.showRegion(
+            new TShowRegionReq()
+                .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                .setDatabases(databases)
+                .setIsTableModel(true));
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
showRegionResp.getStatus().getCode());
+    final List<TRegionInfo> result = new ArrayList<>();
+    for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+      if 
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+        result.add(regionInfo);
+      }
+    }
+    return result;
+  }
+
+  private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
+    for (final DataNodeWrapper dataNodeWrapper : 
senderEnv.getDataNodeWrapperList()) {
+      if (dataNodeWrapper.getPort() == port) {
+        return dataNodeWrapper;
+      }
+    }
+    fail("Failed to find DataNodeWrapper for client rpc port " + port);
+    return null;
+  }
+
+  private String buildDataRegionTag(final TRegionInfo regionInfo) {
+    return "DataRegion[" + regionInfo.getConsensusGroupId().getId() + "]";
+  }
+
+  private void assertSyncLagIsZero(
+      final String metricsContent, final String dataRegionTag, final String 
metricsUrl) {
+    for (final String line : metricsContent.split("\\R")) {
+      if (!line.startsWith("iot_consensus{")
+          || !line.contains("type=\"syncLag\"")
+          || !line.contains("region=\"" + dataRegionTag + "\"")) {
+        continue;
+      }
+      final int lastSpaceIndex = line.lastIndexOf(' ');
+      Assert.assertTrue("Malformed syncLag metric line: " + line, 
lastSpaceIndex > 0);
+      Assert.assertEquals(
+          "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl 
+ " but got " + line,
+          0.0,
+          Double.parseDouble(line.substring(lastSpaceIndex + 1)),
+          SYNC_LAG_DELTA);
+      return;
+    }
+    fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl);
+  }
+
   @Test
   public void testPipeAfterRegisterNewDataNode() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

Reply via email to