This is an automated email from the ASF dual-hosted git repository. jiangtian pushed a commit to branch enhance_it_for_restart_test in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit fe2f1893cc5be6b93649ddce99ca82f105b145f2 Author: Tian Jiang <[email protected]> AuthorDate: Fri Jun 20 10:32:01 2025 +0800 Enhance IT frame to run tests that restart the cluster --- .../org/apache/iotdb/it/env/cluster/EnvUtils.java | 82 ++++++- .../iotdb/it/env/cluster/env/AbstractEnv.java | 235 ++++++++++++++++----- .../it/env/cluster/node/AbstractNodeWrapper.java | 9 +- .../iotdb/db/it/IoTDBCustomizedClusterIT.java | 113 +++++++++- .../java/org/apache/iotdb/rpc/TSStatusCode.java | 2 + .../iotdb/confignode/service/ConfigNode.java | 12 +- .../java/org/apache/iotdb/db/service/DataNode.java | 13 +- .../apache/iotdb/commons/client/ClientManager.java | 5 + .../iotdb/commons/client/IClientManager.java | 3 + .../commons/exception/PortOccupiedException.java | 31 +++ .../apache/iotdb/commons/utils/StatusUtils.java | 14 ++ 11 files changed, 450 insertions(+), 69 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java index f9315ac787f..d5143c58742 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java @@ -24,9 +24,10 @@ import org.apache.iotdb.it.framework.IoTDBTestLogger; import org.apache.commons.lang3.SystemUtils; import org.apache.tsfile.utils.Pair; -import java.io.File; -import java.io.IOException; +import java.io.*; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -98,15 +99,74 @@ public class EnvUtils { } private static boolean checkPortsAvailable(final List<Integer> ports) { - final String cmd = getSearchAvailablePortCmd(ports); - try { - return Runtime.getRuntime().exec(cmd).waitFor() == 1; - } catch (final IOException ignore) { - // ignore - } catch (final InterruptedException e) { - Thread.currentThread().interrupt(); + try { + return listPortOccupation(ports).isEmpty(); + } catch (IOException e) { + IoTDBTestLogger.logger.error("Cannot check available ports", e); + return false; + } + } + + public static Map<Integer, Long> listPortOccupation(final List<Integer> ports) throws IOException { + return SystemUtils.IS_OS_WINDOWS ? listPortOccupationWindows(ports) : listPortOccupationUnix(ports); + } + + /** + * List occupied port and the associated pid on windows. + * @param ports ports to be checked + * @return (occupiedPort, pid) pairs + */ + public static Map<Integer, Long> listPortOccupationWindows(final List<Integer> ports) throws IOException { + Process process = Runtime.getRuntime().exec("netstat -aon -p tcp"); + Map<Integer, Long> result = new HashMap<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + String[] split = line.trim().split("\\s+"); + if (split.length != 5) { + continue; + } + String localAddress = split[1]; + for (Integer port : ports) { + if (localAddress.equals("127.0.0.1:" + port)) { + result.put(port, Long.parseLong(split[4])); + break; + } + } + + } + } catch (EOFException ignored) { + } + return result; + } + + /** + * List occupied port and the associated pid on Unix. + * @param ports ports to be checked + * @return (occupiedPort, pid) pairs + */ + public static Map<Integer, Long> listPortOccupationUnix(final List<Integer> ports) throws IOException { + Process process = Runtime.getRuntime().exec("lsof -iTCP -sTCP:LISTEN -P -n"); + Map<Integer, Long> result = new HashMap<>(); + try (BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()))) { + String line; + while ((line = reader.readLine()) != null) { + String[] split = line.trim().split("\\s+"); + if (split.length != 10) { + continue; + } + String localAddress = split[9]; + for (Integer port : ports) { + if (localAddress.equals("*:" + port)) { + result.put(port, Long.parseLong(split[1])); + break; + } + } + + } + } catch (EOFException ignored) { } - return false; + return result; } private static String getSearchAvailablePortCmd(final List<Integer> ports) { @@ -115,7 +175,7 @@ public class EnvUtils { private static String getWindowsSearchPortCmd(final List<Integer> ports) { return "netstat -aon -p tcp | findStr " - + ports.stream().map(v -> "/C:'127.0.0.1:" + v + "'").collect(Collectors.joining(" ")); + + ports.stream().map(v -> "/C:\"127.0.0.1:" + v + "\"").collect(Collectors.joining(" ")); } private static String getUnixSearchPortCmd(final List<Integer> ports) { diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index fa2cb85d191..d1b1bec8629 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -21,11 +21,13 @@ package org.apache.iotdb.it.env.cluster.env; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType; import org.apache.iotdb.common.rpc.thrift.TEndPoint; +import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.commons.client.ClientPoolFactory; import org.apache.iotdb.commons.client.IClientManager; import org.apache.iotdb.commons.client.exception.ClientManagerException; import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient; import org.apache.iotdb.commons.cluster.NodeStatus; +import org.apache.iotdb.commons.exception.PortOccupiedException; import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService; import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo; import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo; @@ -78,13 +80,7 @@ import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; import java.time.ZoneId; -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.Optional; -import java.util.Random; +import java.util.*; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -223,25 +219,9 @@ public abstract class AbstractEnv implements BaseEnv { final RequestDelegate<Void> configNodesDelegate = new SerialRequestDelegate<>(configNodeEndpoints); for (int i = 1; i < configNodesNum; i++) { - final ConfigNodeWrapper configNodeWrapper = - new ConfigNodeWrapper( - false, - seedConfigNode, - testClassName, - testMethodName, - EnvUtils.searchAvailablePorts(), - index, - this instanceof MultiClusterEnv, - startTime); + ConfigNodeWrapper configNodeWrapper = newConfigNode(); this.configNodeWrapperList.add(configNodeWrapper); configNodeEndpoints.add(configNodeWrapper.getIpAndPortString()); - configNodeWrapper.createNodeDir(); - configNodeWrapper.changeConfig( - (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(), - (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(), - (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig()); - configNodeWrapper.createLogDir(); - configNodeWrapper.setKillPoints(configNodeKillPoints); configNodesDelegate.addRequest( () -> { configNodeWrapper.start(); @@ -259,24 +239,9 @@ public abstract class AbstractEnv implements BaseEnv { final RequestDelegate<Void> dataNodesDelegate = new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT); for (int i = 0; i < dataNodesNum; i++) { - final DataNodeWrapper dataNodeWrapper = - new DataNodeWrapper( - seedConfigNode, - testClassName, - testMethodName, - EnvUtils.searchAvailablePorts(), - index, - this instanceof MultiClusterEnv, - startTime); - this.dataNodeWrapperList.add(dataNodeWrapper); + DataNodeWrapper dataNodeWrapper = newDataNode(); dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString()); - dataNodeWrapper.createNodeDir(); - dataNodeWrapper.changeConfig( - (MppDataNodeConfig) clusterConfig.getDataNodeConfig(), - (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(), - (MppJVMConfig) clusterConfig.getDataNodeJVMConfig()); - dataNodeWrapper.createLogDir(); - dataNodeWrapper.setKillPoints(dataNodeKillPoints); + this.dataNodeWrapperList.add(dataNodeWrapper); dataNodesDelegate.addRequest( () -> { dataNodeWrapper.start(); @@ -299,6 +264,49 @@ public abstract class AbstractEnv implements BaseEnv { checkClusterStatusWithoutUnknown(); } + private ConfigNodeWrapper newConfigNode() { + final ConfigNodeWrapper configNodeWrapper = + new ConfigNodeWrapper( + false, + configNodeWrapperList.get(0).getIpAndPortString(), + getTestClassName(), + testMethodName, + EnvUtils.searchAvailablePorts(), + index, + this instanceof MultiClusterEnv, + startTime); + + configNodeWrapper.createNodeDir(); + configNodeWrapper.changeConfig( + (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(), + (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(), + (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig()); + configNodeWrapper.createLogDir(); + configNodeWrapper.setKillPoints(configNodeKillPoints); + return configNodeWrapper; + } + + private DataNodeWrapper newDataNode() { + final DataNodeWrapper dataNodeWrapper = + new DataNodeWrapper( + configNodeWrapperList.get(0).getIpAndPortString(), + getTestClassName(), + testMethodName, + EnvUtils.searchAvailablePorts(), + index, + this instanceof MultiClusterEnv, + startTime); + + dataNodeWrapper.createNodeDir(); + dataNodeWrapper.changeConfig( + (MppDataNodeConfig) clusterConfig.getDataNodeConfig(), + (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(), + (MppJVMConfig) clusterConfig.getDataNodeJVMConfig()); + dataNodeWrapper.createLogDir(); + dataNodeWrapper.setKillPoints(dataNodeKillPoints); + return dataNodeWrapper; + } + private void startAINode(final String seedConfigNode, final String testClassName) { final String aiNodeEndPoint; final AINodeWrapper aiNodeWrapper = @@ -380,16 +388,33 @@ public abstract class AbstractEnv implements BaseEnv { logger.info("Testing cluster environment..."); TShowClusterResp showClusterResp; Exception lastException = null; - boolean flag; + boolean passed; + boolean showClusterPassed = true; + boolean nodeSizePassed = true; + boolean nodeStatusPassed = true; + boolean processStatusPassed = true; + TSStatus showClusterStatus = null; + int actualNodeSize = 0; + Map<Integer, String> lastNodeStatus = null; + Map<AbstractNodeWrapper, Integer> processStatusMap = new HashMap<>(); + for (int i = 0; i < retryCount; i++) { try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) { - flag = true; + passed = true; + showClusterPassed = true; + nodeSizePassed = true; + nodeStatusPassed = true; + processStatusPassed = true; + processStatusMap.clear(); + showClusterResp = client.showCluster(); // Check resp status if (showClusterResp.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) { - flag = false; + passed = false; + showClusterPassed = false; + showClusterStatus = showClusterResp.getStatus(); } // Check the number of nodes @@ -397,18 +422,61 @@ public abstract class AbstractEnv implements BaseEnv { != configNodeWrapperList.size() + dataNodeWrapperList.size() + aiNodeWrapperList.size()) { - flag = false; + passed = false; + nodeSizePassed = false; + actualNodeSize = showClusterResp.getNodeStatusSize(); } // Check the status of nodes - if (flag) { - flag = statusCheck.test(showClusterResp.getNodeStatus()); + if (passed) { + passed = statusCheck.test(showClusterResp.getNodeStatus()); + if (!passed) { + nodeStatusPassed = false; + lastNodeStatus = showClusterResp.getNodeStatus(); + } + } + + // check the status of processes + for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + boolean alive = dataNodeWrapper.getInstance().isAlive(); + if (!alive) { + passed = false; + processStatusPassed = false; + processStatusMap.put(dataNodeWrapper, dataNodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(dataNodeWrapper, 0); + } + } + for (ConfigNodeWrapper nodeWrapper : configNodeWrapperList) { + boolean alive = nodeWrapper.getInstance().isAlive(); + if (!alive) { + passed = false; + processStatusPassed = false; + processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(nodeWrapper, 0); + } + } + for (AINodeWrapper nodeWrapper : aiNodeWrapperList) { + boolean alive = nodeWrapper.getInstance().isAlive(); + if (!alive) { + passed = false; + processStatusPassed = false; + processStatusMap.put(nodeWrapper, nodeWrapper.getInstance().waitFor()); + } else { + processStatusMap.put(nodeWrapper, 0); + } } - if (flag) { + if (!processStatusPassed) { + handleProcessStatus(processStatusMap); + } + + if (passed) { logger.info("The cluster is now ready for testing!"); return; } + logger.info("Retry {}: showClusterPassed={}, nodeSizePassed={}, nodeStatusPassed={}, processStatusPassed={}", i, showClusterPassed, nodeSizePassed, nodeStatusPassed, processStatusPassed); } catch (final Exception e) { lastException = e; } @@ -425,10 +493,73 @@ public abstract class AbstractEnv implements BaseEnv { lastException.getMessage(), lastException); } + if (!showClusterPassed) { + logger.error("Show cluster failed: {}", showClusterStatus); + } + if (!nodeSizePassed) { + logger.error("Only {} nodes detected", actualNodeSize); + } + if (!nodeStatusPassed) { + logger.error("Some node status incorrect: {}", lastNodeStatus); + } + if (!processStatusPassed) { + logger.error( + "Some process status incorrect: {}", + processStatusMap.entrySet().stream() + .collect(Collectors.toMap(e -> e.getKey().getId(), Map.Entry::getValue))); + + if (processStatusMap.containsValue(TSStatusCode.PORT_OCCUPIED.getStatusCode())) { + throw new PortOccupiedException(); + } + } + throw new AssertionError( String.format("After %d times retry, the cluster can't work!", retryCount)); } + private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> processStatusMap) { + for (Map.Entry<AbstractNodeWrapper, Integer> entry : processStatusMap.entrySet()) { + Integer statusCode = entry.getValue(); + AbstractNodeWrapper nodeWrapper = entry.getKey(); + if(statusCode != 0) { + logger.info("Node {} is not running due to {}", nodeWrapper.getId(), statusCode); + } + if (statusCode == TSStatusCode.PORT_OCCUPIED.getStatusCode()) { + try { + Map<Integer, Long> portOccupationMap = EnvUtils. + listPortOccupation(Arrays.stream(nodeWrapper.getPortList()).boxed().collect(Collectors.toList())); + logger.info("Check port result: {}", portOccupationMap); + for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) { + if (portOccupationMap.containsValue(dataNodeWrapper.getPid())) { + logger.info("A port is occupied by another DataNode {}-{}, restart it", dataNodeWrapper.getIpAndPortString(), dataNodeWrapper.getPid()); + dataNodeWrapper.stop(); + dataNodeWrapper.start(); + } + } + for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { + if (portOccupationMap.containsValue(configNodeWrapper.getPid())) { + logger.info("A port is occupied by another ConfigNode {}-{}, restart it", configNodeWrapper.getIpAndPortString(), configNodeWrapper.getPid()); + configNodeWrapper.stop(); + configNodeWrapper.start(); + } + } + for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) { + if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) { + logger.info("A port is occupied by another datanode {}-{}, restart it", aiNodeWrapper.getIpAndPortString(), aiNodeWrapper.getPid()); + aiNodeWrapper.stop(); + aiNodeWrapper.start(); + } + } + } catch (IOException e) { + logger.error("Cannot check port occupation", e); + } + logger.info("Restarting it"); + nodeWrapper.stop(); + nodeWrapper.start(); + } + } + } + @Override public void cleanClusterEnvironment() { final List<AbstractNodeWrapper> allNodeWrappers = @@ -890,6 +1021,10 @@ public abstract class AbstractEnv implements BaseEnv { for (int i = 0; i < retryCount; i++) { for (final ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) { try { + if (!configNodeWrapper.getInstance().isAlive()) { + throw new IOException("ConfigNode " + configNodeWrapper.getId() + " is not alive"); + } + lastErrorNode = configNodeWrapper; final SyncConfigNodeIServiceClient client = clientManager.borrowClient( @@ -1335,4 +1470,8 @@ public abstract class AbstractEnv implements BaseEnv { public void registerDataNodeKillPoints(final List<String> killPoints) { this.dataNodeKillPoints = killPoints; } + + public void clearClientManager() { + clientManager.clearAll(); + } } diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java index 0e33674fdea..27c688dc328 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java @@ -546,7 +546,10 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper { this.instance.destroy(); try { if (!this.instance.waitFor(20, TimeUnit.SECONDS)) { - this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS); + logger.warn("Node {} does not exit within 20s, killing it", getId()); + if (!this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS)) { + logger.error("Cannot forcibly stop node {}", getId()); + } } } catch (InterruptedException e) { Thread.currentThread().interrupt(); @@ -822,4 +825,8 @@ public abstract class AbstractNodeWrapper implements BaseNodeWrapper { public Process getInstance() { return instance; } + + public int[] getPortList() { + return portList; + } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java index 5812e255d16..2c5996a83c9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java @@ -19,10 +19,13 @@ package org.apache.iotdb.db.it; +import org.apache.iotdb.commons.exception.PortOccupiedException; +import org.apache.iotdb.it.env.cluster.env.AbstractEnv; import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; +import org.apache.iotdb.itbase.category.DailyIT; import org.apache.iotdb.rpc.IoTDBConnectionException; import org.apache.iotdb.rpc.StatementExecutionException; import org.apache.iotdb.session.Session; @@ -33,12 +36,10 @@ import org.junit.runner.RunWith; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.sql.Connection; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; +import java.sql.*; import java.util.Date; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; /** Tests that may not be satisfied with the default cluster settings. */ @@ -48,6 +49,110 @@ public class IoTDBCustomizedClusterIT { private final Logger logger = LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class); + @FunctionalInterface + private interface RestartAction { + void act(Statement statement, int round, AbstractEnv env) throws Exception; + } + + @Category(DailyIT.class) + @Test + public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception { + testRepeatedlyRestartWholeCluster((s, i, env) -> { + if (i != 0) { + ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**"); + ResultSetMetaData metaData = resultSet.getMetaData(); + assertEquals(4, metaData.getColumnCount()); + int cnt = 0; + while (resultSet.next()) { + cnt ++; + StringBuilder result = new StringBuilder(); + for (int j = 0; j < metaData.getColumnCount(); j++) { + result.append(metaData.getColumnName(j + 1)).append(":").append(resultSet.getString(j + 1)).append(","); + } + System.out.println(result); + } + } + s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)"); + s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)"); + s.execute("INSERT INTO root.db3.d1 (time, s1) VALUES (1, 1)"); + }); + } + + @Category(DailyIT.class) + @Test + public void testRepeatedlyRestartWholeClusterWithPipeCreation() throws Exception { + SimpleEnv receiverEnv = new SimpleEnv(); + receiverEnv.initClusterEnvironment(1, 1); + try { + testRepeatedlyRestartWholeCluster((s, i, env) -> { + // use another thread to make creating and restart concurrent + // otherwise, all tasks will be done before restart and the cluster will not attempt to recover tasks + s.execute(String.format("CREATE PIPE p%d_1 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + s.execute(String.format("CREATE PIPE p%d_2 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + s.execute(String.format("CREATE PIPE p%d_3 WITH SINK ('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, receiverEnv.getDataNodeWrapper(0).getIpAndPortString())); + }); + } finally { + receiverEnv.cleanClusterEnvironment(); + } + } + + + private void testRepeatedlyRestartWholeCluster(RestartAction restartAction) throws Exception { + SimpleEnv simpleEnv = new SimpleEnv(); + try { + simpleEnv + .getConfig() + .getCommonConfig() + .setDataReplicationFactor(3) + .setSchemaReplicationFactor(3) + .setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setConfigNodeConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus") + .setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus"); + simpleEnv.initClusterEnvironment(3, 3); + + int repeat = 100; + for (int i = 0; i < repeat; i++) { + logger.info("Round {} restart", i); + try (Connection connection = simpleEnv.getConnection(); + Statement statement = connection.createStatement()) { + ResultSet resultSet = statement.executeQuery("SHOW CLUSTER"); + ResultSetMetaData metaData = resultSet.getMetaData(); + int columnCount = metaData.getColumnCount(); + while (resultSet.next()) { + StringBuilder row = new StringBuilder(); + for (int j = 0; j < columnCount; j++) { + row.append(metaData.getColumnName(j + 1)) + .append(":") + .append(resultSet.getString(j + 1)) + .append(","); + } + System.out.println(row); + } + + restartAction.act(statement, i, simpleEnv); + } + + simpleEnv.shutdownAllConfigNodes(); + simpleEnv.shutdownAllDataNodes(); + + simpleEnv.startAllConfigNodes(); + simpleEnv.startAllDataNodes(); + + simpleEnv.clearClientManager(); + + try { + simpleEnv.checkClusterStatusWithoutUnknown(); + } catch (PortOccupiedException e) { + logger.info("Some ports are occupied during restart, which cannot be processed, just pass the test."); + return; + } + } + + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } + /** * When the wal size exceeds `walThrottleSize` * 0.8, the timed wal-delete-thread will try * deleting wal forever, which will block the DataNode from exiting, because task of deleting wal diff --git a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java index 3f7eb8f6507..6359eafde5c 100644 --- a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java +++ b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java @@ -40,6 +40,8 @@ public enum TSStatusCode { UNSUPPORTED_SQL_DIALECT(205), + PORT_OCCUPIED(206), + // General Error UNSUPPORTED_OPERATION(300), EXECUTE_STATEMENT_ERROR(301), diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java index 7f0db7a3784..39612202720 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java @@ -40,6 +40,7 @@ import org.apache.iotdb.commons.service.ServiceType; import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics; import org.apache.iotdb.commons.service.metric.MetricService; import org.apache.iotdb.commons.service.metric.cpu.CpuUsageMetrics; +import org.apache.iotdb.commons.utils.StatusUtils; import org.apache.iotdb.commons.utils.TestOnly; import org.apache.iotdb.confignode.client.CnToCnNodeRequestType; import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool; @@ -67,6 +68,7 @@ import org.apache.iotdb.metrics.metricsets.net.NetMetrics; import org.apache.iotdb.metrics.metricsets.system.SystemMetrics; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.ratis.util.ExitUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -106,6 +108,8 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { protected ConfigManager configManager; + private int exitStatusCode = 0; + public ConfigNode() { super("ConfigNode"); // We do not init anything here, so that we can re-initialize the instance in IT. @@ -121,6 +125,8 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { "{} default charset is: {}", ConfigNodeConstant.GLOBAL_NAME, Charset.defaultCharset().displayName()); + // let IoTDB handle the exception instead of ratis + ExitUtils.disableSystemExit(); ConfigNode configNode = new ConfigNode(); int returnCode = configNode.run(args); if (returnCode != 0) { @@ -140,6 +146,7 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { throw new IoTDBException("Error starting", -1); } active(); + LOGGER.info("IoTDB started"); } @Override @@ -266,8 +273,9 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { "The current ConfigNode can't joined the cluster because leader's scheduling failed. The possible cause is that the ip:port configuration is incorrect."); stop(); } - } catch (StartupException | IOException | IllegalPathException e) { + } catch (Throwable e) { LOGGER.error("Meet error while starting up.", e); + exitStatusCode = StatusUtils.retrieveExitStatusCode(e); stop(); } } @@ -467,7 +475,7 @@ public class ConfigNode extends ServerCommandLine implements ConfigNodeMBean { } catch (IOException e) { LOGGER.error("Meet error when deactivate ConfigNode", e); } - System.exit(-1); + System.exit(exitStatusCode); } /** diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java index 714edde57c2..55268d72ab2 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java @@ -114,6 +114,7 @@ import org.apache.iotdb.rpc.TSStatusCode; import org.apache.iotdb.udf.api.exception.UDFManagementException; import org.antlr.v4.runtime.CommonTokenStream; +import org.apache.ratis.util.ExitUtils; import org.apache.thrift.TException; import org.apache.tsfile.utils.ReadWriteIOUtils; import org.slf4j.Logger; @@ -134,6 +135,7 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME; +import static org.apache.iotdb.commons.utils.StatusUtils.retrieveExitStatusCode; import static org.apache.iotdb.db.conf.IoTDBStartCheck.PROPERTIES_FILE_NAME; public class DataNode extends ServerCommandLine implements DataNodeMBean { @@ -193,6 +195,8 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { public static void main(String[] args) { logger.info("IoTDB-DataNode environment variables: {}", IoTDBConfig.getEnvironmentVariables()); logger.info("IoTDB-DataNode default charset is: {}", Charset.defaultCharset().displayName()); + // let IoTDB handle the exception instead of ratis + ExitUtils.disableSystemExit(); DataNode dataNode = new DataNode(); int returnCode = dataNode.run(args); if (returnCode != 0) { @@ -202,6 +206,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { @Override protected void start() { + logger.info("Starting DataNode..."); boolean isFirstStart; try { // Check if this DataNode is start for the first time and do other pre-checks @@ -274,11 +279,13 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { dataRegionConsensusStarted = true; } - } catch (StartupException | IOException e) { + } catch (Throwable e) { + int exitStatusCode = retrieveExitStatusCode(e); logger.error("Fail to start server", e); stop(); - System.exit(-1); + System.exit(exitStatusCode); } + logger.info("DataNode started"); } @Override @@ -683,7 +690,7 @@ public class DataNode extends ServerCommandLine implements DataNodeMBean { setUp(); } catch (StartupException e) { logger.error("Meet error while starting up.", e); - throw new StartupException("Error in activating IoTDB DataNode."); + throw e; } logger.info("IoTDB DataNode has started."); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java index 56bc67b6399..79fcc799ae9 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java @@ -88,6 +88,11 @@ public class ClientManager<K, V> implements IClientManager<K, V> { }); } + @Override + public void clearAll() { + pool.clear(); + } + @Override public void close() { pool.close(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java index 81344e46719..cba9b840740 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java @@ -44,6 +44,9 @@ public interface IClientManager<K, V> { */ void clear(K node); + /** clear all clients; */ + void clearAll(); + /** close IClientManager, which means closing all clients for all nodes. */ void close(); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java new file mode 100644 index 00000000000..0e0453d267b --- /dev/null +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iotdb.commons.exception; + +import java.util.Arrays; + +public class PortOccupiedException extends RuntimeException{ + public PortOccupiedException() { + super("Some ports are occupied"); + } + + public PortOccupiedException(int... ports) { + super(String.format("Ports %s are occupied", Arrays.toString(ports))); + } +} diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java index d05fb8c9d7e..ac64699b9cf 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java @@ -25,6 +25,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor; import org.apache.iotdb.rpc.RpcUtils; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.ratis.util.ExitUtils; + import java.util.Arrays; import java.util.HashSet; import java.util.Map; @@ -248,4 +250,16 @@ public class StatusUtils { public static boolean isUnknownError(int statusCode) { return UNKNOWN_ERRORS.contains(statusCode); } + + public static int retrieveExitStatusCode(Throwable e) { + if (e instanceof ExitUtils.ExitException && e.getCause() != null) { + e = e.getCause(); + } + if (e.getMessage().contains("because Could not create ServerSocket") + || e.getMessage().contains("Failed to bind to address") + || e.getMessage().contains("Address already in use: bind")) { + return TSStatusCode.PORT_OCCUPIED.getStatusCode(); + } + return -1; + } }
