This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 7cc3851892a Pipe: Stablized the tree model IoTDBPipeClusterIT (#17729) 
(#17762)
7cc3851892a is described below

commit 7cc3851892aad611cd14b0b000729cc8c5fa2c7e
Author: Caideyipi <[email protected]>
AuthorDate: Wed May 27 15:48:02 2026 +0800

    Pipe: Stablized the tree model IoTDBPipeClusterIT (#17729) (#17762)
    
    * Update IoTDBPipeClusterIT.java
    
    * Fix
---
 .../pipe/it/autocreate/IoTDBPipeClusterIT.java     | 183 +++++++++++++++++----
 1 file changed, 149 insertions(+), 34 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
index 9617de89cd7..96ec056cd6c 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeClusterIT.java
@@ -19,11 +19,13 @@
 
 package org.apache.iotdb.pipe.it.autocreate;
 
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.RegionRoleType;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
 import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -49,18 +51,22 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.awaitility.Awaitility.await;
 import static org.junit.Assert.fail;
 
 @RunWith(IoTDBTestRunner.class)
 @Category({MultiClusterIT2AutoCreateSchema.class})
 public class IoTDBPipeClusterIT extends AbstractPipeDualAutoIT {
 
+  private static final double SYNC_LAG_DELTA = 0.001;
+
   @Override
   @Before
   public void setUp() {
@@ -77,6 +83,10 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
+    senderEnv
+        .getConfig()
+        .getDataNodeConfig()
+        .setMetricReporterType(Collections.singletonList("PROMETHEUS"));
 
     receiverEnv
         .getConfig()
@@ -89,6 +99,10 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
         .setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
         .setPipeMemoryManagementEnabled(false)
         .setIsPipeEnableMemoryCheck(false);
+    receiverEnv
+        .getConfig()
+        .getDataNodeConfig()
+        .setMetricReporterType(Collections.singletonList("PROMETHEUS"));
 
     // 10 min, assert that the operations will not time out
     senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
@@ -302,40 +316,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
       TestUtils.executeNonQueries(
           senderEnv, Arrays.asList("insert into root.db.d1(time,s1) values 
(1,1)", "flush"), null);
 
-      final AtomicInteger leaderPort = new AtomicInteger(-1);
-      final TShowRegionResp showRegionResp = client.showRegion(new 
TShowRegionReq());
-      showRegionResp
-          .getRegionInfoList()
-          .forEach(
-              regionInfo -> {
-                if 
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
-                  leaderPort.set(regionInfo.getClientRpcPort());
-                }
-              });
-
-      int leaderIndex = -1;
-      for (int i = 0; i < 3; ++i) {
-        if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
-          leaderIndex = i;
-          try {
-            senderEnv.shutdownDataNode(i);
-          } catch (final Throwable e) {
-            e.printStackTrace();
-            return;
-          }
-          try {
-            TimeUnit.SECONDS.sleep(1);
-          } catch (final InterruptedException ignored) {
-          }
-          try {
-            senderEnv.startDataNode(i);
-            ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
-          } catch (final Throwable e) {
-            e.printStackTrace();
-            return;
-          }
-        }
-      }
+      final int leaderIndex = restartTreeDataRegionLeader(client, "root.db");
       if (leaderIndex == -1) { // ensure the leader is stopped
         fail();
       }
@@ -352,6 +333,7 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
           "select count(*) from root.db.d1",
           "count(root.db.d1.s1),",
           Collections.singleton("2,"));
+      
waitForTreeDataRegionReplicationComplete(Collections.singletonList("root.db"));
     }
 
     try {
@@ -397,6 +379,139 @@ public class IoTDBPipeClusterIT extends 
AbstractPipeDualAutoIT {
     }
   }
 
+  private int restartTreeDataRegionLeader(
+      final SyncConfigNodeIServiceClient client, final String database) throws 
TException {
+    final List<TRegionInfo> leaderRegionInfoList =
+        showTreeDataRegionLeaders(Collections.singletonList(database), client);
+    if (leaderRegionInfoList.isEmpty()) {
+      return -1;
+    }
+
+    final TRegionInfo targetRegionInfo =
+        leaderRegionInfoList.stream()
+            .min(Comparator.comparingInt(regionInfo -> 
regionInfo.getConsensusGroupId().getId()))
+            .orElse(null);
+    if (targetRegionInfo == null) {
+      return -1;
+    }
+
+    final int leaderPort = targetRegionInfo.getClientRpcPort();
+    for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) {
+      if (senderEnv.getDataNodeWrapper(i).getPort() != leaderPort) {
+        continue;
+      }
+
+      try {
+        senderEnv.shutdownDataNode(i);
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      }
+
+      try {
+        TimeUnit.SECONDS.sleep(1);
+      } catch (final InterruptedException ignored) {
+        Thread.currentThread().interrupt();
+        return -1;
+      }
+
+      try {
+        senderEnv.startDataNode(i);
+        ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+      } catch (final Throwable e) {
+        e.printStackTrace();
+        return -1;
+      }
+      return i;
+    }
+    return -1;
+  }
+
+  private void waitForTreeDataRegionReplicationComplete(final List<String> 
databases) {
+    await()
+        .pollInterval(500, TimeUnit.MILLISECONDS)
+        .atMost(2, TimeUnit.MINUTES)
+        .untilAsserted(
+            () -> {
+              try (final SyncConfigNodeIServiceClient client =
+                  (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
+                final List<TRegionInfo> leaderRegionInfoList =
+                    showTreeDataRegionLeaders(databases, client);
+                Assert.assertFalse(
+                    "No tree DataRegion leader found for databases " + 
databases,
+                    leaderRegionInfoList.isEmpty());
+
+                for (final TRegionInfo regionInfo : leaderRegionInfoList) {
+                  final DataNodeWrapper leaderNode =
+                      findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+                  final String metricsUrl =
+                      "http://";
+                          + leaderNode.getIp()
+                          + ":"
+                          + leaderNode.getMetricPort()
+                          + "/metrics";
+                  final String metricsContent = 
senderEnv.getUrlContent(metricsUrl, null);
+                  Assert.assertNotNull(
+                      "Failed to fetch metrics from leader DataNode at " + 
metricsUrl,
+                      metricsContent);
+                  assertSyncLagIsZero(metricsContent, 
buildDataRegionTag(regionInfo), metricsUrl);
+                }
+              }
+            });
+  }
+
+  private List<TRegionInfo> showTreeDataRegionLeaders(
+      final List<String> databases, final SyncConfigNodeIServiceClient client) 
throws TException {
+    final TShowRegionResp showRegionResp =
+        client.showRegion(
+            new TShowRegionReq()
+                .setConsensusGroupType(TConsensusGroupType.DataRegion)
+                .setDatabases(databases));
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
showRegionResp.getStatus().getCode());
+    final List<TRegionInfo> result = new ArrayList<>();
+    for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+      if 
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+        result.add(regionInfo);
+      }
+    }
+    return result;
+  }
+
+  private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
+    for (final DataNodeWrapper dataNodeWrapper : 
senderEnv.getDataNodeWrapperList()) {
+      if (dataNodeWrapper.getPort() == port) {
+        return dataNodeWrapper;
+      }
+    }
+    fail("Failed to find DataNodeWrapper for client rpc port " + port);
+    return null;
+  }
+
+  private String buildDataRegionTag(final TRegionInfo regionInfo) {
+    return "DataRegion[" + regionInfo.getConsensusGroupId().getId() + "]";
+  }
+
+  private void assertSyncLagIsZero(
+      final String metricsContent, final String dataRegionTag, final String 
metricsUrl) {
+    for (final String line : metricsContent.split("\\R")) {
+      if (!line.startsWith("iot_consensus{")
+          || !line.contains("type=\"syncLag\"")
+          || !line.contains("region=\"" + dataRegionTag + "\"")) {
+        continue;
+      }
+      final int lastSpaceIndex = line.lastIndexOf(' ');
+      Assert.assertTrue("Malformed syncLag metric line: " + line, 
lastSpaceIndex > 0);
+      Assert.assertEquals(
+          "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl 
+ " but got " + line,
+          0.0,
+          Double.parseDouble(line.substring(lastSpaceIndex + 1)),
+          SYNC_LAG_DELTA);
+      return;
+    }
+    fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl);
+  }
+
   @Test
   public void testPipeAfterRegisterNewDataNode() throws Exception {
     final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

Reply via email to