This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch rc/1.3.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rc/1.3.3 by this push:
     new bc4d4c00e55 Pipe & IT : Refactor pipe / IT and improved IT behaviour 
(#13530)
bc4d4c00e55 is described below

commit bc4d4c00e55366fdcf4b5f50866a3aefee9f5ad9
Author: Caideyipi <[email protected]>
AuthorDate: Wed Sep 18 15:46:52 2024 +0800

    Pipe & IT : Refactor pipe / IT and improved IT behaviour (#13530)
---
 .../org/apache/iotdb/it/env/MultiEnvFactory.java   |  11 +-
 .../iotdb/it/env/cluster/ClusterConstant.java      |  10 -
 .../org/apache/iotdb/it/env/cluster/EnvUtils.java  |  74 +++---
 .../iotdb/it/env/cluster/env/AbstractEnv.java      | 284 +++++++++++----------
 .../iotdb/it/env/cluster/env/MultiClusterEnv.java  |   8 +-
 .../it/env/cluster/node/AbstractNodeWrapper.java   |   2 +-
 .../it/env/cluster/node/ConfigNodeWrapper.java     |  35 ++-
 .../iotdb/it/env/cluster/node/DataNodeWrapper.java |  32 +--
 .../apache/iotdb/it/framework/IoTDBTestRunner.java |  21 +-
 .../iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java     |   6 +
 .../PipeHistoricalDataRegionTsFileExtractor.java   |   1 -
 .../tools/schema/SchemaRegionSnapshotParser.java   |  68 +----
 .../org/apache/iotdb/db/utils/DateTimeUtils.java   |   2 +
 13 files changed, 251 insertions(+), 303 deletions(-)

diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java 
b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
index f8c28567be1..5832f1c485b 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/MultiEnvFactory.java
@@ -38,24 +38,25 @@ public class MultiEnvFactory {
     // Empty constructor
   }
 
-  public static void setTestMethodName(String testMethodName) {
+  public static void setTestMethodName(final String testMethodName) {
     currentMethodName = testMethodName;
+    envList.forEach(baseEnv -> baseEnv.setTestMethodName(testMethodName));
   }
 
   /** Get an environment with the specific index. */
-  public static BaseEnv getEnv(int index) throws IndexOutOfBoundsException {
+  public static BaseEnv getEnv(final int index) throws 
IndexOutOfBoundsException {
     return envList.get(index);
   }
 
   /** Create several environments according to the specific number. */
-  public static void createEnv(int num) {
+  public static void createEnv(final int num) {
     // Not judge EnvType for individual test convenience
-    long startTime = System.currentTimeMillis();
+    final long startTime = System.currentTimeMillis();
     for (int i = 0; i < num; ++i) {
       try {
         Class.forName(Config.JDBC_DRIVER_NAME);
         envList.add(new MultiClusterEnv(startTime, i, currentMethodName));
-      } catch (ClassNotFoundException e) {
+      } catch (final ClassNotFoundException e) {
         logger.error("Create env error", e);
         System.exit(-1);
       }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
index b3802f3ea8d..147b57f2f67 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/ClusterConstant.java
@@ -115,9 +115,6 @@ public class ClusterConstant {
       "strongConsistencyClusterMode.dataRegionReplicaNumber";
 
   // Property file names
-  public static final String CONFIG_NODE_PROPERTIES_FILE = 
"iotdb-confignode.properties";
-  public static final String DATA_NODE_PROPERTIES_FILE = 
"iotdb-datanode.properties";
-  public static final String COMMON_PROPERTIES_FILE = 
"iotdb-common.properties";
   public static final String IOTDB_SYSTEM_PROPERTIES_FILE = 
"iotdb-system.properties";
 
   // Properties' keys
@@ -142,10 +139,7 @@ public class ClusterConstant {
   // ConfigNode
   public static final String CN_SYSTEM_DIR = "cn_system_dir";
   public static final String CN_CONSENSUS_DIR = "cn_consensus_dir";
-  public static final String CN_METRIC_PROMETHEUS_REPORTER_PORT =
-      "cn_metric_prometheus_reporter_port";
   public static final String CN_METRIC_IOTDB_REPORTER_HOST = 
"cn_metric_iotdb_reporter_host";
-  public static final String CN_METRIC_IOTDB_REPORTER_PORT = 
"cn_metric_iotdb_reporter_port";
 
   public static final String CN_CONNECTION_TIMEOUT_MS = 
"cn_connection_timeout_ms";
 
@@ -157,13 +151,10 @@ public class ClusterConstant {
   public static final String DN_TRACING_DIR = "dn_tracing_dir";
   public static final String DN_SYNC_DIR = "dn_sync_dir";
   public static final String DN_METRIC_IOTDB_REPORTER_HOST = 
"dn_metric_iotdb_reporter_host";
-  public static final String DN_METRIC_PROMETHEUS_REPORTER_PORT =
-      "dn_metric_prometheus_reporter_port";
 
   public static final String DN_MPP_DATA_EXCHANGE_PORT = 
"dn_mpp_data_exchange_port";
   public static final String DN_DATA_REGION_CONSENSUS_PORT = 
"dn_data_region_consensus_port";
   public static final String DN_SCHEMA_REGION_CONSENSUS_PORT = 
"dn_schema_region_consensus_port";
-  public static final String PIPE_AIR_GAP_RECEIVER_ENABLED = 
"pipe_air_gap_receiver_enabled";
   public static final String PIPE_AIR_GAP_RECEIVER_PORT = 
"pipe_air_gap_receiver_port";
   public static final String MAX_TSBLOCK_SIZE_IN_BYTES = 
"max_tsblock_size_in_bytes";
   public static final String PAGE_SIZE_IN_BYTE = "page_size_in_byte";
@@ -205,7 +196,6 @@ public class ClusterConstant {
 
   // Env Constant
   public static final int NODE_START_TIMEOUT = 100;
-  public static final int PROBE_TIMEOUT_MS = 2000;
   public static final int NODE_NETWORK_TIMEOUT_MS = 0;
   public static final String ZERO_TIME_ZONE = "GMT+0";
 
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
index f3c9527e595..9663fa371e9 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/EnvUtils.java
@@ -70,22 +70,22 @@ public class EnvUtils {
     while (true) {
       int randomPortStart = 1000 + (int) (Math.random() * (1999 - 1000));
       randomPortStart = randomPortStart * (length + 1) + 1;
-      String lockFilePath = getLockFilePath(randomPortStart);
-      File lockFile = new File(lockFilePath);
+      final String lockFilePath = getLockFilePath(randomPortStart);
+      final File lockFile = new File(lockFilePath);
       try {
         // Lock the ports first to avoid to be occupied by other ForkedBooters 
during ports
         // available detecting
         if (!lockFile.createNewFile()) {
           continue;
         }
-        List<Integer> requiredPorts =
+        final List<Integer> requiredPorts =
             IntStream.rangeClosed(randomPortStart, randomPortStart + length)
                 .boxed()
                 .collect(Collectors.toList());
         if (checkPortsAvailable(requiredPorts)) {
           return requiredPorts.stream().mapToInt(Integer::intValue).toArray();
         }
-      } catch (IOException e) {
+      } catch (final IOException ignore) {
         // ignore
       }
       // Delete the lock file if the ports can't be used or some error happens
@@ -95,39 +95,35 @@ public class EnvUtils {
     }
   }
 
-  private static boolean checkPortsAvailable(List<Integer> ports) {
-    String cmd = getSearchAvailablePortCmd(ports);
+  private static boolean checkPortsAvailable(final List<Integer> ports) {
+    final String cmd = getSearchAvailablePortCmd(ports);
     try {
-      Process proc = Runtime.getRuntime().exec(cmd);
-      return proc.waitFor() == 1;
-    } catch (IOException e) {
+      return Runtime.getRuntime().exec(cmd).waitFor() == 1;
+    } catch (final IOException ignore) {
       // ignore
-    } catch (InterruptedException e) {
+    } catch (final InterruptedException e) {
       Thread.currentThread().interrupt();
     }
     return false;
   }
 
-  private static String getSearchAvailablePortCmd(List<Integer> ports) {
-    if (SystemUtils.IS_OS_WINDOWS) {
-      return getWindowsSearchPortCmd(ports);
-    }
-    return getUnixSearchPortCmd(ports);
+  private static String getSearchAvailablePortCmd(final List<Integer> ports) {
+    return SystemUtils.IS_OS_WINDOWS ? getWindowsSearchPortCmd(ports) : 
getUnixSearchPortCmd(ports);
   }
 
-  private static String getWindowsSearchPortCmd(List<Integer> ports) {
-    String cmd = "netstat -aon -p tcp | findStr ";
-    return cmd
+  private static String getWindowsSearchPortCmd(final List<Integer> ports) {
+    return "netstat -aon -p tcp | findStr "
         + ports.stream().map(v -> "/C:'127.0.0.1:" + v + 
"'").collect(Collectors.joining(" "));
   }
 
-  private static String getUnixSearchPortCmd(List<Integer> ports) {
-    String cmd = "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E ";
-    return cmd + 
ports.stream().map(String::valueOf).collect(Collectors.joining("|")) + "\"";
+  private static String getUnixSearchPortCmd(final List<Integer> ports) {
+    return "lsof -iTCP -sTCP:LISTEN -P -n | awk '{print $9}' | grep -E "
+        + ports.stream().map(String::valueOf).collect(Collectors.joining("|"))
+        + "\"";
   }
 
-  private static Pair<Integer, Integer> getClusterNodesNum(int index) {
-    String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
+  private static Pair<Integer, Integer> getClusterNodesNum(final int index) {
+    final String valueStr = System.getProperty(CLUSTER_CONFIGURATIONS);
     if (valueStr == null) {
       return null;
     }
@@ -154,17 +150,17 @@ public class EnvUtils {
           // Print nothing to avoid polluting test outputs
           return null;
       }
-    } catch (NumberFormatException ignore) {
+    } catch (final NumberFormatException ignore) {
       return null;
     }
   }
 
-  public static String getLockFilePath(int port) {
+  public static String getLockFilePath(final int port) {
     return LOCK_FILE_PATH + port;
   }
 
   public static Pair<Integer, Integer> getNodeNum() {
-    Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
+    final Pair<Integer, Integer> nodesNum = getClusterNodesNum(0);
     if (nodesNum != null) {
       return nodesNum;
     }
@@ -173,8 +169,8 @@ public class EnvUtils {
         getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, 0));
   }
 
-  public static Pair<Integer, Integer> getNodeNum(int index) {
-    Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
+  public static Pair<Integer, Integer> getNodeNum(final int index) {
+    final Pair<Integer, Integer> nodesNum = getClusterNodesNum(index);
     if (nodesNum != null) {
       return nodesNum;
     }
@@ -183,38 +179,38 @@ public class EnvUtils {
         getIntFromSysVar(DEFAULT_DATA_NODE_NUM, 3, index));
   }
 
-  public static String getFilePathFromSysVar(String key, int index) {
-    String valueStr = System.getProperty(key);
+  public static String getFilePathFromSysVar(final String key, final int 
index) {
+    final String valueStr = System.getProperty(key);
     if (valueStr == null) {
       return null;
     }
     return System.getProperty(USER_DIR) + getValueOfIndex(valueStr, index);
   }
 
-  public static int getIntFromSysVar(String key, int defaultValue, int index) {
-    String valueStr = System.getProperty(key);
+  public static int getIntFromSysVar(final String key, final int defaultValue, 
final int index) {
+    final String valueStr = System.getProperty(key);
     if (valueStr == null) {
       return defaultValue;
     }
 
-    String value = getValueOfIndex(valueStr, index);
+    final String value = getValueOfIndex(valueStr, index);
     try {
       return Integer.parseInt(value);
-    } catch (NumberFormatException e) {
+    } catch (final NumberFormatException e) {
       throw new IllegalArgumentException("Invalid property value: " + value + 
" of key " + key);
     }
   }
 
-  public static String getValueOfIndex(String valueStr, int index) {
-    String[] values = valueStr.split(DELIMITER);
+  public static String getValueOfIndex(final String valueStr, final int index) 
{
+    final String[] values = valueStr.split(DELIMITER);
     return index <= values.length - 1 ? values[index] : values[values.length - 
1];
   }
 
-  public static String getTimeForLogDirectory(long startTime) {
+  public static String getTimeForLogDirectory(final long startTime) {
     return convertLongToDate(startTime, "ms").replace(":", 
DIR_TIME_REPLACEMENT);
   }
 
-  public static String fromConsensusFullNameToAbbr(String consensus) {
+  public static String fromConsensusFullNameToAbbr(final String consensus) {
     switch (consensus) {
       case SIMPLE_CONSENSUS:
         return SIMPLE_CONSENSUS_STR;
@@ -233,7 +229,7 @@ public class EnvUtils {
     }
   }
 
-  public static String fromConsensusAbbrToFullName(String consensus) {
+  public static String fromConsensusAbbrToFullName(final String consensus) {
     switch (consensus) {
       case SIMPLE_CONSENSUS_STR:
         return SIMPLE_CONSENSUS;
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
index 7deb7620974..92cb39765c6 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java
@@ -19,9 +19,7 @@
 
 package org.apache.iotdb.it.env.cluster.env;
 
-import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
-import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.commons.client.ClientPoolFactory;
 import org.apache.iotdb.commons.client.IClientManager;
@@ -103,7 +101,7 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   // For multiple environment ITs, time must be consistent across environments.
-  protected AbstractEnv(long startTime) {
+  protected AbstractEnv(final long startTime) {
     this.startTime = startTime;
     this.clusterConfig = new MppClusterConfig();
   }
@@ -115,10 +113,10 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public List<String> getMetricPrometheusReporterContents() {
-    List<String> result = new ArrayList<>();
+    final List<String> result = new ArrayList<>();
     // get all report content of confignodes
-    for (ConfigNodeWrapper configNode : this.configNodeWrapperList) {
-      String configNodeMetricContent =
+    for (final ConfigNodeWrapper configNode : this.configNodeWrapperList) {
+      final String configNodeMetricContent =
           getUrlContent(
               Config.IOTDB_HTTP_URL_PREFIX
                   + configNode.getIp()
@@ -128,8 +126,8 @@ public abstract class AbstractEnv implements BaseEnv {
       result.add(configNodeMetricContent);
     }
     // get all report content of datanodes
-    for (DataNodeWrapper dataNode : this.dataNodeWrapperList) {
-      String dataNodeMetricContent =
+    for (final DataNodeWrapper dataNode : this.dataNodeWrapperList) {
+      final String dataNodeMetricContent =
           getUrlContent(
               Config.IOTDB_HTTP_URL_PREFIX
                   + dataNode.getIp()
@@ -141,16 +139,20 @@ public abstract class AbstractEnv implements BaseEnv {
     return result;
   }
 
-  protected void initEnvironment(int configNodesNum, int dataNodesNum) {
+  protected void initEnvironment(final int configNodesNum, final int 
dataNodesNum) {
     initEnvironment(configNodesNum, dataNodesNum, retryCount);
   }
 
-  protected void initEnvironment(int configNodesNum, int dataNodesNum, int 
testWorkingRetryCount) {
+  protected void initEnvironment(
+      final int configNodesNum, final int dataNodesNum, final int 
testWorkingRetryCount) {
     initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount, 
false);
   }
 
   protected void initEnvironment(
-      int configNodesNum, int dataNodesNum, int retryCount, boolean addAINode) 
{
+      final int configNodesNum,
+      final int dataNodesNum,
+      final int retryCount,
+      final boolean addAINode) {
     this.retryCount = retryCount;
     this.configNodeWrapperList = new ArrayList<>();
     this.dataNodeWrapperList = new ArrayList<>();
@@ -161,7 +163,7 @@ public abstract class AbstractEnv implements BaseEnv {
 
     final String testClassName = getTestClassName();
 
-    ConfigNodeWrapper seedConfigNodeWrapper =
+    final ConfigNodeWrapper seedConfigNodeWrapper =
         new ConfigNodeWrapper(
             true,
             "",
@@ -179,22 +181,23 @@ public abstract class AbstractEnv implements BaseEnv {
     seedConfigNodeWrapper.createLogDir();
     seedConfigNodeWrapper.setKillPoints(configNodeKillPoints);
     seedConfigNodeWrapper.start();
-    String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString();
+    final String seedConfigNode = seedConfigNodeWrapper.getIpAndPortString();
     this.configNodeWrapperList.add(seedConfigNodeWrapper);
 
     // Check if the Seed-ConfigNode started successfully
-    try (SyncConfigNodeIServiceClient ignored =
+    try (final SyncConfigNodeIServiceClient ignored =
         (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
       // Do nothing
       logger.info("The Seed-ConfigNode started successfully!");
-    } catch (Exception e) {
+    } catch (final Exception e) {
       logger.error("Failed to get connection to the Seed-ConfigNode", e);
     }
 
-    List<String> configNodeEndpoints = new ArrayList<>();
-    RequestDelegate<Void> configNodesDelegate = new 
SerialRequestDelegate<>(configNodeEndpoints);
+    final List<String> configNodeEndpoints = new ArrayList<>();
+    final RequestDelegate<Void> configNodesDelegate =
+        new SerialRequestDelegate<>(configNodeEndpoints);
     for (int i = 1; i < configNodesNum; i++) {
-      ConfigNodeWrapper configNodeWrapper =
+      final ConfigNodeWrapper configNodeWrapper =
           new ConfigNodeWrapper(
               false,
               seedConfigNode,
@@ -221,16 +224,16 @@ public abstract class AbstractEnv implements BaseEnv {
     }
     try {
       configNodesDelegate.requestAll();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       logger.error("Start configNodes failed", e);
       throw new AssertionError();
     }
 
-    List<String> dataNodeEndpoints = new ArrayList<>();
-    RequestDelegate<Void> dataNodesDelegate =
+    final List<String> dataNodeEndpoints = new ArrayList<>();
+    final RequestDelegate<Void> dataNodesDelegate =
         new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
     for (int i = 0; i < dataNodesNum; i++) {
-      DataNodeWrapper dataNodeWrapper =
+      final DataNodeWrapper dataNodeWrapper =
           new DataNodeWrapper(
               seedConfigNode,
               testClassName,
@@ -257,7 +260,7 @@ public abstract class AbstractEnv implements BaseEnv {
 
     try {
       dataNodesDelegate.requestAll();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       logger.error("Start dataNodes failed", e);
       throw new AssertionError();
     }
@@ -270,9 +273,9 @@ public abstract class AbstractEnv implements BaseEnv {
     checkClusterStatusWithoutUnknown();
   }
 
-  private void startAINode(String seedConfigNode, String testClassName) {
-    String aiNodeEndPoint;
-    AINodeWrapper aiNodeWrapper =
+  private void startAINode(final String seedConfigNode, final String 
testClassName) {
+    final String aiNodeEndPoint;
+    final AINodeWrapper aiNodeWrapper =
         new AINodeWrapper(
             seedConfigNode,
             testClassName,
@@ -284,29 +287,29 @@ public abstract class AbstractEnv implements BaseEnv {
     aiNodeEndPoint = aiNodeWrapper.getIpAndPortString();
     aiNodeWrapper.createNodeDir();
     aiNodeWrapper.createLogDir();
-    RequestDelegate<Void> AINodesDelegate =
+    final RequestDelegate<Void> aiNodesDelegate =
         new ParallelRequestDelegate<>(
             Collections.singletonList(aiNodeEndPoint), NODE_START_TIMEOUT);
 
-    AINodesDelegate.addRequest(
+    aiNodesDelegate.addRequest(
         () -> {
           aiNodeWrapper.start();
           return null;
         });
 
     try {
-      AINodesDelegate.requestAll();
-    } catch (SQLException e) {
+      aiNodesDelegate.requestAll();
+    } catch (final SQLException e) {
       logger.error("Start aiNodes failed", e);
     }
   }
 
   public String getTestClassName() {
-    StackTraceElement[] stack = Thread.currentThread().getStackTrace();
-    for (StackTraceElement stackTraceElement : stack) {
-      String className = stackTraceElement.getClassName();
+    final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+    for (final StackTraceElement stackTraceElement : stack) {
+      final String className = stackTraceElement.getClassName();
       if (className.endsWith("IT")) {
-        String result = className.substring(className.lastIndexOf(".") + 1);
+        final String result = className.substring(className.lastIndexOf(".") + 
1);
         if (!result.startsWith("Abstract")) {
           return result;
         }
@@ -315,8 +318,8 @@ public abstract class AbstractEnv implements BaseEnv {
     return "UNKNOWN-IT";
   }
 
-  private Map<String, Integer> countNodeStatus(Map<Integer, String> 
nodeStatus) {
-    Map<String, Integer> result = new HashMap<>();
+  private Map<String, Integer> countNodeStatus(final Map<Integer, String> 
nodeStatus) {
+    final Map<String, Integer> result = new HashMap<>();
     nodeStatus.values().forEach(status -> result.put(status, 
result.getOrDefault(status, 0) + 1));
     return result;
   }
@@ -343,13 +346,13 @@ public abstract class AbstractEnv implements BaseEnv {
    *
    * @param statusCheck the predicate to test the status of nodes
    */
-  public void checkClusterStatus(Predicate<Map<Integer, String>> statusCheck) {
+  public void checkClusterStatus(final Predicate<Map<Integer, String>> 
statusCheck) {
     logger.info("Testing cluster environment...");
     TShowClusterResp showClusterResp;
     Exception lastException = null;
     boolean flag;
     for (int i = 0; i < retryCount; i++) {
-      try (SyncConfigNodeIServiceClient client =
+      try (final SyncConfigNodeIServiceClient client =
           (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
         flag = true;
         showClusterResp = client.showCluster();
@@ -369,20 +372,19 @@ public abstract class AbstractEnv implements BaseEnv {
 
         // Check the status of nodes
         if (flag) {
-          Map<Integer, String> nodeStatus = showClusterResp.getNodeStatus();
-          flag = statusCheck.test(nodeStatus);
+          flag = statusCheck.test(showClusterResp.getNodeStatus());
         }
 
         if (flag) {
           logger.info("The cluster is now ready for testing!");
           return;
         }
-      } catch (Exception e) {
+      } catch (final Exception e) {
         lastException = e;
       }
       try {
         TimeUnit.SECONDS.sleep(1L);
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         lastException = e;
         Thread.currentThread().interrupt();
       }
@@ -399,7 +401,7 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public void cleanClusterEnvironment() {
-    List<AbstractNodeWrapper> allNodeWrappers =
+    final List<AbstractNodeWrapper> allNodeWrappers =
         Stream.concat(
                 dataNodeWrapperList.stream(),
                 Stream.concat(configNodeWrapperList.stream(), 
aiNodeWrapperList.stream()))
@@ -408,10 +410,10 @@ public abstract class AbstractEnv implements BaseEnv {
         .findAny()
         .ifPresent(
             nodeWrapper -> logger.info("You can find logs at {}", 
nodeWrapper.getLogDirPath()));
-    for (AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
+    for (final AbstractNodeWrapper nodeWrapper : allNodeWrappers) {
       nodeWrapper.stopForcibly();
       nodeWrapper.destroyDir();
-      String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
+      final String lockPath = EnvUtils.getLockFilePath(nodeWrapper.getPort());
       if (!new File(lockPath).delete()) {
         logger.error("Delete lock file {} failed", lockPath);
       }
@@ -431,7 +433,8 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
-      DataNodeWrapper dataNode, String username, String password) throws 
SQLException {
+      final DataNodeWrapper dataNode, final String username, final String 
password)
+      throws SQLException {
     return new ClusterTestConnection(
         getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, 
password),
         Collections.emptyList());
@@ -439,7 +442,8 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public Connection getConnectionWithSpecifiedDataNode(
-      DataNodeWrapper dataNode, String username, String password) throws 
SQLException {
+      final DataNodeWrapper dataNode, final String username, final String 
password)
+      throws SQLException {
     return new ClusterTestConnection(
         getWriteConnectionWithSpecifiedDataNode(dataNode, null, username, 
password),
         getReadConnections(null, username, password));
@@ -469,7 +473,7 @@ public abstract class AbstractEnv implements BaseEnv {
   @Override
   public ISession getSessionConnection(String userName, String password)
       throws IoTDBConnectionException {
-    DataNodeWrapper dataNode =
+    final DataNodeWrapper dataNode =
         
this.dataNodeWrapperList.get(rand.nextInt(this.dataNodeWrapperList.size()));
     Session session = new Session(dataNode.getIp(), dataNode.getPort(), 
userName, password);
     session.open();
@@ -523,8 +527,8 @@ public abstract class AbstractEnv implements BaseEnv {
   protected NodeConnection getWriteConnectionWithSpecifiedDataNode(
       DataNodeWrapper dataNode, Constant.Version version, String username, 
String password)
       throws SQLException {
-    String endpoint = dataNode.getIp() + ":" + dataNode.getPort();
-    Connection writeConnection =
+    final String endpoint = dataNode.getIp() + ":" + dataNode.getPort();
+    final Connection writeConnection =
         DriverManager.getConnection(
             Config.IOTDB_URL_PREFIX
                 + endpoint
@@ -544,10 +548,10 @@ public abstract class AbstractEnv implements BaseEnv {
       String username,
       String password)
       throws SQLException {
-    List<DataNodeWrapper> dataNodeWrapperListCopy = new 
ArrayList<>(dataNodeList);
+    final List<DataNodeWrapper> dataNodeWrapperListCopy = new 
ArrayList<>(dataNodeList);
     Collections.shuffle(dataNodeWrapperListCopy);
     SQLException lastException = null;
-    for (DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
+    for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
       try {
         return getWriteConnectionWithSpecifiedDataNode(dataNode, version, 
username, password);
       } catch (SQLException e) {
@@ -593,19 +597,19 @@ public abstract class AbstractEnv implements BaseEnv {
   // AssertionError.
   protected void testJDBCConnection() {
     logger.info("Testing JDBC connection...");
-    List<String> endpoints =
+    final List<String> endpoints =
         dataNodeWrapperList.stream()
             .map(DataNodeWrapper::getIpAndPortString)
             .collect(Collectors.toList());
-    RequestDelegate<Void> testDelegate =
+    final RequestDelegate<Void> testDelegate =
         new ParallelRequestDelegate<>(endpoints, NODE_START_TIMEOUT);
-    for (DataNodeWrapper dataNode : dataNodeWrapperList) {
+    for (final DataNodeWrapper dataNode : dataNodeWrapperList) {
       final String dataNodeEndpoint = dataNode.getIpAndPortString();
       testDelegate.addRequest(
           () -> {
             Exception lastException = null;
             for (int i = 0; i < retryCount; i++) {
-              try (IoTDBConnection ignored =
+              try (final IoTDBConnection ignored =
                   (IoTDBConnection)
                       DriverManager.getConnection(
                           Config.IOTDB_URL_PREFIX
@@ -615,7 +619,7 @@ public abstract class AbstractEnv implements BaseEnv {
                           System.getProperty("Password", "root"))) {
                 logger.info("Successfully connecting to DataNode: {}.", 
dataNodeEndpoint);
                 return null;
-              } catch (Exception e) {
+              } catch (final Exception e) {
                 lastException = e;
                 TimeUnit.SECONDS.sleep(1L);
               }
@@ -628,15 +632,16 @@ public abstract class AbstractEnv implements BaseEnv {
     }
     try {
       testDelegate.requestAll();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       logger.error("exception in test Cluster with RPC, message: {}", 
e.getMessage(), e);
       throw new AssertionError(
           String.format("After %d times retry, the cluster can't work!", 
retryCount));
     }
   }
 
-  private String getParam(Constant.Version version, int timeout, String 
timeZone) {
-    StringBuilder sb = new StringBuilder("?");
+  private String getParam(
+      final Constant.Version version, final int timeout, final String 
timeZone) {
+    final StringBuilder sb = new StringBuilder("?");
     sb.append(Config.NETWORK_TIMEOUT).append("=").append(timeout);
     if (version != null) {
       sb.append("&").append(VERSION).append("=").append(version);
@@ -652,23 +657,20 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   @Override
-  public void setTestMethodName(String testMethodName) {
+  public void setTestMethodName(final String testMethodName) {
     this.testMethodName = testMethodName;
   }
 
   @Override
   public void dumpTestJVMSnapshot() {
-    for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
-      configNodeWrapper.executeJstack(testMethodName);
-    }
-    for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
-      dataNodeWrapper.executeJstack(testMethodName);
-    }
+    configNodeWrapperList.forEach(
+        configNodeWrapper -> configNodeWrapper.executeJstack(testMethodName));
+    dataNodeWrapperList.forEach(dataNodeWrapper -> 
dataNodeWrapper.executeJstack(testMethodName));
   }
 
   @Override
   public List<AbstractNodeWrapper> getNodeWrapperList() {
-    List<AbstractNodeWrapper> result = new ArrayList<>(configNodeWrapperList);
+    final List<AbstractNodeWrapper> result = new 
ArrayList<>(configNodeWrapperList);
     result.addAll(dataNodeWrapperList);
     return result;
   }
@@ -697,13 +699,13 @@ public abstract class AbstractEnv implements BaseEnv {
     Exception lastException = null;
     ConfigNodeWrapper lastErrorNode = null;
     for (int i = 0; i < retryCount; i++) {
-      for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
+      for (final ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
         try {
           lastErrorNode = configNodeWrapper;
-          SyncConfigNodeIServiceClient client =
+          final SyncConfigNodeIServiceClient client =
               clientManager.borrowClient(
                   new TEndPoint(configNodeWrapper.getIp(), 
configNodeWrapper.getPort()));
-          TShowClusterResp resp = client.showCluster();
+          final TShowClusterResp resp = client.showCluster();
 
           if (resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
             // Only the ConfigNodeClient who connects to the ConfigNode-leader
@@ -718,7 +720,7 @@ public abstract class AbstractEnv implements BaseEnv {
                     + " message: "
                     + resp.getStatus().getMessage());
           }
-        } catch (Exception e) {
+        } catch (final Exception e) {
           lastException = e;
         }
 
@@ -739,12 +741,12 @@ public abstract class AbstractEnv implements BaseEnv {
   @Override
   public IConfigNodeRPCService.Iface getConfigNodeConnection(int index) throws 
Exception {
     Exception lastException = null;
-    ConfigNodeWrapper configNodeWrapper = configNodeWrapperList.get(index);
+    final ConfigNodeWrapper configNodeWrapper = 
configNodeWrapperList.get(index);
     for (int i = 0; i < 30; i++) {
       try {
         return clientManager.borrowClient(
             new TEndPoint(configNodeWrapper.getIp(), 
configNodeWrapper.getPort()));
-      } catch (Exception e) {
+      } catch (final Exception e) {
         lastException = e;
       }
       // Sleep 1s before next retry
@@ -760,9 +762,9 @@ public abstract class AbstractEnv implements BaseEnv {
     ConfigNodeWrapper lastErrorNode = null;
     for (int retry = 0; retry < 30; retry++) {
       for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); 
configNodeId++) {
-        ConfigNodeWrapper configNodeWrapper = 
configNodeWrapperList.get(configNodeId);
+        final ConfigNodeWrapper configNodeWrapper = 
configNodeWrapperList.get(configNodeId);
         lastErrorNode = configNodeWrapper;
-        try (SyncConfigNodeIServiceClient client =
+        try (final SyncConfigNodeIServiceClient client =
             clientManager.borrowClient(
                 new TEndPoint(configNodeWrapper.getIp(), 
configNodeWrapper.getPort()))) {
           TShowRegionResp resp =
@@ -775,12 +777,12 @@ public abstract class AbstractEnv implements BaseEnv {
           int port;
 
           if (resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-            for (TRegionInfo tRegionInfo : resp.getRegionInfoList()) {
+            for (final TRegionInfo tRegionInfo : resp.getRegionInfoList()) {
               if (tRegionInfo.getRoleType().equals("Leader")) {
                 ip = tRegionInfo.getClientRpcIp();
                 port = tRegionInfo.getClientRpcPort();
                 for (int dataNodeId = 0; dataNodeId < 
dataNodeWrapperList.size(); ++dataNodeId) {
-                  DataNodeWrapper dataNodeWrapper = 
dataNodeWrapperList.get(dataNodeId);
+                  final DataNodeWrapper dataNodeWrapper = 
dataNodeWrapperList.get(dataNodeId);
                   if (dataNodeWrapper.getIp().equals(ip) && 
dataNodeWrapper.getPort() == port) {
                     return dataNodeId;
                   }
@@ -796,7 +798,7 @@ public abstract class AbstractEnv implements BaseEnv {
                     + " message: "
                     + resp.getStatus().getMessage());
           }
-        } catch (Exception e) {
+        } catch (final Exception e) {
           lastException = e;
         }
 
@@ -820,12 +822,12 @@ public abstract class AbstractEnv implements BaseEnv {
     ConfigNodeWrapper lastErrorNode = null;
     for (int retry = 0; retry < retryCount; retry++) {
       for (int configNodeId = 0; configNodeId < configNodeWrapperList.size(); 
configNodeId++) {
-        ConfigNodeWrapper configNodeWrapper = 
configNodeWrapperList.get(configNodeId);
+        final ConfigNodeWrapper configNodeWrapper = 
configNodeWrapperList.get(configNodeId);
         lastErrorNode = configNodeWrapper;
-        try (SyncConfigNodeIServiceClient client =
+        try (final SyncConfigNodeIServiceClient client =
             clientManager.borrowClient(
                 new TEndPoint(configNodeWrapper.getIp(), 
configNodeWrapper.getPort()))) {
-          TShowClusterResp resp = client.showCluster();
+          final TShowClusterResp resp = client.showCluster();
           // Only the ConfigNodeClient who connects to the ConfigNode-leader
           // will respond the SUCCESS_STATUS
           if (resp.getStatus().getCode() == 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
@@ -837,7 +839,7 @@ public abstract class AbstractEnv implements BaseEnv {
                     + " message: "
                     + resp.getStatus().getMessage());
           }
-        } catch (Exception e) {
+        } catch (final Exception e) {
           lastException = e;
         }
 
@@ -857,15 +859,13 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   @Override
-  public void startConfigNode(int index) {
+  public void startConfigNode(final int index) {
     configNodeWrapperList.get(index).start();
   }
 
   @Override
   public void startAllConfigNodes() {
-    for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
-      configNodeWrapper.start();
-    }
+    configNodeWrapperList.forEach(AbstractNodeWrapper::start);
   }
 
   @Override
@@ -875,24 +875,22 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public void shutdownAllConfigNodes() {
-    for (ConfigNodeWrapper configNodeWrapper : configNodeWrapperList) {
-      configNodeWrapper.stop();
-    }
+    configNodeWrapperList.forEach(AbstractNodeWrapper::stop);
   }
 
   @Override
-  public ConfigNodeWrapper getConfigNodeWrapper(int index) {
+  public ConfigNodeWrapper getConfigNodeWrapper(final int index) {
     return configNodeWrapperList.get(index);
   }
 
   @Override
-  public DataNodeWrapper getDataNodeWrapper(int index) {
+  public DataNodeWrapper getDataNodeWrapper(final int index) {
     return dataNodeWrapperList.get(index);
   }
 
   @Override
   public ConfigNodeWrapper generateRandomConfigNodeWrapper() {
-    ConfigNodeWrapper newConfigNodeWrapper =
+    final ConfigNodeWrapper newConfigNodeWrapper =
         new ConfigNodeWrapper(
             false,
             configNodeWrapperList.get(0).getIpAndPortString(),
@@ -914,7 +912,7 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public DataNodeWrapper generateRandomDataNodeWrapper() {
-    DataNodeWrapper newDataNodeWrapper =
+    final DataNodeWrapper newDataNodeWrapper =
         new DataNodeWrapper(
             configNodeWrapperList.get(0).getIpAndPortString(),
             getTestClassName(),
@@ -934,19 +932,20 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   @Override
-  public void registerNewDataNode(boolean isNeedVerify) {
+  public void registerNewDataNode(final boolean isNeedVerify) {
     registerNewDataNode(generateRandomDataNodeWrapper(), isNeedVerify);
   }
 
   @Override
-  public void registerNewConfigNode(boolean isNeedVerify) {
+  public void registerNewConfigNode(final boolean isNeedVerify) {
     registerNewConfigNode(generateRandomConfigNodeWrapper(), isNeedVerify);
   }
 
   @Override
-  public void registerNewConfigNode(ConfigNodeWrapper newConfigNodeWrapper, 
boolean isNeedVerify) {
+  public void registerNewConfigNode(
+      final ConfigNodeWrapper newConfigNodeWrapper, final boolean 
isNeedVerify) {
     // Start new ConfigNode
-    RequestDelegate<Void> configNodeDelegate =
+    final RequestDelegate<Void> configNodeDelegate =
         new ParallelRequestDelegate<>(
             
Collections.singletonList(newConfigNodeWrapper.getIpAndPortString()),
             NODE_START_TIMEOUT);
@@ -958,7 +957,7 @@ public abstract class AbstractEnv implements BaseEnv {
 
     try {
       configNodeDelegate.requestAll();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       logger.error("Start configNode failed", e);
       throw new AssertionError();
     }
@@ -970,11 +969,12 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   @Override
-  public void registerNewDataNode(DataNodeWrapper newDataNodeWrapper, boolean 
isNeedVerify) {
+  public void registerNewDataNode(
+      final DataNodeWrapper newDataNodeWrapper, final boolean isNeedVerify) {
     // Start new DataNode
-    List<String> dataNodeEndpoints =
+    final List<String> dataNodeEndpoints =
         Collections.singletonList(newDataNodeWrapper.getIpAndPortString());
-    RequestDelegate<Void> dataNodesDelegate =
+    final RequestDelegate<Void> dataNodesDelegate =
         new ParallelRequestDelegate<>(dataNodeEndpoints, NODE_START_TIMEOUT);
     dataNodesDelegate.addRequest(
         () -> {
@@ -983,7 +983,7 @@ public abstract class AbstractEnv implements BaseEnv {
         });
     try {
       dataNodesDelegate.requestAll();
-    } catch (SQLException e) {
+    } catch (final SQLException e) {
       logger.error("Start dataNodes failed", e);
       throw new AssertionError();
     }
@@ -995,58 +995,63 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   @Override
-  public void startDataNode(int index) {
+  public void startDataNode(final int index) {
     dataNodeWrapperList.get(index).start();
   }
 
   @Override
   public void startAllDataNodes() {
-    for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
-      dataNodeWrapper.start();
-    }
+    dataNodeWrapperList.forEach(AbstractNodeWrapper::start);
   }
 
   @Override
-  public void shutdownDataNode(int index) {
+  public void shutdownDataNode(final int index) {
     dataNodeWrapperList.get(index).stop();
   }
 
   @Override
   public void shutdownAllDataNodes() {
-    for (DataNodeWrapper dataNodeWrapper : dataNodeWrapperList) {
-      dataNodeWrapper.stop();
-    }
+    dataNodeWrapperList.forEach(AbstractNodeWrapper::stop);
   }
 
   @Override
-  public void ensureNodeStatus(List<BaseNodeWrapper> nodes, List<NodeStatus> 
targetStatus)
+  public void ensureNodeStatus(
+      final List<BaseNodeWrapper> nodes, final List<NodeStatus> targetStatus)
       throws IllegalStateException {
     Throwable lastException = null;
     for (int i = 0; i < retryCount; i++) {
-      try (SyncConfigNodeIServiceClient client =
+      try (final SyncConfigNodeIServiceClient client =
           (SyncConfigNodeIServiceClient) 
EnvFactory.getEnv().getLeaderConfigNodeConnection()) {
-        List<String> errorMessages = new ArrayList<>(nodes.size());
-        Map<String, Integer> nodeIds = new HashMap<>(nodes.size());
-        TShowClusterResp showClusterResp = client.showCluster();
-        for (TConfigNodeLocation node : showClusterResp.getConfigNodeList()) {
-          nodeIds.put(
-              node.getInternalEndPoint().getIp() + ":" + 
node.getInternalEndPoint().getPort(),
-              node.getConfigNodeId());
-        }
-        for (TDataNodeLocation node : showClusterResp.getDataNodeList()) {
-          nodeIds.put(
-              node.getClientRpcEndPoint().getIp() + ":" + 
node.getClientRpcEndPoint().getPort(),
-              node.getDataNodeId());
-        }
+        final List<String> errorMessages = new ArrayList<>(nodes.size());
+        final Map<String, Integer> nodeIds = new HashMap<>(nodes.size());
+        final TShowClusterResp showClusterResp = client.showCluster();
+        showClusterResp
+            .getConfigNodeList()
+            .forEach(
+                node ->
+                    nodeIds.put(
+                        node.getInternalEndPoint().getIp()
+                            + ":"
+                            + node.getInternalEndPoint().getPort(),
+                        node.getConfigNodeId()));
+        showClusterResp
+            .getDataNodeList()
+            .forEach(
+                node ->
+                    nodeIds.put(
+                        node.getClientRpcEndPoint().getIp()
+                            + ":"
+                            + node.getClientRpcEndPoint().getPort(),
+                        node.getDataNodeId()));
         for (int j = 0; j < nodes.size(); j++) {
-          String endpoint = nodes.get(j).getIpAndPortString();
+          final String endpoint = nodes.get(j).getIpAndPortString();
           if (!nodeIds.containsKey(endpoint)) {
             // Node not exist
             // Notice: Never modify this line, since the NodeLocation might be 
modified in IT
             errorMessages.add("The node " + nodes.get(j).getIpAndPortString() 
+ " is not found!");
             continue;
           }
-          String status = 
showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
+          final String status = 
showClusterResp.getNodeStatus().get(nodeIds.get(endpoint));
           if (!targetStatus.get(j).getStatus().equals(status)) {
             // Error status
             errorMessages.add(
@@ -1060,12 +1065,12 @@ public abstract class AbstractEnv implements BaseEnv {
         } else {
           lastException = new IllegalStateException(String.join(". ", 
errorMessages));
         }
-      } catch (TException | ClientManagerException | IOException | 
InterruptedException e) {
+      } catch (final TException | ClientManagerException | IOException | 
InterruptedException e) {
         lastException = e;
       }
       try {
         TimeUnit.SECONDS.sleep(1);
-      } catch (InterruptedException e) {
+      } catch (final InterruptedException e) {
         throw new RuntimeException(e);
       }
     }
@@ -1074,8 +1079,9 @@ public abstract class AbstractEnv implements BaseEnv {
 
   @Override
   public int getMqttPort() {
-    int randomIndex = new 
Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size());
-    return dataNodeWrapperList.get(randomIndex).getMqttPort();
+    return dataNodeWrapperList
+        .get(new 
Random(System.currentTimeMillis()).nextInt(dataNodeWrapperList.size()))
+        .getMqttPort();
   }
 
   @Override
@@ -1104,11 +1110,11 @@ public abstract class AbstractEnv implements BaseEnv {
   }
 
   @Override
-  public Optional<DataNodeWrapper> dataNodeIdToWrapper(int nodeId) {
-    try (SyncConfigNodeIServiceClient leaderClient =
+  public Optional<DataNodeWrapper> dataNodeIdToWrapper(final int nodeId) {
+    try (final SyncConfigNodeIServiceClient leaderClient =
         (SyncConfigNodeIServiceClient) getLeaderConfigNodeConnection()) {
-      TShowDataNodesResp resp = leaderClient.showDataNodes();
-      for (TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
+      final TShowDataNodesResp resp = leaderClient.showDataNodes();
+      for (final TDataNodeInfo dataNodeInfo : resp.getDataNodesInfoList()) {
         if (dataNodeInfo.getDataNodeId() == nodeId) {
           return dataNodeWrapperList.stream()
               .filter(dataNodeWrapper -> dataNodeWrapper.getPort() == 
dataNodeInfo.getRpcPort())
@@ -1116,18 +1122,18 @@ public abstract class AbstractEnv implements BaseEnv {
         }
       }
       return Optional.empty();
-    } catch (Exception e) {
+    } catch (final Exception e) {
       return Optional.empty();
     }
   }
 
   @Override
-  public void registerConfigNodeKillPoints(List<String> killPoints) {
+  public void registerConfigNodeKillPoints(final List<String> killPoints) {
     this.configNodeKillPoints = killPoints;
   }
 
   @Override
-  public void registerDataNodeKillPoints(List<String> killPoints) {
+  public void registerDataNodeKillPoints(final List<String> killPoints) {
     this.dataNodeKillPoints = killPoints;
   }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
index 1eb05cb315f..d462b5a668b 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/MultiClusterEnv.java
@@ -25,7 +25,7 @@ import org.apache.tsfile.utils.Pair;
 
 public class MultiClusterEnv extends AbstractEnv {
 
-  public MultiClusterEnv(long startTime, int index, String currentMethodName) {
+  public MultiClusterEnv(final long startTime, final int index, final String 
currentMethodName) {
     super(startTime);
     this.index = index;
     this.testMethodName = currentMethodName;
@@ -33,18 +33,18 @@ public class MultiClusterEnv extends AbstractEnv {
 
   @Override
   public void initClusterEnvironment() {
-    Pair<Integer, Integer> nodeNum = EnvUtils.getNodeNum(index);
+    final Pair<Integer, Integer> nodeNum = EnvUtils.getNodeNum(index);
     super.initEnvironment(nodeNum.getLeft(), nodeNum.getRight());
   }
 
   @Override
-  public void initClusterEnvironment(int configNodesNum, int dataNodesNum) {
+  public void initClusterEnvironment(final int configNodesNum, final int 
dataNodesNum) {
     super.initEnvironment(configNodesNum, dataNodesNum);
   }
 
   @Override
   public void initClusterEnvironment(
-      int configNodesNum, int dataNodesNum, int testWorkingRetryCount) {
+      final int configNodesNum, final int dataNodesNum, final int 
testWorkingRetryCount) {
     super.initEnvironment(configNodesNum, dataNodesNum, testWorkingRetryCount);
   }
 }
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
index bf28c407f72..e3f9b0b5d3e 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/AbstractNodeWrapper.java
@@ -649,7 +649,7 @@ public abstract class AbstractNodeWrapper implements 
BaseNodeWrapper {
     if (testMethodName == null) {
       return testClassName;
     }
-    return testClassName + "_" + testMethodName;
+    return testClassName + File.separator + testMethodName;
   }
 
   public void setKillPoints(List<String> killPoints) {
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
index a509790ad07..2b3211fe868 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/ConfigNodeWrapper.java
@@ -57,23 +57,17 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
   private final String defaultCommonPropertiesFile;
 
   public ConfigNodeWrapper(
-      boolean isSeed,
-      String targetCNs,
-      String testClassName,
-      String testMethodName,
-      int[] portList,
-      int clusterIndex,
-      boolean isMultiCluster,
-      long startTime) {
+      final boolean isSeed,
+      final String targetCNs,
+      final String testClassName,
+      final String testMethodName,
+      final int[] portList,
+      final int clusterIndex,
+      final boolean isMultiCluster,
+      final long startTime) {
     super(testClassName, testMethodName, portList, clusterIndex, 
isMultiCluster, startTime);
     this.consensusPort = portList[1];
     this.isSeed = isSeed;
-    String seedConfigNodes;
-    if (isSeed) {
-      seedConfigNodes = getIpAndPortString();
-    } else {
-      seedConfigNodes = targetCNs;
-    }
     this.defaultNodePropertiesFile =
         EnvUtils.getFilePathFromSysVar(DEFAULT_CONFIG_NODE_PROPERTIES, 
clusterIndex);
     this.defaultCommonPropertiesFile =
@@ -83,7 +77,8 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
     reloadMutableFields();
 
     // initialize immutable properties
-    immutableNodeProperties.setProperty(IoTDBConstant.CN_SEED_CONFIG_NODE, 
seedConfigNodes);
+    immutableNodeProperties.setProperty(
+        IoTDBConstant.CN_SEED_CONFIG_NODE, isSeed ? getIpAndPortString() : 
targetCNs);
     immutableNodeProperties.setProperty(CN_SYSTEM_DIR, 
MppBaseConfig.NULL_VALUE);
     immutableNodeProperties.setProperty(CN_CONSENSUS_DIR, 
MppBaseConfig.NULL_VALUE);
     immutableNodeProperties.setProperty(CN_METRIC_IOTDB_REPORTER_HOST, 
MppBaseConfig.NULL_VALUE);
@@ -129,7 +124,7 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
   }
 
   @Override
-  protected void addStartCmdParams(List<String> params) {
+  protected void addStartCmdParams(final List<String> params) {
     final String workDir = getNodePath();
     final String confDir = workDir + File.separator + "conf";
     params.addAll(
@@ -166,14 +161,14 @@ public class ConfigNodeWrapper extends 
AbstractNodeWrapper {
 
   @Override
   protected void renameFile() {
-    String configNodeName = isSeed ? "SeedConfigNode" : "ConfigNode";
+    final String configNodeName = isSeed ? "SeedConfigNode" : "ConfigNode";
     // rename log file
-    File oldLogFile =
+    final File oldLogFile =
         new File(getLogDirPath() + File.separator + configNodeName + 
portList[0] + ".log");
     oldLogFile.renameTo(new File(getLogDirPath() + File.separator + getId() + 
".log"));
 
     // rename node dir
-    File oldNodeDir =
+    final File oldNodeDir =
         new File(
             System.getProperty(USER_DIR)
                 + File.separator
@@ -184,7 +179,7 @@ public class ConfigNodeWrapper extends AbstractNodeWrapper {
     oldNodeDir.renameTo(new File(getNodePath()));
   }
 
-  public void setConsensusPort(int consensusPort) {
+  public void setConsensusPort(final int consensusPort) {
     this.consensusPort = consensusPort;
   }
 
diff --git 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
index cb7e840af63..127f9b33185 100644
--- 
a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
+++ 
b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/node/DataNodeWrapper.java
@@ -83,13 +83,13 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   private final String defaultCommonPropertiesFile;
 
   public DataNodeWrapper(
-      String seedConfigNode,
-      String testClassName,
-      String testMethodName,
-      int[] portList,
-      int clusterIndex,
-      boolean isMultiCluster,
-      long startTime) {
+      final String seedConfigNode,
+      final String testClassName,
+      final String testMethodName,
+      final int[] portList,
+      final int clusterIndex,
+      final boolean isMultiCluster,
+      final long startTime) {
     super(testClassName, testMethodName, portList, clusterIndex, 
isMultiCluster, startTime);
     this.internalAddress = super.getIp();
     this.mppDataExchangePort = portList[1];
@@ -161,7 +161,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   }
 
   @Override
-  protected void addStartCmdParams(List<String> params) {
+  protected void addStartCmdParams(final List<String> params) {
     final String workDir = getNodePath();
     final String confDir = workDir + File.separator + "conf";
     params.addAll(
@@ -214,22 +214,22 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
   @Override
   public void renameFile() {
     // Rename log file
-    String oldLogFilePath =
+    final String oldLogFilePath =
         getLogDirPath() + File.separator + DATA_NODE_NAME + portList[0] + 
".log";
-    String newLogFilePath = getLogDirPath() + File.separator + getId() + 
".log";
-    File oldLogFile = new File(oldLogFilePath);
+    final String newLogFilePath = getLogDirPath() + File.separator + getId() + 
".log";
+    final File oldLogFile = new File(oldLogFilePath);
     oldLogFile.renameTo(new File(newLogFilePath));
 
     // Rename node dir
-    String oldNodeDirPath =
+    final String oldNodeDirPath =
         System.getProperty(USER_DIR)
             + File.separator
             + TARGET
             + File.separator
             + DATA_NODE_NAME
             + portList[0];
-    String newNodeDirPath = getNodePath();
-    File oldNodeDir = new File(oldNodeDirPath);
+    final String newNodeDirPath = getNodePath();
+    final File oldNodeDir = new File(oldNodeDirPath);
     oldNodeDir.renameTo(new File(newNodeDirPath));
   }
 
@@ -237,7 +237,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     return mppDataExchangePort;
   }
 
-  public void setMppDataExchangePort(int mppDataExchangePort) {
+  public void setMppDataExchangePort(final int mppDataExchangePort) {
     this.mppDataExchangePort = mppDataExchangePort;
   }
 
@@ -249,7 +249,7 @@ public class DataNodeWrapper extends AbstractNodeWrapper {
     return internalPort;
   }
 
-  public void setInternalPort(int internalPort) {
+  public void setInternalPort(final int internalPort) {
     this.internalPort = internalPort;
   }
 
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
 
b/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
index 07e04514688..17e20d91e9a 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/it/framework/IoTDBTestRunner.java
@@ -37,33 +37,30 @@ public class IoTDBTestRunner extends BlockJUnit4ClassRunner 
{
   private static final Logger logger = IoTDBTestLogger.logger;
   private IoTDBTestListener listener;
 
-  public IoTDBTestRunner(Class<?> testClass) throws InitializationError {
+  public IoTDBTestRunner(final Class<?> testClass) throws InitializationError {
     super(testClass);
   }
 
   @Override
-  public void run(RunNotifier notifier) {
-    TimeZone.setDefault(TimeZone.getTimeZone("Bejing"));
+  public void run(final RunNotifier notifier) {
+    TimeZone.setDefault(TimeZone.getTimeZone("UTC+08:00"));
     listener = new IoTDBTestListener(this.getName());
     notifier.addListener(listener);
     super.run(notifier);
   }
 
   @Override
-  protected void runChild(final FrameworkMethod method, RunNotifier notifier) {
-    Description description = describeChild(method);
+  protected void runChild(final FrameworkMethod method, final RunNotifier 
notifier) {
+    final Description description = describeChild(method);
     logger.info("Run {}", description.getMethodName());
-    long currentTime = System.currentTimeMillis();
+    final long currentTime = System.currentTimeMillis();
     if (EnvType.getSystemEnvType() != EnvType.MultiCluster) {
       EnvFactory.getEnv().setTestMethodName(description.getMethodName());
-    } else {
-      // TestMethodName must be set globally in MultiEnvFactory, since the
-      // cluster environments are not created now
-      MultiEnvFactory.setTestMethodName(description.getMethodName());
     }
+    MultiEnvFactory.setTestMethodName(description.getMethodName());
     super.runChild(method, notifier);
-    double timeCost = (System.currentTimeMillis() - currentTime) / 1000.0;
-    String testName = description.getClassName() + "." + 
description.getMethodName();
+    final double timeCost = (System.currentTimeMillis() - currentTime) / 
1000.0;
+    final String testName = description.getClassName() + "." + 
description.getMethodName();
     logger.info("Done {}. Cost: {}s", description.getMethodName(), timeCost);
     listener.addTestStat(new IoTDBTestStat(testName, timeCost));
   }
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
index 14567b01f11..8caabaf6f62 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/single/IoTDBPipeOPCUAIT.java
@@ -22,15 +22,21 @@ package org.apache.iotdb.pipe.it.single;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.MultiClusterIT1;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
 
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+@RunWith(IoTDBTestRunner.class)
+@Category({MultiClusterIT1.class})
 public class IoTDBPipeOPCUAIT extends AbstractPipeSingleIT {
   @Test
   public void testOPCUASink() throws Exception {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
index bfd57fe6e24..310ed2640d3 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/extractor/dataregion/historical/PipeHistoricalDataRegionTsFileExtractor.java
@@ -536,7 +536,6 @@ public class PipeHistoricalDataRegionTsFileExtractor 
implements PipeHistoricalDa
 
     return deviceSet.stream()
         .anyMatch(
-            // TODO: use IDeviceID
             deviceID -> pipePattern.mayOverlapWithDevice(((PlainDeviceID) 
deviceID).toStringID()));
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
index 7f6a719070b..a9ad880bebb 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/tools/schema/SchemaRegionSnapshotParser.java
@@ -34,7 +34,6 @@ import java.io.IOException;
 import java.nio.file.DirectoryStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -49,7 +48,8 @@ public class SchemaRegionSnapshotParser {
     // Empty constructor
   }
 
-  private static Path getLatestSnapshotPath(List<Path> snapshotPathList, 
boolean includingTmp) {
+  private static Path getLatestSnapshotPath(
+      final List<Path> snapshotPathList, final boolean includingTmp) {
     if (snapshotPathList.isEmpty()) {
       return null;
     }
@@ -68,69 +68,28 @@ public class SchemaRegionSnapshotParser {
     return pathArray[pathArray.length - 1];
   }
 
-  // Return all schema region's latest snapshot units in this datanode.
-  public static List<Pair<Path, Path>> getSnapshotPaths() {
-    final String snapshotPath = CONFIG.getSchemaRegionConsensusDir();
-    final File snapshotDir = new File(snapshotPath);
-    final ArrayList<Pair<Path, Path>> snapshotUnits = new ArrayList<>();
-
-    // Get schema region path
-    try (DirectoryStream<Path> stream =
-        Files.newDirectoryStream(snapshotDir.toPath(), 
"[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*")) {
-      for (Path path : stream) {
-        try (DirectoryStream<Path> filestream =
-            Files.newDirectoryStream(Paths.get(path.toString() + 
File.separator + "sm"))) {
-          // Find the latest snapshots
-          final ArrayList<Path> snapshotList = new ArrayList<>();
-          for (Path snapshotFolder : filestream) {
-            if (snapshotFolder.toFile().isDirectory()) {
-              snapshotList.add(snapshotFolder);
-            }
-          }
-          final Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, 
false);
-          if (latestSnapshotPath != null) {
-            // Get metadata from the latest snapshot folder.
-            final File mTreeSnapshot =
-                SystemFileFactory.INSTANCE.getFile(
-                    latestSnapshotPath + File.separator + 
SchemaConstant.MTREE_SNAPSHOT);
-            final File tagSnapshot =
-                SystemFileFactory.INSTANCE.getFile(
-                    latestSnapshotPath + File.separator + 
SchemaConstant.TAG_LOG_SNAPSHOT);
-            if (mTreeSnapshot.exists()) {
-              snapshotUnits.add(
-                  new Pair<>(
-                      mTreeSnapshot.toPath(), tagSnapshot.exists() ? 
tagSnapshot.toPath() : null));
-            }
-          }
-        }
-      }
-    } catch (IOException exception) {
-      LOGGER.warn("Cannot construct snapshot directory stream", exception);
-    }
-    return snapshotUnits;
-  }
-
   // In schema snapshot path: 
datanode/consensus/schema_region/47474747-4747-4747-4747-000200000000
   // this func will get schema region id = 
47474747-4747-4747-4747-000200000000's latest snapshot.
   // In one schema region, there is only one snapshot unit.
-  public static Pair<Path, Path> getSnapshotPaths(String schemaRegionId, 
boolean isTmp) {
+  public static Pair<Path, Path> getSnapshotPaths(
+      final String schemaRegionId, final boolean isTmp) {
     final String snapshotPath = CONFIG.getSchemaRegionConsensusDir();
     final File snapshotDir =
         new File(snapshotPath + File.separator + schemaRegionId + 
File.separator + "sm");
 
     // Get the latest snapshot file
     final ArrayList<Path> snapshotList = new ArrayList<>();
-    try (DirectoryStream<Path> stream =
+    try (final DirectoryStream<Path> stream =
         Files.newDirectoryStream(
             snapshotDir.toPath(), isTmp ? ".tmp.[0-9]*_[0-9]*" : 
"[0-9]*_[0-9]*")) {
-      for (Path path : stream) {
+      for (final Path path : stream) {
         snapshotList.add(path);
       }
-    } catch (IOException ioException) {
+    } catch (final IOException ioException) {
       LOGGER.warn("ioexception when get {}'s folder", schemaRegionId, 
ioException);
       return null;
     }
-    Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, isTmp);
+    final Path latestSnapshotPath = getLatestSnapshotPath(snapshotList, isTmp);
     if (latestSnapshotPath != null) {
       // Get metadata from the latest snapshot folder.
       final File mTreeSnapshot =
@@ -148,17 +107,14 @@ public class SchemaRegionSnapshotParser {
   }
 
   public static SRStatementGenerator translate2Statements(
-      Path mtreePath, Path tagFilePath, PartialPath databasePath) throws 
IOException {
+      final Path mtreePath, final Path tagFilePath, final PartialPath 
databasePath)
+      throws IOException {
     if (mtreePath == null) {
       return null;
     }
     final File mtreefile = mtreePath.toFile();
-    final File tagfile;
-    if (tagFilePath != null && tagFilePath.toFile().exists()) {
-      tagfile = tagFilePath.toFile();
-    } else {
-      tagfile = null;
-    }
+    final File tagfile =
+        tagFilePath != null && tagFilePath.toFile().exists() ? 
tagFilePath.toFile() : null;
 
     if (!mtreefile.exists()) {
       return null;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
index 1a3c394af7d..59867861ed9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/DateTimeUtils.java
@@ -711,6 +711,8 @@ public class DateTimeUtils {
       case "us":
         timestamp /= 1000;
         break;
+      default:
+        break;
     }
     return LocalDateTime.ofInstant(Instant.ofEpochMilli(timestamp), 
ZoneId.systemDefault())
         .toString();

Reply via email to