This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 96d55ed221b Improve IT cluster readiness diagnostics (#17903) (#17989)
96d55ed221b is described below
commit 96d55ed221b6a456e02097dc30cd455846acd5e2
Author: Caideyipi <[email protected]>
AuthorDate: Mon Jun 22 09:38:55 2026 +0800
Improve IT cluster readiness diagnostics (#17903) (#17989)
* Improve IT cluster readiness diagnostics
* Increase pipe IT cluster readiness retries
(cherry picked from commit ef0d9f8534fbbb01efb294ce548f53361fac8ccb)
---
.github/workflows/pipe-it-2cluster.yml | 6 +
.../org/apache/iotdb/it/env/MultiEnvFactory.java | 1 +
.../iotdb/it/env/cluster/env/AbstractEnv.java | 221 +++++++++++++++++++--
3 files changed, 216 insertions(+), 12 deletions(-)
diff --git a/.github/workflows/pipe-it-2cluster.yml
b/.github/workflows/pipe-it-2cluster.yml
index 6c5f11bc35c..89794a279f7 100644
--- a/.github/workflows/pipe-it-2cluster.yml
+++ b/.github/workflows/pipe-it-2cluster.yml
@@ -75,6 +75,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
+ -DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster }},${{ matrix.cluster
}} \
-pl integration-test \
@@ -161,6 +162,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
+ -DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
-pl integration-test \
@@ -245,6 +247,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
+ -DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
-pl integration-test \
@@ -329,6 +332,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
+ -DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
-pl integration-test \
@@ -413,6 +417,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
+ -DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }} \
-pl integration-test \
@@ -498,6 +503,7 @@ jobs:
mvn clean verify \
-P with-integration-tests \
-DskipUTs \
+ -DintegrationTest.clusterReadyRetryCount=90 \
-DintegrationTest.forkCount=1 -DConfigNodeMaxHeapSize=256
-DDataNodeMaxHeapSize=1024 -DDataNodeMaxDirectMemorySize=768 \
-DClusterConfigurations=${{ matrix.cluster1 }},${{
matrix.cluster2 }},${{ matrix.cluster3 }} \
-pl integration-test \
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
index 5832f1c485b..f2e5f9c1f90 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
@@ -51,6 +51,7 @@ public class MultiEnvFactory {
/** Create several environments according to the specific number. */
public static void createEnv(final int num) {
// Not judge EnvType for individual test convenience
+ envList.clear();
final long startTime = System.currentTimeMillis();
for (int i = 0; i < num; ++i) {
try {
diff --git
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 07b7364d004..f2b7a103f52 100644
---
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.it.env.cluster.env;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.ClientPoolFactory;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.exception.ClientManagerException;
@@ -79,6 +80,9 @@ import static org.apache.iotdb.jdbc.Config.VERSION;
public abstract class AbstractEnv implements BaseEnv {
private static final Logger logger = IoTDBTestLogger.logger;
+ private static final int DEFAULT_CLUSTER_READY_RETRY_COUNT = 30;
+ private static final String CLUSTER_READY_RETRY_COUNT_PROPERTY =
+ "integrationTest.clusterReadyRetryCount";
private final Random rand = new Random();
protected List<ConfigNodeWrapper> configNodeWrapperList =
Collections.emptyList();
@@ -87,7 +91,7 @@ public abstract class AbstractEnv implements BaseEnv {
protected String testMethodName = null;
protected int index = 0;
protected long startTime;
- protected int retryCount = 30;
+ protected int retryCount = getDefaultClusterReadyRetryCount();
private IClientManager<TEndPoint, SyncConfigNodeIServiceClient>
clientManager;
private List<String> configNodeKillPoints = new ArrayList<>();
private List<String> dataNodeKillPoints = new ArrayList<>();
@@ -110,6 +114,12 @@ public abstract class AbstractEnv implements BaseEnv {
this.clusterConfig = new MppClusterConfig();
}
+  /**
+   * Resolves the default number of cluster-readiness retries.
+   *
+   * <p>Reads the {@code integrationTest.clusterReadyRetryCount} system property. A missing,
+   * unparsable, zero or negative value falls back to {@link #DEFAULT_CLUSTER_READY_RETRY_COUNT}.
+   *
+   * @return a strictly positive retry count
+   */
+  private static int getDefaultClusterReadyRetryCount() {
+    // Integer.getInteger yields null when the property is absent or not a valid integer.
+    final Integer configured = Integer.getInteger(CLUSTER_READY_RETRY_COUNT_PROPERTY);
+    if (configured == null || configured <= 0) {
+      return DEFAULT_CLUSTER_READY_RETRY_COUNT;
+    }
+    return configured;
+  }
+
@Override
public ClusterConfig getConfig() {
return clusterConfig;
@@ -331,12 +341,14 @@ public abstract class AbstractEnv implements BaseEnv {
}
+  /**
+   * Polls the cluster until the node with {@code nodeId} is reported in the {@code expectation}
+   * status; an {@link AssertionError} is thrown if that does not happen within the retry budget.
+   *
+   * <p>Process liveness is deliberately not checked here.
+   *
+   * @param nodeId id of the node to watch
+   * @param expectation status the node is expected to reach
+   */
public void checkNodeInStatus(int nodeId, NodeStatus expectation) {
-    checkClusterStatus(nodeStatusMap -> expectation.getStatus().equals(nodeStatusMap.get(nodeId)));
+    final Predicate<Map<Integer, String>> targetNodeInExpectedStatus =
+        statusByNodeId -> expectation.getStatus().equals(statusByNodeId.get(nodeId));
+    final Predicate<Map<AbstractNodeWrapper, Integer>> anyProcessStatus =
+        ignoredProcessStatus -> true;
+    checkClusterStatus(targetNodeInExpectedStatus, anyProcessStatus);
}
+  /**
+   * Polls the cluster until no node is reported as {@code "Unknown"} and every collected process
+   * status is {@code 0} (alive), then checks JDBC connectivity to the DataNodes.
+   */
public void checkClusterStatusWithoutUnknown() {
checkClusterStatus(
-        nodeStatusMap -> nodeStatusMap.values().stream().noneMatch("Unknown"::equals));
+        statusByNodeId -> !statusByNodeId.containsValue("Unknown"),
+        processStatus -> processStatus.values().stream().allMatch(code -> code == 0));
testJDBCConnection();
}
@@ -346,6 +358,10 @@ public abstract class AbstractEnv implements BaseEnv {
Map<String, Integer> count = countNodeStatus(nodeStatus);
return count.getOrDefault("Unknown", 0) == 1
&& count.getOrDefault("Running", 0) == nodeStatus.size() - 1;
+ },
+ processStatus -> {
+ long aliveProcessCount = processStatus.values().stream().filter(i ->
i == 0).count();
+ return aliveProcessCount == processStatus.size() - 1;
});
testJDBCConnection();
}
@@ -357,19 +373,52 @@ public abstract class AbstractEnv implements BaseEnv {
* @param statusCheck the predicate to test the status of nodes
*/
public void checkClusterStatus(final Predicate<Map<Integer, String>>
statusCheck) {
+    // Default process check: every collected process status must be 0, i.e. every node process
+    // is still alive.
+    final Predicate<Map<AbstractNodeWrapper, Integer>> allProcessesAlive =
+        processStatus -> processStatus.values().stream().allMatch(code -> code == 0);
+    checkClusterStatus(statusCheck, allProcessesAlive);
+  }
+
+ /**
+ * check whether all nodes' status match the provided predicate with RPC.
after retryCount times,
+ * if the status of all nodes still not match the predicate, throw
AssertionError.
+ *
+ * @param nodeStatusCheck the predicate to test the status of nodes
+ * @param processStatusCheck the predicate to test the status of processes
+ */
+ public void checkClusterStatus(
+ final Predicate<Map<Integer, String>> nodeStatusCheck,
+ final Predicate<Map<AbstractNodeWrapper, Integer>> processStatusCheck) {
logger.info("Testing cluster environment...");
TShowClusterResp showClusterResp;
Exception lastException = null;
- boolean flag;
+ boolean passed;
+ boolean showClusterPassed = true;
+ boolean nodeSizePassed = true;
+ boolean nodeStatusPassed = true;
+ boolean processStatusPassed = true;
+ TSStatus showClusterStatus = null;
+ int actualNodeSize = 0;
+ Map<Integer, String> lastNodeStatus = null;
+ Map<AbstractNodeWrapper, Integer> processStatusMap = new HashMap<>();
+
for (int i = 0; i < retryCount; i++) {
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
- flag = true;
+ passed = true;
+ showClusterPassed = true;
+ nodeSizePassed = true;
+ nodeStatusPassed = true;
+ processStatusPassed = true;
+ processStatusMap.clear();
+
showClusterResp = client.showCluster();
+ showClusterStatus = showClusterResp.getStatus();
+ actualNodeSize = showClusterResp.getNodeStatusSize();
+ lastNodeStatus = showClusterResp.getNodeStatus();
// Check resp status
if (showClusterResp.getStatus().getCode() !=
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
- flag = false;
+ passed = false;
+ showClusterPassed = false;
}
// Check the number of nodes
@@ -377,18 +426,36 @@ public abstract class AbstractEnv implements BaseEnv {
!= configNodeWrapperList.size()
+ dataNodeWrapperList.size()
+ aiNodeWrapperList.size()) {
- flag = false;
+ passed = false;
+ nodeSizePassed = false;
}
// Check the status of nodes
- if (flag) {
- flag = statusCheck.test(showClusterResp.getNodeStatus());
+ if (passed) {
+ passed = nodeStatusCheck.test(showClusterResp.getNodeStatus());
+ if (!passed) {
+ nodeStatusPassed = false;
+ }
+ }
+
+ collectProcessStatus(processStatusMap);
+ processStatusPassed = processStatusCheck.test(processStatusMap);
+ if (!processStatusPassed) {
+ passed = false;
+ handleProcessStatus(processStatusMap);
}
- if (flag) {
+ if (passed) {
logger.info("The cluster is now ready for testing!");
return;
}
+ logger.info(
+ "Retry {}: showClusterPassed={}, nodeSizePassed={},
nodeStatusPassed={}, processStatusPassed={}",
+ i,
+ showClusterPassed,
+ nodeSizePassed,
+ nodeStatusPassed,
+ processStatusPassed);
} catch (final Exception e) {
lastException = e;
}
@@ -405,8 +472,132 @@ public abstract class AbstractEnv implements BaseEnv {
lastException.getMessage(),
lastException);
}
+ if (!showClusterPassed) {
+ logger.error("Show cluster failed: {}", showClusterStatus);
+ }
+ if (!nodeSizePassed) {
+ logger.error("Only {} nodes detected", actualNodeSize);
+ }
+ if (!nodeStatusPassed) {
+ logger.error("Some node status incorrect: {}", lastNodeStatus);
+ }
+ if (!processStatusPassed) {
+ logger.error("Some process status incorrect: {}",
formatProcessStatus(processStatusMap));
+ }
+
+ dumpTestJVMSnapshotQuietly("cluster status check failed");
throw new AssertionError(
- String.format("After %d times retry, the cluster can't work!",
retryCount));
+ buildClusterStatusFailureMessage(
+ showClusterPassed,
+ nodeSizePassed,
+ nodeStatusPassed,
+ processStatusPassed,
+ showClusterStatus,
+ actualNodeSize,
+ lastNodeStatus,
+ processStatusMap,
+ lastException));
+ }
+
+  /**
+   * Records the current process status of every ConfigNode, DataNode and AINode wrapper into
+   * {@code processStatusMap}, keyed by wrapper.
+   *
+   * <p>Encoding: {@code 0} means the process is alive; {@code -1} means the wrapper has no process
+   * instance; any other value is the exit code of a process that has terminated.
+   *
+   * <p>NOTE(review): a process that terminated with exit code 0 is also recorded as {@code 0} and
+   * is therefore indistinguishable from a live one. Confirm this is intended (e.g. for launcher
+   * processes that exit normally) before treating {@code 0} strictly as "alive".
+   *
+   * @param processStatusMap destination map; existing entries for the same wrapper are overwritten
+   */
+  private void collectProcessStatus(final Map<AbstractNodeWrapper, Integer> processStatusMap) {
+    final List<AbstractNodeWrapper> allNodeWrappers = new ArrayList<>();
+    allNodeWrappers.addAll(configNodeWrapperList);
+    allNodeWrappers.addAll(dataNodeWrapperList);
+    allNodeWrappers.addAll(aiNodeWrapperList);
+    for (final AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
+      final Process process = nodeWrapper.getInstance();
+      final int status;
+      if (process == null) {
+        status = -1;
+      } else if (process.isAlive()) {
+        status = 0;
+      } else {
+        // isAlive() == false means the process has terminated, so exitValue() returns its exit
+        // code directly, without the checked InterruptedException that waitFor() declares.
+        status = process.exitValue();
+      }
+      processStatusMap.put(nodeWrapper, status);
+    }
+  }
+
+  /**
+   * Builds the message of the {@link AssertionError} raised when the cluster status check fails.
+   *
+   * <p>The message lists the outcome of each individual check, the expected and observed node
+   * counts, and any captured status details. It also includes the last exception (class name and
+   * message only) and the log directories of all nodes.
+   *
+   * @param showClusterPassed whether the show-cluster RPC returned a success status
+   * @param nodeSizePassed whether the reported node count matched the expected count
+   * @param nodeStatusPassed whether the node-status predicate passed
+   * @param processStatusPassed whether the process-status predicate passed
+   * @param showClusterStatus status returned by show-cluster, or {@code null} if none was captured
+   * @param actualNodeSize node count reported by show-cluster
+   * @param lastNodeStatus node status map reported by show-cluster, or {@code null}
+   * @param processStatusMap collected process status per node wrapper (may be empty)
+   * @param lastException last exception seen while checking, or {@code null}
+   * @return the human-readable failure message
+   */
+  private String buildClusterStatusFailureMessage(
+      final boolean showClusterPassed,
+      final boolean nodeSizePassed,
+      final boolean nodeStatusPassed,
+      final boolean processStatusPassed,
+      final TSStatus showClusterStatus,
+      final int actualNodeSize,
+      final Map<Integer, String> lastNodeStatus,
+      final Map<AbstractNodeWrapper, Integer> processStatusMap,
+      final Exception lastException) {
+    final int expectedNodeSize =
+        configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size();
+    final StringBuilder message = new StringBuilder();
+    message.append(
+        String.format("After %d times retry, the cluster status check failed", retryCount));
+    message.append(": showClusterPassed=").append(showClusterPassed);
+    message.append(", nodeSizePassed=").append(nodeSizePassed);
+    message.append(", nodeStatusPassed=").append(nodeStatusPassed);
+    message.append(", processStatusPassed=").append(processStatusPassed);
+    message.append(", expectedNodeSize=").append(expectedNodeSize);
+    message.append(", actualNodeSize=").append(actualNodeSize);
+    // Optional details are appended only when they were actually captured.
+    if (showClusterStatus != null) {
+      message.append(", showClusterStatus=").append(showClusterStatus);
+    }
+    if (lastNodeStatus != null) {
+      message.append(", lastNodeStatus=").append(lastNodeStatus);
+    }
+    if (!processStatusMap.isEmpty()) {
+      message.append(", processStatus=").append(formatProcessStatus(processStatusMap));
+    }
+    if (lastException != null) {
+      message
+          .append(", lastException=")
+          .append(lastException.getClass().getName())
+          .append(": ")
+          .append(lastException.getMessage());
+    }
+    message.append(", logDirs=").append(getClusterLogDirs());
+    return message.toString();
+  }
+
+  /**
+   * Re-keys a process status map by node id so it can be logged readably.
+   *
+   * <p>The input is typically a {@link HashMap} keyed by node wrappers, whose iteration order is
+   * arbitrary. The result is therefore ordered by node id, which keeps the diagnostic output
+   * stable and comparable across retries and runs.
+   *
+   * @param processStatusMap process status per node wrapper
+   * @return process status per node id, in ascending id order; wrappers sharing an id collapse
+   *     into a single entry
+   */
+  private Map<String, Integer> formatProcessStatus(
+      final Map<AbstractNodeWrapper, Integer> processStatusMap) {
+    final List<AbstractNodeWrapper> sortedWrappers = new ArrayList<>(processStatusMap.keySet());
+    // String.valueOf keeps the comparison null-safe should a wrapper ever report a null id.
+    sortedWrappers.sort(
+        (left, right) -> String.valueOf(left.getId()).compareTo(String.valueOf(right.getId())));
+    final Map<String, Integer> result = new LinkedHashMap<>();
+    for (final AbstractNodeWrapper nodeWrapper : sortedWrappers) {
+      result.put(nodeWrapper.getId(), processStatusMap.get(nodeWrapper));
+    }
+    return result;
+  }
+
+  /**
+   * Collects the distinct log directories of all ConfigNode, DataNode and AINode wrappers, in
+   * first-seen order (ConfigNodes, then DataNodes, then AINodes).
+   *
+   * @return distinct log directory paths, used in failure messages to point at the node logs
+   */
+  private List<String> getClusterLogDirs() {
+    final List<AbstractNodeWrapper> wrappers = new ArrayList<>(configNodeWrapperList);
+    wrappers.addAll(dataNodeWrapperList);
+    wrappers.addAll(aiNodeWrapperList);
+    final List<String> logDirs = new ArrayList<>();
+    for (final AbstractNodeWrapper wrapper : wrappers) {
+      final String logDir = wrapper.getLogDirPath();
+      // Several nodes may share a log directory; report each directory once.
+      if (!logDirs.contains(logDir)) {
+        logDirs.add(logDir);
+      }
+    }
+    return logDirs;
+  }
+
+  /**
+   * Best-effort dump of the test JVM snapshots for post-mortem diagnostics.
+   *
+   * <p>Any {@link Exception} raised while dumping is logged and swallowed, so that the caller can
+   * still throw the assertion failure it is about to report.
+   *
+   * @param reason short description of the failure that triggered the dump, used in log messages
+   */
+  private void dumpTestJVMSnapshotQuietly(final String reason) {
+    try {
+      logger.info("Dumping test JVM snapshots because {}.", reason);
+      dumpTestJVMSnapshot();
+    } catch (final Exception dumpFailure) {
+      // Deliberately not rethrown: a failed dump must not mask the original failure.
+      logger.warn("Failed to dump test JVM snapshots after {}", reason, dumpFailure);
+    }
+  }
+
+  /**
+   * Logs every node whose collected process status is non-zero, i.e. whose process is not
+   * reported as alive.
+   *
+   * @param processStatusMap process status per node wrapper ({@code 0} means alive)
+   */
+  private void handleProcessStatus(final Map<AbstractNodeWrapper, Integer> processStatusMap) {
+    processStatusMap.forEach(
+        (wrapper, code) -> {
+          if (code != 0) {
+            logger.info("Node {} is not running due to {}", wrapper.getId(), code);
+          }
+        });
}
@Override
@@ -698,6 +889,7 @@ public abstract class AbstractEnv implements BaseEnv {
.collect(Collectors.toList());
final RequestDelegate<Void> testDelegate =
new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT, this);
+ final Map<String, String> lastConnectionFailures =
Collections.synchronizedMap(new HashMap<>());
for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
final String dataNodeEndpoint = dataNode.getIpAndPortString();
testDelegate.addRequest(
@@ -716,6 +908,8 @@ public abstract class AbstractEnv implements BaseEnv {
return null;
} catch (final Exception e) {
lastException = e;
+ lastConnectionFailures.put(
+ dataNodeEndpoint, e.getClass().getName() + ": " +
e.getMessage());
TimeUnit.SECONDS.sleep(1L);
}
}
@@ -729,8 +923,11 @@ public abstract class AbstractEnv implements BaseEnv {
testDelegate.requestAll();
} catch (final Exception e) {
logger.error("exception in test Cluster with RPC, message: {}",
e.getMessage(), e);
+ dumpTestJVMSnapshotQuietly("JDBC connection check failed");
throw new AssertionError(
- String.format("After %d times retry, the cluster can't work!",
retryCount));
+ String.format(
+ "After %d times retry, JDBC connections to DataNodes are not
ready. endpoints=%s, lastConnectionFailures=%s, logDirs=%s",
+ retryCount, endpoints, lastConnectionFailures,
getClusterLogDirs()));
}
}