This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch enhance_it_for_restart_test
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit fe2f1893cc5be6b93649ddce99ca82f105b145f2
Author: Tian Jiang <[email protected]>
AuthorDate: Fri Jun 20 10:32:01 2025 +0800

    Enhance IT frame to run tests that restart the cluster
---
 .../org/apache/iotdb/it/env/cluster/EnvUtils.java  |  82 ++++++-
 .../iotdb/it/env/cluster/env/AbstractEnv.java      | 235 ++++++++++++++++-----
 .../it/env/cluster/node/AbstractNodeWrapper.java   |   9 +-
 .../iotdb/db/it/IoTDBCustomizedClusterIT.java      | 113 +++++++++-
 .../java/org/apache/iotdb/rpc/TSStatusCode.java    |   2 +
 .../iotdb/confignode/service/ConfigNode.java       |  12 +-
 .../java/org/apache/iotdb/db/service/DataNode.java |  13 +-
 .../apache/iotdb/commons/client/ClientManager.java |   5 +
 .../iotdb/commons/client/IClientManager.java       |   3 +
 .../commons/exception/PortOccupiedException.java   |  31 +++
 .../apache/iotdb/commons/utils/StatusUtils.java    |  14 ++
 11 files changed, 450 insertions(+), 69 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
index f9315ac787f..d5143c58742 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
@@ -24,9 +24,10 @@ import org.apache.iotdb.it.framework.IoTDBTestLogger;
 import org.apache.commons.lang3.SystemUtils;
 import org.apache.tsfile.utils.Pair;
 
-import java.io.File;
-import java.io.IOException;
+import java.io.*;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
@@ -98,15 +99,74 @@ public class EnvUtils {
   }
 
   private static boolean checkPortsAvailable(final List<Integer> ports) {
-    final String cmd = getSearchAvailablePortCmd(ports);
-    try {
-      return Runtime.getRuntime().exec(cmd).waitFor() == 1;
-    } catch (final IOException ignore) {
-      // ignore
-    } catch (final InterruptedException e) {
-      Thread.currentThread().interrupt();
+      try {
+          return listPortOccupation(ports).isEmpty();
+      } catch (IOException e) {
+        IoTDBTestLogger.logger.error("Cannot check available ports", e);
+        return false;
+      }
+  }
+
+  public static Map<Integer, Long> listPortOccupation(final List<Integer> 
ports) throws IOException {
+    return SystemUtils.IS_OS_WINDOWS ? listPortOccupationWindows(ports) : 
listPortOccupationUnix(ports);
+  }
+
+  /**
+   * Lists occupied ports and the associated PIDs on Windows.
+   * @param ports ports to be checked
+   * @return (occupiedPort, pid) pairs
+   */
+  public static Map<Integer, Long> listPortOccupationWindows(final 
List<Integer> ports) throws IOException {
+    Process process = Runtime.getRuntime().exec("netstat -aon -p tcp");
+    Map<Integer, Long> result = new HashMap<>();
+    try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(process.getInputStream()))) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] split = line.trim().split("\\s+");
+        if (split.length != 5) {
+          continue;
+        }
+        String localAddress = split[1];
+        for (Integer port : ports) {
+          if (localAddress.equals("127.0.0.1:" + port)) {
+            result.put(port, Long.parseLong(split[4]));
+            break;
+          }
+        }
+
+      }
+    } catch (EOFException ignored) {
+    }
+    return result;
+  }
+
+  /**
+   * List occupied port and the associated pid on Unix.
+   * @param ports ports to be checked
+   * @return (occupiedPort, pid) pairs
+   */
+  public static Map<Integer, Long> listPortOccupationUnix(final List<Integer> 
ports) throws IOException {
+    Process process = Runtime.getRuntime().exec("lsof -iTCP -sTCP:LISTEN -P 
-n");
+    Map<Integer, Long> result = new HashMap<>();
+    try (BufferedReader reader = new BufferedReader(new 
InputStreamReader(process.getInputStream()))) {
+      String line;
+      while ((line = reader.readLine()) != null) {
+        String[] split = line.trim().split("\\s+");
+        if (split.length != 10) {
+          continue;
+        }
+        String localAddress = split[9];
+        for (Integer port : ports) {
+          if (localAddress.equals("*:" + port)) {
+            result.put(port, Long.parseLong(split[1]));
+            break;
+          }
+        }
+
+      }
+    } catch (EOFException ignored) {
     }
-    return false;
+    return result;
   }
 
   private static String getSearchAvailablePortCmd(final List<Integer> ports) {
@@ -115,7 +175,7 @@ public class EnvUtils {
 
   private static String getWindowsSearchPortCmd(final List<Integer> ports) {
     return "netstat -aon -p tcp | findStr "
-        + ports.stream().map(v -> "/C:'127.0.0.1:" + v + 
"'").collect(Collectors.joining(" "));
+        + ports.stream().map(v -> "/C:\"127.0.0.1:" + v + 
"\"").collect(Collectors.joining(" "));
   }
 
   private static String getUnixSearchPortCmd(final List<Integer> ports) {
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index fa2cb85d191..d1b1bec8629 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -21,11 +21,13 @@ package org.apache.iotdb.it.env.cluster.env;
 
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
 import org.apache.iotdb.commons.client.exception.ClientManagerException;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.commons.cluster.NodeStatus;
+import org.apache.iotdb.commons.exception.PortOccupiedException;
 import org.apache.iotdb.confignode.rpc.thrift.IConfigNodeRPCService;
 import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
 import org.apache.iotdb.confignode.rpc.thrift.TRegionInfo;
@@ -78,13 +80,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Random;
+import java.util.*;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
@@ -223,25 +219,9 @@ public abstract class AbstractEnv implements BaseEnv {
     final RequestDelegate<Void> configNodesDelegate =
         new SerialRequestDelegate<>(configNodeEndpoints);
     for (int i = 1; i < configNodesNum; i++) {
-      final ConfigNodeWrapper configNodeWrapper =
-          new ConfigNodeWrapper(
-              false,
-              seedConfigNode,
-              testClassName,
-              testMethodName,
-              EnvUtils.searchAvailablePorts(),
-              index,
-              this instanceof MultiClusterEnv,
-              startTime);
+      ConfigNodeWrapper configNodeWrapper = newConfigNode();
       this.configNodeWrapperList.add(configNodeWrapper);
       configNodeEndpoints.add(configNodeWrapper.getIpAndPortString());
-      configNodeWrapper.createNodeDir();
-      configNodeWrapper.changeConfig(
-          (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(),
-          (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(),
-          (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig());
-      configNodeWrapper.createLogDir();
-      configNodeWrapper.setKillPoints(configNodeKillPoints);
       configNodesDelegate.addRequest(
           () -> {
             configNodeWrapper.start();
@@ -259,24 +239,9 @@ public abstract class AbstractEnv implements BaseEnv {
     final RequestDelegate<Void> dataNodesDelegate =
         new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
     for (int i = 0; i < dataNodesNum; i++) {
-      final DataNodeWrapper dataNodeWrapper =
-          new DataNodeWrapper(
-              seedConfigNode,
-              testClassName,
-              testMethodName,
-              EnvUtils.searchAvailablePorts(),
-              index,
-              this instanceof MultiClusterEnv,
-              startTime);
-      this.dataNodeWrapperList.add(dataNodeWrapper);
+      DataNodeWrapper dataNodeWrapper = newDataNode();
       dataNodeEndpoints.add(dataNodeWrapper.getIpAndPortString());
-      dataNodeWrapper.createNodeDir();
-      dataNodeWrapper.changeConfig(
-          (MppDataNodeConfig) clusterConfig.getDataNodeConfig(),
-          (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(),
-          (MppJVMConfig) clusterConfig.getDataNodeJVMConfig());
-      dataNodeWrapper.createLogDir();
-      dataNodeWrapper.setKillPoints(dataNodeKillPoints);
+      this.dataNodeWrapperList.add(dataNodeWrapper);
       dataNodesDelegate.addRequest(
           () -> {
             dataNodeWrapper.start();
@@ -299,6 +264,49 @@ public abstract class AbstractEnv implements BaseEnv {
     checkClusterStatusWithoutUnknown();
   }
 
+  private ConfigNodeWrapper newConfigNode() {
+    final ConfigNodeWrapper configNodeWrapper =
+        new ConfigNodeWrapper(
+            false,
+            configNodeWrapperList.get(0).getIpAndPortString(),
+            getTestClassName(),
+            testMethodName,
+            EnvUtils.searchAvailablePorts(),
+            index,
+            this instanceof MultiClusterEnv,
+            startTime);
+
+    configNodeWrapper.createNodeDir();
+    configNodeWrapper.changeConfig(
+        (MppConfigNodeConfig) clusterConfig.getConfigNodeConfig(),
+        (MppCommonConfig) clusterConfig.getConfigNodeCommonConfig(),
+        (MppJVMConfig) clusterConfig.getConfigNodeJVMConfig());
+    configNodeWrapper.createLogDir();
+    configNodeWrapper.setKillPoints(configNodeKillPoints);
+    return configNodeWrapper;
+  }
+
+  private DataNodeWrapper newDataNode() {
+    final DataNodeWrapper dataNodeWrapper =
+        new DataNodeWrapper(
+            configNodeWrapperList.get(0).getIpAndPortString(),
+            getTestClassName(),
+            testMethodName,
+            EnvUtils.searchAvailablePorts(),
+            index,
+            this instanceof MultiClusterEnv,
+            startTime);
+
+    dataNodeWrapper.createNodeDir();
+    dataNodeWrapper.changeConfig(
+        (MppDataNodeConfig) clusterConfig.getDataNodeConfig(),
+        (MppCommonConfig) clusterConfig.getDataNodeCommonConfig(),
+        (MppJVMConfig) clusterConfig.getDataNodeJVMConfig());
+    dataNodeWrapper.createLogDir();
+    dataNodeWrapper.setKillPoints(dataNodeKillPoints);
+    return dataNodeWrapper;
+  }
+
   private void startAINode(final String seedConfigNode, final String 
testClassName) {
     final String aiNodeEndPoint;
     final AINodeWrapper aiNodeWrapper =
@@ -380,16 +388,33 @@ public abstract class AbstractEnv implements BaseEnv {
     logger.info("Testing cluster environment...");
     TShowClusterResp showClusterResp;
     Exception lastException = null;
-    boolean flag;
+    boolean passed;
+    boolean showClusterPassed = true;
+    boolean nodeSizePassed = true;
+    boolean nodeStatusPassed = true;
+    boolean processStatusPassed = true;
+    TSStatus showClusterStatus = null;
+    int actualNodeSize = 0;
+    Map<Integer, String> lastNodeStatus = null;
+    Map<AbstractNodeWrapper, Integer> processStatusMap = new HashMap<>();
+
     for (int i = 0; i < retryCount; i++) {
       try (final SyncConfigNodeIServiceClient client =
           (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
-        flag = true;
+        passed = true;
+        showClusterPassed = true;
+        nodeSizePassed = true;
+        nodeStatusPassed = true;
+        processStatusPassed = true;
+        processStatusMap.clear();
+
         showClusterResp = client.showCluster();
 
         // Check resp status
         if (showClusterResp.getStatus().getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-          flag = false;
+          passed = false;
+          showClusterPassed = false;
+          showClusterStatus = showClusterResp.getStatus();
         }
 
         // Check the number of nodes
@@ -397,18 +422,61 @@ public abstract class AbstractEnv implements BaseEnv {
             != configNodeWrapperList.size()
                 + dataNodeWrapperList.size()
                 + aiNodeWrapperList.size()) {
-          flag = false;
+          passed = false;
+          nodeSizePassed = false;
+          actualNodeSize = showClusterResp.getNodeStatusSize();
         }
 
         // Check the status of nodes
-        if (flag) {
-          flag = statusCheck.test(showClusterResp.getNodeStatus());
+        if (passed) {
+          passed = statusCheck.test(showClusterResp.getNodeStatus());
+          if (!passed) {
+            nodeStatusPassed = false;
+            lastNodeStatus = showClusterResp.getNodeStatus();
+          }
+        }
+
+        // check the status of processes
+        for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
+          boolean alive = dataNodeWrapper.getInstance().isAlive();
+          if (!alive) {
+            passed = false;
+            processStatusPassed = false;
+            processStatusMap.put(dataNodeWrapper, 
dataNodeWrapper.getInstance().waitFor());
+          } else {
+            processStatusMap.put(dataNodeWrapper, 0);
+          }
+        }
+        for (ConfigNodeWrapper nodeWrapper : configNodeWrapperList) {
+          boolean alive = nodeWrapper.getInstance().isAlive();
+          if (!alive) {
+            passed = false;
+            processStatusPassed = false;
+            processStatusMap.put(nodeWrapper, 
nodeWrapper.getInstance().waitFor());
+          } else {
+            processStatusMap.put(nodeWrapper, 0);
+          }
+        }
+        for (AINodeWrapper nodeWrapper : aiNodeWrapperList) {
+          boolean alive = nodeWrapper.getInstance().isAlive();
+          if (!alive) {
+            passed = false;
+            processStatusPassed = false;
+            processStatusMap.put(nodeWrapper, 
nodeWrapper.getInstance().waitFor());
+          } else {
+            processStatusMap.put(nodeWrapper, 0);
+          }
         }
 
-        if (flag) {
+        if (!processStatusPassed) {
+          handleProcessStatus(processStatusMap);
+        }
+
+        if (passed) {
           logger.info("The cluster is now ready for testing!");
           return;
         }
+        logger.info("Retry {}: showClusterPassed={}, nodeSizePassed={}, 
nodeStatusPassed={}, processStatusPassed={}", i, showClusterPassed, 
nodeSizePassed, nodeStatusPassed, processStatusPassed);
       } catch (final Exception e) {
         lastException = e;
       }
@@ -425,10 +493,73 @@ public abstract class AbstractEnv implements BaseEnv {
           lastException.getMessage(),
           lastException);
     }
+    if (!showClusterPassed) {
+      logger.error("Show cluster failed: {}", showClusterStatus);
+    }
+    if (!nodeSizePassed) {
+      logger.error("Only {} nodes detected", actualNodeSize);
+    }
+    if (!nodeStatusPassed) {
+      logger.error("Some node status incorrect: {}", lastNodeStatus);
+    }
+    if (!processStatusPassed) {
+      logger.error(
+          "Some process status incorrect: {}",
+          processStatusMap.entrySet().stream()
+              .collect(Collectors.toMap(e -> e.getKey().getId(), 
Map.Entry::getValue)));
+
+      if 
(processStatusMap.containsValue(TSStatusCode.PORT_OCCUPIED.getStatusCode())) {
+        throw new PortOccupiedException();
+      }
+    }
+
     throw new AssertionError(
         String.format("After %d times retry, the cluster can't work!", 
retryCount));
   }
 
+  private void handleProcessStatus(Map<AbstractNodeWrapper, Integer> 
processStatusMap) {
+    for (Map.Entry<AbstractNodeWrapper, Integer> entry : 
processStatusMap.entrySet()) {
+      Integer statusCode = entry.getValue();
+      AbstractNodeWrapper nodeWrapper = entry.getKey();
+      if(statusCode != 0) {
+        logger.info("Node {} is not running due to {}", nodeWrapper.getId(), 
statusCode);
+      }
+      if (statusCode == TSStatusCode.PORT_OCCUPIED.getStatusCode()) {
+          try {
+            Map<Integer, Long> portOccupationMap = EnvUtils.
+                    
listPortOccupation(Arrays.stream(nodeWrapper.getPortList()).boxed().collect(Collectors.toList()));
+            logger.info("Check port result: {}", portOccupationMap);
+            for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
+              if (portOccupationMap.containsValue(dataNodeWrapper.getPid())) {
+                logger.info("A port is occupied by another DataNode {}-{}, 
restart it", dataNodeWrapper.getIpAndPortString(), dataNodeWrapper.getPid());
+                dataNodeWrapper.stop();
+                dataNodeWrapper.start();
+              }
+            }
+            for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
+              if (portOccupationMap.containsValue(configNodeWrapper.getPid())) 
{
+                logger.info("A port is occupied by another ConfigNode {}-{}, 
restart it", configNodeWrapper.getIpAndPortString(), 
configNodeWrapper.getPid());
+                configNodeWrapper.stop();
+                configNodeWrapper.start();
+              }
+            }
+            for (AINodeWrapper aiNodeWrapper : aiNodeWrapperList) {
+              if (portOccupationMap.containsValue(aiNodeWrapper.getPid())) {
+                logger.info("A port is occupied by another AINode {}-{}, 
restart it", aiNodeWrapper.getIpAndPortString(), aiNodeWrapper.getPid());
+                aiNodeWrapper.stop();
+                aiNodeWrapper.start();
+              }
+            }
+          } catch (IOException e) {
+              logger.error("Cannot check port occupation", e);
+          }
+          logger.info("Restarting it");
+        nodeWrapper.stop();
+        nodeWrapper.start();
+      }
+    }
+  }
+
   @Override
   public void cleanClusterEnvironment() {
     final List<AbstractNodeWrapper> allNodeWrappers =
@@ -890,6 +1021,10 @@ public abstract class AbstractEnv implements BaseEnv {
     for (int i = 0; i < retryCount; i++) {
       for (final ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
         try {
+          if (!configNodeWrapper.getInstance().isAlive()) {
+            throw new IOException("ConfigNode " + configNodeWrapper.getId() + 
" is not alive");
+          }
+
           lastErrorNode = configNodeWrapper;
           final SyncConfigNodeIServiceClient client =
               clientManager.borrowClient(
@@ -1335,4 +1470,8 @@ public abstract class AbstractEnv implements BaseEnv {
   public void registerDataNodeKillPoints(final List<String> killPoints) {
     this.dataNodeKillPoints = killPoints;
   }
+
+  public void clearClientManager() {
+    clientManager.clearAll();
+  }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index 0e33674fdea..27c688dc328 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -546,7 +546,10 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
     this.instance.destroy();
     try {
       if (!this.instance.waitFor(20, TimeUnit.SECONDS)) {
-        this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS);
+        logger.warn("Node {} does not exit within 20s, killing it", getId());
+        if (!this.instance.destroyForcibly().waitFor(10, TimeUnit.SECONDS)) {
+          logger.error("Cannot forcibly stop node {}", getId());
+        }
       }
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
@@ -822,4 +825,8 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
   public Process getInstance() {
     return instance;
   }
+
+  public int[] getPortList() {
+    return portList;
+  }
 }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
index 5812e255d16..2c5996a83c9 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBCustomizedClusterIT.java
@@ -19,10 +19,13 @@
 
 package org.apache.iotdb.db.it;
 
+import org.apache.iotdb.commons.exception.PortOccupiedException;
+import org.apache.iotdb.it.env.cluster.env.AbstractEnv;
 import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.ClusterIT;
+import org.apache.iotdb.itbase.category.DailyIT;
 import org.apache.iotdb.rpc.IoTDBConnectionException;
 import org.apache.iotdb.rpc.StatementExecutionException;
 import org.apache.iotdb.session.Session;
@@ -33,12 +36,10 @@ import org.junit.runner.RunWith;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.Date;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
 /** Tests that may not be satisfied with the default cluster settings. */
@@ -48,6 +49,110 @@ public class IoTDBCustomizedClusterIT {
 
   private final Logger logger = 
LoggerFactory.getLogger(IoTDBCustomizedClusterIT.class);
 
+  @FunctionalInterface
+  private interface RestartAction {
+    void act(Statement statement, int round, AbstractEnv env) throws Exception;
+  }
+
+  @Category(DailyIT.class)
+  @Test
+  public void testRepeatedlyRestartWholeClusterWithWrite() throws Exception {
+    testRepeatedlyRestartWholeCluster((s, i, env) -> {
+      if (i != 0) {
+        ResultSet resultSet = s.executeQuery("SELECT last s1 FROM root.**");
+        ResultSetMetaData metaData = resultSet.getMetaData();
+        assertEquals(4, metaData.getColumnCount());
+        int cnt = 0;
+        while (resultSet.next()) {
+          cnt ++;
+          StringBuilder result = new StringBuilder();
+          for (int j = 0; j < metaData.getColumnCount(); j++) {
+            result.append(metaData.getColumnName(j + 
1)).append(":").append(resultSet.getString(j + 1)).append(",");
+          }
+          System.out.println(result);
+        }
+      }
+      s.execute("INSERT INTO root.db1.d1 (time, s1) VALUES (1, 1)");
+      s.execute("INSERT INTO root.db2.d1 (time, s1) VALUES (1, 1)");
+      s.execute("INSERT INTO root.db3.d1 (time, s1) VALUES (1, 1)");
+    });
+  }
+
+  @Category(DailyIT.class)
+  @Test
+  public void testRepeatedlyRestartWholeClusterWithPipeCreation() throws 
Exception {
+    SimpleEnv receiverEnv = new SimpleEnv();
+    receiverEnv.initClusterEnvironment(1, 1);
+    try {
+      testRepeatedlyRestartWholeCluster((s, i, env) -> {
+        // Use another thread to make pipe creation and the restart concurrent;
+        // otherwise, all tasks finish before the restart and the cluster will
+        // not attempt to recover any tasks. NOTE(review): the statements below
+        // run synchronously — confirm whether asynchronous creation was intended here.
+        s.execute(String.format("CREATE PIPE p%d_1 WITH SINK 
('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, 
receiverEnv.getDataNodeWrapper(0).getIpAndPortString()));
+        s.execute(String.format("CREATE PIPE p%d_2 WITH SINK 
('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, 
receiverEnv.getDataNodeWrapper(0).getIpAndPortString()));
+        s.execute(String.format("CREATE PIPE p%d_3 WITH SINK 
('sink'='iotdb-thrift-sink', 'node-urls' = '%s')", i, 
receiverEnv.getDataNodeWrapper(0).getIpAndPortString()));
+      });
+    } finally {
+      receiverEnv.cleanClusterEnvironment();
+    }
+  }
+
+
+  private void testRepeatedlyRestartWholeCluster(RestartAction restartAction) 
throws Exception {
+    SimpleEnv simpleEnv = new SimpleEnv();
+    try {
+      simpleEnv
+              .getConfig()
+              .getCommonConfig()
+              .setDataReplicationFactor(3)
+              .setSchemaReplicationFactor(3)
+              
.setSchemaRegionConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+              
.setConfigNodeConsensusProtocolClass("org.apache.iotdb.consensus.ratis.RatisConsensus")
+              
.setDataRegionConsensusProtocolClass("org.apache.iotdb.consensus.iot.IoTConsensus");
+      simpleEnv.initClusterEnvironment(3, 3);
+
+      int repeat = 100;
+      for (int i = 0; i < repeat; i++) {
+        logger.info("Round {} restart", i);
+        try (Connection connection = simpleEnv.getConnection();
+             Statement statement = connection.createStatement()) {
+          ResultSet resultSet = statement.executeQuery("SHOW CLUSTER");
+          ResultSetMetaData metaData = resultSet.getMetaData();
+          int columnCount = metaData.getColumnCount();
+          while (resultSet.next()) {
+            StringBuilder row = new StringBuilder();
+            for (int j = 0; j < columnCount; j++) {
+              row.append(metaData.getColumnName(j + 1))
+                      .append(":")
+                      .append(resultSet.getString(j + 1))
+                      .append(",");
+            }
+            System.out.println(row);
+          }
+
+          restartAction.act(statement, i, simpleEnv);
+        }
+
+        simpleEnv.shutdownAllConfigNodes();
+        simpleEnv.shutdownAllDataNodes();
+
+        simpleEnv.startAllConfigNodes();
+        simpleEnv.startAllDataNodes();
+
+        simpleEnv.clearClientManager();
+
+        try {
+          simpleEnv.checkClusterStatusWithoutUnknown();
+        } catch (PortOccupiedException e) {
+          logger.info("Some ports are occupied during restart, which cannot be 
processed, just pass the test.");
+          return;
+        }
+      }
+
+    } finally {
+      simpleEnv.cleanClusterEnvironment();
+    }
+  }
+
   /**
    * When the wal size exceeds `walThrottleSize` * 0.8, the timed 
wal-delete-thread will try
    * deleting wal forever, which will block the DataNode from exiting, because 
task of deleting wal
diff --git 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
index 3f7eb8f6507..6359eafde5c 100644
--- 
a/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
+++ 
b/iotdb-client/service-rpc/src/main/java/org/apache/iotdb/rpc/TSStatusCode.java
@@ -40,6 +40,8 @@ public enum TSStatusCode {
 
   UNSUPPORTED_SQL_DIALECT(205),
 
+  PORT_OCCUPIED(206),
+
   // General Error
   UNSUPPORTED_OPERATION(300),
   EXECUTE_STATEMENT_ERROR(301),
diff --git 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
index 7f0db7a3784..39612202720 100644
--- 
a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
+++ 
b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/service/ConfigNode.java
@@ -40,6 +40,7 @@ import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.service.metric.JvmGcMonitorMetrics;
 import org.apache.iotdb.commons.service.metric.MetricService;
 import org.apache.iotdb.commons.service.metric.cpu.CpuUsageMetrics;
+import org.apache.iotdb.commons.utils.StatusUtils;
 import org.apache.iotdb.commons.utils.TestOnly;
 import org.apache.iotdb.confignode.client.CnToCnNodeRequestType;
 import org.apache.iotdb.confignode.client.sync.SyncConfigNodeClientPool;
@@ -67,6 +68,7 @@ import org.apache.iotdb.metrics.metricsets.net.NetMetrics;
 import org.apache.iotdb.metrics.metricsets.system.SystemMetrics;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.ratis.util.ExitUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -106,6 +108,8 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
 
   protected ConfigManager configManager;
 
+  private int exitStatusCode = 0;
+
   public ConfigNode() {
     super("ConfigNode");
     // We do not init anything here, so that we can re-initialize the instance 
in IT.
@@ -121,6 +125,8 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
         "{} default charset is: {}",
         ConfigNodeConstant.GLOBAL_NAME,
         Charset.defaultCharset().displayName());
+    // Let IoTDB decide the exit code itself instead of Ratis calling System.exit
+    ExitUtils.disableSystemExit();
     ConfigNode configNode = new ConfigNode();
     int returnCode = configNode.run(args);
     if (returnCode != 0) {
@@ -140,6 +146,7 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
       throw new IoTDBException("Error starting", -1);
     }
     active();
+    LOGGER.info("IoTDB started");
   }
 
   @Override
@@ -266,8 +273,9 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
             "The current ConfigNode can't joined the cluster because leader's 
scheduling failed. The possible cause is that the ip:port configuration is 
incorrect.");
         stop();
       }
-    } catch (StartupException | IOException | IllegalPathException e) {
+    } catch (Throwable e) {
       LOGGER.error("Meet error while starting up.", e);
+      exitStatusCode = StatusUtils.retrieveExitStatusCode(e);
       stop();
     }
   }
@@ -467,7 +475,7 @@ public class ConfigNode extends ServerCommandLine 
implements ConfigNodeMBean {
     } catch (IOException e) {
       LOGGER.error("Meet error when deactivate ConfigNode", e);
     }
-    System.exit(-1);
+    System.exit(exitStatusCode);
   }
 
   /**
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
index 714edde57c2..55268d72ab2 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/service/DataNode.java
@@ -114,6 +114,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.udf.api.exception.UDFManagementException;
 
 import org.antlr.v4.runtime.CommonTokenStream;
+import org.apache.ratis.util.ExitUtils;
 import org.apache.thrift.TException;
 import org.apache.tsfile.utils.ReadWriteIOUtils;
 import org.slf4j.Logger;
@@ -134,6 +135,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 import static org.apache.iotdb.commons.conf.IoTDBConstant.DEFAULT_CLUSTER_NAME;
+import static 
org.apache.iotdb.commons.utils.StatusUtils.retrieveExitStatusCode;
 import static org.apache.iotdb.db.conf.IoTDBStartCheck.PROPERTIES_FILE_NAME;
 
 public class DataNode extends ServerCommandLine implements DataNodeMBean {
@@ -193,6 +195,8 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
   public static void main(String[] args) {
     logger.info("IoTDB-DataNode environment variables: {}", 
IoTDBConfig.getEnvironmentVariables());
     logger.info("IoTDB-DataNode default charset is: {}", 
Charset.defaultCharset().displayName());
+    // Disable Ratis's own System.exit so IoTDB can classify the failure and choose the exit code.
+    ExitUtils.disableSystemExit();
     DataNode dataNode = new DataNode();
     int returnCode = dataNode.run(args);
     if (returnCode != 0) {
@@ -202,6 +206,7 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
 
   @Override
   protected void start() {
+    logger.info("Starting DataNode...");
     boolean isFirstStart;
     try {
       // Check if this DataNode is start for the first time and do other 
pre-checks
@@ -274,11 +279,13 @@ public class DataNode extends ServerCommandLine 
implements DataNodeMBean {
         dataRegionConsensusStarted = true;
       }
 
-    } catch (StartupException | IOException e) {
+    } catch (Throwable e) {
+      int exitStatusCode = retrieveExitStatusCode(e);
       logger.error("Fail to start server", e);
       stop();
-      System.exit(-1);
+      System.exit(exitStatusCode);
     }
+    logger.info("DataNode started");
   }
 
   @Override
@@ -683,7 +690,7 @@ public class DataNode extends ServerCommandLine implements 
DataNodeMBean {
       setUp();
     } catch (StartupException e) {
       logger.error("Meet error while starting up.", e);
-      throw new StartupException("Error in activating IoTDB DataNode.");
+      throw e;
     }
     logger.info("IoTDB DataNode has started.");
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
index 56bc67b6399..79fcc799ae9 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/ClientManager.java
@@ -88,6 +88,11 @@ public class ClientManager<K, V> implements 
IClientManager<K, V> {
             });
   }
 
+  /**
+   * Clears the pooled clients for all nodes (delegates to {@code pool.clear()}); unlike
+   * {@link #close()}, which closes the pool, the manager can still be used afterwards.
+   */
+  @Override
+  public void clearAll() {
+    pool.clear();
+  }
+
   @Override
   public void close() {
     pool.close();
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
index 81344e46719..cba9b840740 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/client/IClientManager.java
@@ -44,6 +44,9 @@ public interface IClientManager<K, V> {
    */
   void clear(K node);
 
+  /** Clear all clients for all nodes, without closing this manager. */
+  void clearAll();
+
   /** close IClientManager, which means closing all clients for all nodes. */
   void close();
 
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java
new file mode 100644
index 00000000000..0e0453d267b
--- /dev/null
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/exception/PortOccupiedException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.commons.exception;
+
+import java.util.Arrays;
+
+/**
+ * Signals that a node could not start because one or more of its configured ports are already
+ * occupied by another process.
+ */
+public class PortOccupiedException extends RuntimeException {
+
+  public PortOccupiedException() {
+    super("Some ports are occupied");
+  }
+
+  /** @param ports the ports that were found occupied */
+  public PortOccupiedException(int... ports) {
+    super(String.format("Ports %s are occupied", Arrays.toString(ports)));
+  }
+}
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
index d05fb8c9d7e..ac64699b9cf 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/utils/StatusUtils.java
@@ -25,6 +25,8 @@ import org.apache.iotdb.commons.conf.CommonDescriptor;
 import org.apache.iotdb.rpc.RpcUtils;
 import org.apache.iotdb.rpc.TSStatusCode;
 
+import org.apache.ratis.util.ExitUtils;
+
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Map;
@@ -248,4 +250,16 @@ public class StatusUtils {
   public static boolean isUnknownError(int statusCode) {
     return UNKNOWN_ERRORS.contains(statusCode);
   }
+
+  /**
+   * Maps a startup failure to a process exit status code.
+   *
+   * @param e the failure; if it is a Ratis {@code ExitUtils.ExitException} with a cause, the
+   *     cause is inspected instead
+   * @return {@code TSStatusCode.PORT_OCCUPIED} when the message indicates a port-binding
+   *     failure, -1 for any other failure
+   */
+  public static int retrieveExitStatusCode(Throwable e) {
+    if (e instanceof ExitUtils.ExitException && e.getCause() != null) {
+      e = e.getCause();
+    }
+    // getMessage() may be null (e.g. a bare NullPointerException); String.valueOf
+    // normalizes it to "null" so the contains() checks below cannot themselves NPE.
+    final String message = String.valueOf(e.getMessage());
+    if (message.contains("because Could not create ServerSocket")
+        || message.contains("Failed to bind to address")
+        || message.contains("Address already in use: bind")) {
+      return TSStatusCode.PORT_OCCUPIED.getStatusCode();
+    }
+    return -1;
+  }
 }


Reply via email to