This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new e866b58800f Pipe: Stabilized the tree model IoTDBPipeClusterIT (#17729)
e866b58800f is described below
commit e866b58800fc14b1059963a7846cf8ad392b8311
Author: Caideyipi <[email protected]>
AuthorDate: Mon May 25 15:34:51 2026 +0800
Pipe: Stabilized the tree model IoTDBPipeClusterIT (#17729)
* Update IoTDBPipeClusterIT.java
* Fix
---
.../manual/enhanced/IoTDBPipeClusterIT.java | 8 +
.../auto/enhanced/IoTDBPipeClusterIT.java | 183 +++++++++++++++++----
2 files changed, 157 insertions(+), 34 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
index 3a0aaec2aed..244a1ff0c83 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/enhanced/IoTDBPipeClusterIT.java
@@ -89,6 +89,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
+ senderEnv
+ .getConfig()
+ .getDataNodeConfig()
+ .setMetricReporterType(Collections.singletonList("PROMETHEUS"));
receiverEnv
.getConfig()
@@ -102,6 +106,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeTableModelDualManualIT {
.setDnConnectionTimeoutMs(600000)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
+ receiverEnv
+ .getConfig()
+ .getDataNodeConfig()
+ .setMetricReporterType(Collections.singletonList("PROMETHEUS"));
senderEnv.initClusterEnvironment(3, 3, 180);
receiverEnv.initClusterEnvironment(3, 3, 180);
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
index ff06e6531cf..48c2f7a0308 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/enhanced/IoTDBPipeClusterIT.java
@@ -19,11 +19,13 @@
package org.apache.iotdb.pipe.it.dual.treemodel.auto.enhanced;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
+import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TShowPipeReq;
import org.apache.iotdb.confignode.rpc.thrift.TShowRegionReq;
@@ -54,18 +56,22 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.awaitility.Awaitility.await;
import static org.junit.Assert.fail;
@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTreeAutoEnhanced.class})
public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
+ private static final double SYNC_LAG_DELTA = 0.001;
+
@Override
@Before
public void setUp() {
@@ -82,6 +88,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
+ senderEnv
+ .getConfig()
+ .getDataNodeConfig()
+ .setMetricReporterType(Collections.singletonList("PROMETHEUS"));
senderEnv.getConfig().getConfigNodeConfig().setLeaderDistributionPolicy("HASH");
receiverEnv
@@ -95,6 +105,10 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
.setDataRegionConsensusProtocolClass(ConsensusFactory.IOT_CONSENSUS)
.setPipeMemoryManagementEnabled(false)
.setIsPipeEnableMemoryCheck(false);
+ receiverEnv
+ .getConfig()
+ .getDataNodeConfig()
+ .setMetricReporterType(Collections.singletonList("PROMETHEUS"));
// 10 min, assert that the operations will not time out
senderEnv.getConfig().getCommonConfig().setDnConnectionTimeoutMs(600000);
@@ -315,40 +329,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
Arrays.asList("insert into root.db.d1(time, s1) values (1, 1)",
"flush"),
null);
- final AtomicInteger leaderPort = new AtomicInteger(-1);
- final TShowRegionResp showRegionResp = client.showRegion(new
TShowRegionReq());
- showRegionResp
- .getRegionInfoList()
- .forEach(
- regionInfo -> {
- if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
- leaderPort.set(regionInfo.getClientRpcPort());
- }
- });
-
- int leaderIndex = -1;
- for (int i = 0; i < 3; ++i) {
- if (senderEnv.getDataNodeWrapper(i).getPort() == leaderPort.get()) {
- leaderIndex = i;
- try {
- senderEnv.shutdownDataNode(i);
- } catch (final Throwable e) {
- e.printStackTrace();
- return;
- }
- try {
- TimeUnit.SECONDS.sleep(1);
- } catch (final InterruptedException ignored) {
- }
- try {
- senderEnv.startDataNode(i);
- ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
- } catch (final Throwable e) {
- e.printStackTrace();
- return;
- }
- }
- }
+ final int leaderIndex = restartTreeDataRegionLeader(client, "root.db");
if (leaderIndex == -1) { // ensure the leader is stopped
fail();
}
@@ -363,6 +344,7 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
"select count(*) from root.db.d1",
"count(root.db.d1.s1),",
Collections.singleton("2,"));
+
waitForTreeDataRegionReplicationComplete(Collections.singletonList("root.db"));
}
try {
@@ -411,6 +393,139 @@ public class IoTDBPipeClusterIT extends AbstractPipeDualTreeModelAutoIT {
}
}
+ private int restartTreeDataRegionLeader(
+ final SyncConfigNodeIServiceClient client, final String database) throws
TException {
+ final List<TRegionInfo> leaderRegionInfoList =
+ showTreeDataRegionLeaders(Collections.singletonList(database), client);
+ if (leaderRegionInfoList.isEmpty()) {
+ return -1;
+ }
+
+ final TRegionInfo targetRegionInfo =
+ leaderRegionInfoList.stream()
+ .min(Comparator.comparingInt(regionInfo ->
regionInfo.getConsensusGroupId().getId()))
+ .orElse(null);
+ if (targetRegionInfo == null) {
+ return -1;
+ }
+
+ final int leaderPort = targetRegionInfo.getClientRpcPort();
+ for (int i = 0; i < senderEnv.getDataNodeWrapperList().size(); ++i) {
+ if (senderEnv.getDataNodeWrapper(i).getPort() != leaderPort) {
+ continue;
+ }
+
+ try {
+ senderEnv.shutdownDataNode(i);
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return -1;
+ }
+
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (final InterruptedException ignored) {
+ Thread.currentThread().interrupt();
+ return -1;
+ }
+
+ try {
+ senderEnv.startDataNode(i);
+ ((AbstractEnv) senderEnv).checkClusterStatusWithoutUnknown();
+ } catch (final Throwable e) {
+ e.printStackTrace();
+ return -1;
+ }
+ return i;
+ }
+ return -1;
+ }
+
+ private void waitForTreeDataRegionReplicationComplete(final List<String>
databases) {
+ await()
+ .pollInterval(500, TimeUnit.MILLISECONDS)
+ .atMost(2, TimeUnit.MINUTES)
+ .untilAsserted(
+ () -> {
+ try (final SyncConfigNodeIServiceClient client =
+ (SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
+ final List<TRegionInfo> leaderRegionInfoList =
+ showTreeDataRegionLeaders(databases, client);
+ Assert.assertFalse(
+ "No tree DataRegion leader found for databases " +
databases,
+ leaderRegionInfoList.isEmpty());
+
+ for (final TRegionInfo regionInfo : leaderRegionInfoList) {
+ final DataNodeWrapper leaderNode =
+ findDataNodeWrapperByPort(regionInfo.getClientRpcPort());
+ final String metricsUrl =
+ "http://"
+ + leaderNode.getIp()
+ + ":"
+ + leaderNode.getMetricPort()
+ + "/metrics";
+ final String metricsContent =
senderEnv.getUrlContent(metricsUrl, null);
+ Assert.assertNotNull(
+ "Failed to fetch metrics from leader DataNode at " +
metricsUrl,
+ metricsContent);
+ assertSyncLagIsZero(metricsContent,
buildDataRegionTag(regionInfo), metricsUrl);
+ }
+ }
+ });
+ }
+
+ private List<TRegionInfo> showTreeDataRegionLeaders(
+ final List<String> databases, final SyncConfigNodeIServiceClient client)
throws TException {
+ final TShowRegionResp showRegionResp =
+ client.showRegion(
+ new TShowRegionReq()
+ .setConsensusGroupType(TConsensusGroupType.DataRegion)
+ .setDatabases(databases));
+ Assert.assertEquals(
+ TSStatusCode.SUCCESS_STATUS.getStatusCode(),
showRegionResp.getStatus().getCode());
+ final List<TRegionInfo> result = new ArrayList<>();
+ for (final TRegionInfo regionInfo : showRegionResp.getRegionInfoList()) {
+ if
(RegionRoleType.Leader.getRoleType().equals(regionInfo.getRoleType())) {
+ result.add(regionInfo);
+ }
+ }
+ return result;
+ }
+
+ private DataNodeWrapper findDataNodeWrapperByPort(final int port) {
+ for (final DataNodeWrapper dataNodeWrapper :
senderEnv.getDataNodeWrapperList()) {
+ if (dataNodeWrapper.getPort() == port) {
+ return dataNodeWrapper;
+ }
+ }
+ fail("Failed to find DataNodeWrapper for client rpc port " + port);
+ return null;
+ }
+
+ private String buildDataRegionTag(final TRegionInfo regionInfo) {
+ return "DataRegion[" + regionInfo.getConsensusGroupId().getId() + "]";
+ }
+
+ private void assertSyncLagIsZero(
+ final String metricsContent, final String dataRegionTag, final String
metricsUrl) {
+ for (final String line : metricsContent.split("\\R")) {
+ if (!line.startsWith("iot_consensus{")
+ || !line.contains("type=\"syncLag\"")
+ || !line.contains("region=\"" + dataRegionTag + "\"")) {
+ continue;
+ }
+ final int lastSpaceIndex = line.lastIndexOf(' ');
+ Assert.assertTrue("Malformed syncLag metric line: " + line,
lastSpaceIndex > 0);
+ Assert.assertEquals(
+ "Expected syncLag of " + dataRegionTag + " to be 0 at " + metricsUrl
+ " but got " + line,
+ 0.0,
+ Double.parseDouble(line.substring(lastSpaceIndex + 1)),
+ SYNC_LAG_DELTA);
+ return;
+ }
+ fail("No syncLag metric found for " + dataRegionTag + " at " + metricsUrl);
+ }
+
@Test
public void testPipeAfterRegisterNewDataNode() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);