This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/load_v2 by this push:
     new 723e65834a7 fix sonar issues
723e65834a7 is described below

commit 723e65834a7f378b0317e30bcff849bcb7ee76fd
Author: Tian Jiang <[email protected]>
AuthorDate: Mon Nov 20 15:29:48 2023 +0800

    fix sonar issues
---
 .../thrift/async/IoTDBThriftAsyncConnector.java    |   9 +-
 .../impl/DataNodeInternalRPCServiceImpl.java       |  24 +-
 .../execution/load/TsFileSplitSender.java          | 265 ++++++++++++++-------
 .../locseq/ThroughputBasedLocationSequencer.java   |   2 +-
 .../nodesplit/ClusteringMeasurementSplitter.java   |  61 ++---
 .../analyze/partition/BasicPartitionFetcher.java   |  23 +-
 .../analyze/partition/ClusterPartitionFetcher.java |   1 +
 .../plan/analyze/schema/SchemaValidator.java       |  69 +++++-
 .../planner/plan/node/load/LoadCommandNode.java    |   7 +-
 .../iotdb/db/storageengine/StorageEngine.java      |   9 +-
 .../execution/load/LoadTsFileManagerTest.java      |   8 +-
 .../execution/load/LoadTsFileSchedulerTest.java    |  29 +--
 .../db/queryengine/execution/load/TestBase.java    |  10 +-
 .../execution/load/TsFileSplitSenderTest.java      |  50 ++--
 .../org/apache/iotdb/tsfile/utils/TsFileUtils.java |   2 +-
 15 files changed, 344 insertions(+), 225 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
index 10a901a3712..e501d322e53 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBThriftAsyncConnector.java
@@ -467,15 +467,16 @@ public class IoTDBThriftAsyncConnector extends 
IoTDBConnector {
         pipeTsFileInsertionEvent.getTsFiles().size(),
         targetConfigNodes);
 
-    if (splitSender.getStatistic().hasP2Timeout) {
-      double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+    if (splitSender.getStatistic().isHasP2Timeout()) {
+      double throughput = splitSender.getStatistic().p2ThroughputMbps();
       Map<String, Object> param = new HashMap<>(2);
       param.put(
-          PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS, 
splitSender.getStatistic().p2Timeout);
+          PipeBatchTsFileInsertionEvent.CONNECTOR_TIMEOUT_MS,
+          splitSender.getStatistic().getP2Timeout());
       param.put(PipeBatchTsFileInsertionEvent.CONNECTOR_THROUGHPUT_MBPS_KEY, 
throughput);
       pipeTsFileInsertionEvent.getExtractorOnConnectorTimeout().apply(param);
     } else {
-      double throughput = splitSender.getStatistic().p2ThroughputMBPS();
+      double throughput = splitSender.getStatistic().p2ThroughputMbps();
       pipeTsFileInsertionEvent
           .getExtractorOnConnectorSuccess()
           .apply(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
index ef80ec7b9a9..4578d8fe96c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeInternalRPCServiceImpl.java
@@ -419,12 +419,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
           currDevice = device;
           currMeasurement = measurement;
         } else if (!currDevice.equals(device)) {
-          validatingSchema.devicePaths.add(new PartialPath(currDevice));
-          validatingSchema.measurements.add(measurements.toArray(new 
String[0]));
-          validatingSchema.dataTypes.add(dataTypes.toArray(new TSDataType[0]));
-          validatingSchema.encodings.add(encodings.toArray(new TSEncoding[0]));
-          validatingSchema.compressionTypes.add(compressionTypes.toArray(new 
CompressionType[0]));
-          validatingSchema.isAlignedList.add(isAligned);
+          validatingSchema.getDevicePaths().add(new PartialPath(currDevice));
+          validatingSchema.getMeasurements().add(measurements.toArray(new 
String[0]));
+          validatingSchema.getDataTypes().add(dataTypes.toArray(new 
TSDataType[0]));
+          validatingSchema.getEncodings().add(encodings.toArray(new 
TSEncoding[0]));
+          
validatingSchema.getCompressionTypes().add(compressionTypes.toArray(new 
CompressionType[0]));
+          validatingSchema.getIsAlignedList().add(isAligned);
           currDevice = device;
           currMeasurement = measurement;
           measurements.clear();
@@ -438,12 +438,12 @@ public class DataNodeInternalRPCServiceImpl implements 
IDataNodeRPCService.Iface
       }
     }
 
-    validatingSchema.devicePaths.add(new PartialPath(currDevice));
-    validatingSchema.measurements.add(measurements.toArray(new String[0]));
-    validatingSchema.dataTypes.add(dataTypes.toArray(new TSDataType[0]));
-    validatingSchema.encodings.add(encodings.toArray(new TSEncoding[0]));
-    validatingSchema.compressionTypes.add(compressionTypes.toArray(new 
CompressionType[0]));
-    validatingSchema.isAlignedList.add(isAligned);
+    validatingSchema.getDevicePaths().add(new PartialPath(currDevice));
+    validatingSchema.getMeasurements().add(measurements.toArray(new 
String[0]));
+    validatingSchema.getDataTypes().add(dataTypes.toArray(new TSDataType[0]));
+    validatingSchema.getEncodings().add(encodings.toArray(new TSEncoding[0]));
+    validatingSchema.getCompressionTypes().add(compressionTypes.toArray(new 
CompressionType[0]));
+    validatingSchema.getIsAlignedList().add(isAligned);
     return validatingSchema;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
index 606e96a9435..a53eed48d68 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSender.java
@@ -106,6 +106,7 @@ public class TsFileSplitSender {
   private String userName;
   private String password;
 
+  @SuppressWarnings("java:S107")
   public TsFileSplitSender(
       LoadTsFileNode loadTsFileNode,
       DataPartitionBatchFetcher targetPartitionFetcher,
@@ -128,11 +129,11 @@ public class TsFileSplitSender {
     this.userName = userName;
     this.password = password;
 
-    this.statistic.totalSize = loadTsFileNode.getTotalSize();
+    this.statistic.setTotalSize(loadTsFileNode.getTotalSize());
   }
 
   public void start() throws IOException {
-    statistic.taskStartTime = System.currentTimeMillis();
+    statistic.setTaskStartTime(System.currentTimeMillis());
     // skip files without data
     loadTsFileNode.getResources().removeIf(f -> f.getDevices().isEmpty());
     uuid = UUID.randomUUID().toString();
@@ -145,7 +146,7 @@ public class TsFileSplitSender {
     } else {
       logger.warn("Can not Load TsFiles {}", loadTsFileNode.getResources());
     }
-    statistic.taskEndTime = System.currentTimeMillis();
+    statistic.setTaskEndTime(System.currentTimeMillis());
     locationStatistics.logLocationStatistics();
     statistic.logStatistic();
   }
@@ -186,11 +187,40 @@ public class TsFileSplitSender {
         tsFileDataManager.sendAllTsFileData()
             && processRemainingPieceNodes()
             && phaseOneFailures.isEmpty();
-    statistic.p1TimeMS = System.currentTimeMillis() - start;
-    logger.info("Cleanup ends after {}ms", statistic.p1TimeMS);
+    statistic.setP1TimeMS(System.currentTimeMillis() - start);
+    logger.info("Cleanup ends after {}ms", statistic.getP1TimeMS());
     return success;
   }
 
+  private boolean loadInGroup(
+      TDataNodeLocation dataNodeLocation, TLoadCommandReq loadCommandReq)
+      throws SocketException, FragmentInstanceDispatchException, 
InterruptedException {
+    TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
+
+    for (int i = 0; i < MAX_RETRY && !Thread.interrupted(); i++) {
+      try (SyncDataNodeInternalServiceClient client =
+          internalServiceClientManager.borrowClient(endPoint)) {
+        // record timeout for recalculating max batch size
+        if (statistic.getP2Timeout() == 0) {
+          statistic.setP2Timeout(client.getTimeout());
+        }
+
+        TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
+        if (!loadResp.isAccepted()) {
+          logger.warn(loadResp.message);
+          throw new FragmentInstanceDispatchException(loadResp.status);
+        } else {
+          // if any node in this replica set succeeds, it is loaded
+          return true;
+        }
+      } catch (ClientManagerException | TException e) {
+        logger.debug("{} timed out, retrying...", endPoint, e);
+      }
+
+      Thread.sleep(RETRY_INTERVAL_MS);
+    }
+    return false;
+  }
   private Void loadInGroup(
       TRegionReplicaSet replicaSet, TLoadCommandReq loadCommandReq, 
AtomicBoolean hasTimeout)
       throws SocketException {
@@ -201,50 +231,27 @@ public class TsFileSplitSender {
           uuid,
           dataNodeLocation,
           replicaSet.regionId);
-      TEndPoint endPoint = dataNodeLocation.getInternalEndPoint();
-      boolean loaded = false;
-
-      for (int i = 0; i < MAX_RETRY; i++) {
-        try (SyncDataNodeInternalServiceClient client =
-            internalServiceClientManager.borrowClient(endPoint)) {
-          // record timeout for recalculating max batch size
-          if (statistic.p2Timeout == 0) {
-            statistic.p2Timeout = client.getTimeout();
-          }
-
-          TLoadResp loadResp = client.sendLoadCommand(loadCommandReq);
-          if (!loadResp.isAccepted()) {
-            logger.warn(loadResp.message);
-            locationException = new 
FragmentInstanceDispatchException(loadResp.status);
-          } else {
-            // if any node in this replica set succeeds, it is loaded
-            locationException = null;
-            loaded = true;
-          }
+      try {
+        if (loadInGroup(dataNodeLocation, loadCommandReq)) {
+          // if any node in this replica set succeeds, it is loaded
+          locationException = null;
           break;
-        } catch (ClientManagerException | TException e) {
+        } else {
+          // the location timed out
           TSStatus status = new TSStatus();
           status.setCode(TSStatusCode.DISPATCH_ERROR.getStatusCode());
           status.setMessage(
               "can't connect to node {}, please reset longer 
dn_connection_timeout_ms "
                   + "in iotdb-common.properties and restart iotdb."
-                  + endPoint);
-          locationException = new FragmentInstanceDispatchException(status);
+                  + dataNodeLocation.internalEndPoint);
           hasTimeout.set(true);
+          locationException = new FragmentInstanceDispatchException(status);
         }
-
-        try {
-          Thread.sleep(RETRY_INTERVAL_MS);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-          locationException = e;
-          break;
-        }
-      }
-
-      if (loaded) {
-        // if any node in this replica set succeeds, it is loaded
-        break;
+      } catch (FragmentInstanceDispatchException e) {
+        locationException = e;
+      } catch (InterruptedException e) {
+        locationException = e;
+        Thread.currentThread().interrupt();
       }
     }
 
@@ -281,22 +288,20 @@ public class TsFileSplitSender {
         phaseTwoFailures.put(loadFuture.left.regionId, e);
       }
     }
-    statistic.p2TimeMS = System.currentTimeMillis() - p2StartMS;
-    statistic.hasP2Timeout = hasTimeout.get();
+    statistic.setP2TimeMS(System.currentTimeMillis() - p2StartMS);
+    statistic.setHasP2Timeout(hasTimeout.get());
 
     return phaseTwoFailures.isEmpty();
   }
 
   public LocationSequencer createLocationSequencer(TRegionReplicaSet 
replicaSet) {
-    //    return new FixedLocationSequencer(replicaSet);
-    //    return new RandomLocationSequencer(replicaSet);
     return new ThroughputBasedLocationSequencer(replicaSet, 
locationStatistics);
   }
 
   private ByteBuffer compressBuffer(ByteBuffer buffer) throws IOException {
-    statistic.rawSize.addAndGet(buffer.remaining());
+    statistic.getRawSize().addAndGet(buffer.remaining());
     if (compressionType.equals(CompressionType.UNCOMPRESSED)) {
-      statistic.compressedSize.addAndGet(buffer.remaining());
+      statistic.getCompressedSize().addAndGet(buffer.remaining());
       return buffer;
     }
     ICompressor compressor = ICompressor.getCompressor(compressionType);
@@ -309,7 +314,7 @@ public class TsFileSplitSender {
             buffer.remaining(),
             compressed.array());
     compressed.limit(compressLength);
-    statistic.compressedSize.addAndGet(compressLength);
+    statistic.getCompressedSize().addAndGet(compressLength);
     return compressed;
   }
 
@@ -324,10 +329,10 @@ public class TsFileSplitSender {
         subNodes = pair.left.get();
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.error("Unexpected error during splitting node", e);
+        logger.error("Unexpected interruption during splitting node", e);
         return false;
       } catch (ExecutionException e) {
-        logger.error("Unexpected error during splitting node", e);
+        logger.error("Unexpected execution error during splitting node", e);
         return false;
       }
       if (!dispatchPieceNodes(subNodes, pair.right)) {
@@ -397,7 +402,7 @@ public class TsFileSplitSender {
       return false;
     }
     long compressingTime = System.nanoTime() - startTime;
-    statistic.compressingTimeNs.addAndGet(compressingTime);
+    statistic.getCompressingTimeNs().addAndGet(compressingTime);
 
     TTsFilePieceReq loadTsFileReq = genLoadReq(buffer, replicaSet, 
uncompressedLength);
     LocationSequencer locationSequencer = createLocationSequencer(replicaSet);
@@ -451,7 +456,7 @@ public class TsFileSplitSender {
             .map(node -> dispatchOneFinalNode(node, replicaSet))
             .collect(Collectors.toList());
     long elapsedTime = System.nanoTime() - start;
-    statistic.dispatchNodesTimeNS.addAndGet(elapsedTime);
+    statistic.getDispatchNodesTimeNS().addAndGet(elapsedTime);
     return !subNodeResults.contains(false);
   }
 
@@ -464,7 +469,7 @@ public class TsFileSplitSender {
     // split the piece node asynchronously to improve parallelism
     if (splitFutures.size() < MAX_PENDING_PIECE_NODE) {
       splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), 
replicaSet));
-      statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
+      statistic.getDispatchNodeTimeNS().addAndGet(System.nanoTime() - 
allStart);
       return true;
     } else {
       // wait for the first split task to complete if too many task
@@ -473,71 +478,155 @@ public class TsFileSplitSender {
       try {
         subNodes = pair.left.get();
         long elapsedTime = System.nanoTime() - start;
-        statistic.splitTime.addAndGet(elapsedTime);
-        statistic.pieceNodeNum.incrementAndGet();
+        statistic.getSplitTime().addAndGet(elapsedTime);
+        statistic.getPieceNodeNum().incrementAndGet();
         logger.debug(
             "{} splits are generated after {}ms", subNodes.size(), elapsedTime 
/ 1_000_000L);
 
         splitFutures.add(new Pair<>(submitSplitPieceNode(pieceNode), 
replicaSet));
       } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
-        logger.error("Unexpected error during splitting node", e);
+        logger.error("Unexpected interruption during splitting node", e);
         return false;
       } catch (ExecutionException e) {
-        logger.error("Unexpected error during splitting node", e);
+        logger.error("Unexpected execution error during splitting node", e);
         return false;
       }
       // send the split nodes to the replicas
       boolean success = dispatchPieceNodes(subNodes, pair.right);
-      statistic.dispatchNodeTimeNS.addAndGet(System.nanoTime() - allStart);
+      statistic.getDispatchNodeTimeNS().addAndGet(System.nanoTime() - 
allStart);
       return success;
     }
   }
 
   public static class Statistic {
 
-    public long taskStartTime;
-    public long taskEndTime;
-    public AtomicLong rawSize = new AtomicLong();
-    public AtomicLong compressedSize = new AtomicLong();
-    public AtomicLong splitTime = new AtomicLong();
-    public AtomicLong pieceNodeNum = new AtomicLong();
-    public AtomicLong dispatchNodesTimeNS = new AtomicLong();
-    public AtomicLong dispatchNodeTimeNS = new AtomicLong();
-    public AtomicLong compressingTimeNs = new AtomicLong();
-    public long p1TimeMS;
-    public long p2TimeMS;
-    public long totalSize;
-    public boolean hasP2Timeout;
-    public long p2Timeout;
+    private long taskStartTime;
+    private long taskEndTime;
+    private AtomicLong rawSize = new AtomicLong();
+    private AtomicLong compressedSize = new AtomicLong();
+    private AtomicLong splitTime = new AtomicLong();
+    private AtomicLong pieceNodeNum = new AtomicLong();
+    private AtomicLong dispatchNodesTimeNS = new AtomicLong();
+    private AtomicLong dispatchNodeTimeNS = new AtomicLong();
+    private AtomicLong compressingTimeNs = new AtomicLong();
+    private long p1TimeMS;
+    private long p2TimeMS;
+    private long totalSize;
+    private boolean hasP2Timeout;
+    private long p2Timeout;
 
     public void logStatistic() {
       logger.info(
           "Time consumption: {}ms, totalSize: {}MB",
-          taskEndTime - taskStartTime,
-          totalSize * 1.0 / MB);
+          getTaskEndTime() - getTaskStartTime(),
+          getTotalSize() * 1.0 / MB);
       logger.info(
           "Generated {} piece nodes, splitTime: {}, dispatchSplitsTime: {}, 
dispatchNodeTime: {}",
-          pieceNodeNum.get(),
-          splitTime.get() / 1_000_000L,
-          dispatchNodesTimeNS.get() / 1_000_000L,
-          dispatchNodeTimeNS.get() / 1_000_000L);
+          getPieceNodeNum().get(),
+          getSplitTime().get() / 1_000_000L,
+          getDispatchNodesTimeNS().get() / 1_000_000L,
+          getDispatchNodeTimeNS().get() / 1_000_000L);
       logger.info(
           "Transmission size: {}/{} ({}), compressionTime: {}ms",
-          compressedSize.get(),
-          rawSize.get(),
-          compressedSize.get() * 1.0 / rawSize.get(),
-          compressingTimeNs.get() / 1_000_000L);
-      logger.info("Sync TsFile time: {}ms ({})", p1TimeMS, p1ThroughputMBPS());
-      logger.info("Load command execution time: {}ms ({})", p2TimeMS, 
p2ThroughputMBPS());
+          getCompressedSize().get(),
+          getRawSize().get(),
+          getCompressedSize().get() * 1.0 / getRawSize().get(),
+          getCompressingTimeNs().get() / 1_000_000L);
+      logger.info("Sync TsFile time: {}ms ({})", getP1TimeMS(), 
p1ThroughputMbps());
+      logger.info("Load command execution time: {}ms ({})", getP2TimeMS(), 
p2ThroughputMbps());
+    }
+
+    public double p2ThroughputMbps() {
+      return getTotalSize() * 1.0 / MB / (getP2TimeMS() / 1000.0);
+    }
+
+    public double p1ThroughputMbps() {
+      return getTotalSize() * 1.0 / MB / (getP1TimeMS() / 1000.0);
+    }
+
+    public long getTaskStartTime() {
+      return taskStartTime;
+    }
+
+    public void setTaskStartTime(long taskStartTime) {
+      this.taskStartTime = taskStartTime;
+    }
+
+    public long getTaskEndTime() {
+      return taskEndTime;
+    }
+
+    public void setTaskEndTime(long taskEndTime) {
+      this.taskEndTime = taskEndTime;
+    }
+
+    public AtomicLong getRawSize() {
+      return rawSize;
+    }
+
+    public AtomicLong getCompressedSize() {
+      return compressedSize;
+    }
+
+    public AtomicLong getSplitTime() {
+      return splitTime;
+    }
+
+    public AtomicLong getPieceNodeNum() {
+      return pieceNodeNum;
+    }
+
+    public AtomicLong getDispatchNodesTimeNS() {
+      return dispatchNodesTimeNS;
+    }
+
+    public AtomicLong getDispatchNodeTimeNS() {
+      return dispatchNodeTimeNS;
+    }
+
+    public AtomicLong getCompressingTimeNs() {
+      return compressingTimeNs;
+    }
+
+    public long getP1TimeMS() {
+      return p1TimeMS;
+    }
+
+    public void setP1TimeMS(long p1TimeMS) {
+      this.p1TimeMS = p1TimeMS;
+    }
+
+    public long getP2TimeMS() {
+      return p2TimeMS;
+    }
+
+    public void setP2TimeMS(long p2TimeMS) {
+      this.p2TimeMS = p2TimeMS;
+    }
+
+    public long getTotalSize() {
+      return totalSize;
+    }
+
+    public void setTotalSize(long totalSize) {
+      this.totalSize = totalSize;
+    }
+
+    public boolean isHasP2Timeout() {
+      return hasP2Timeout;
+    }
+
+    public void setHasP2Timeout(boolean hasP2Timeout) {
+      this.hasP2Timeout = hasP2Timeout;
     }
 
-    public double p2ThroughputMBPS() {
-      return totalSize * 1.0 / MB / (p2TimeMS / 1000.0);
+    public long getP2Timeout() {
+      return p2Timeout;
     }
 
-    public double p1ThroughputMBPS() {
-      return totalSize * 1.0 / MB / (p1TimeMS / 1000.0);
+    public void setP2Timeout(long p2Timeout) {
+      this.p2Timeout = p2Timeout;
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
index ac8254f289f..213cf5bf379 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/locseq/ThroughputBasedLocationSequencer.java
@@ -133,7 +133,7 @@ public class ThroughputBasedLocationSequencer implements 
LocationSequencer {
               .map(p -> new Pair<>(p.left.getDataNodeId(), p.right))
               .collect(Collectors.toList()));
     }
-    Pair<TDataNodeLocation, Double> chosenPair = locations.remove(chosen);
+    final Pair<TDataNodeLocation, Double> chosenPair = 
locations.remove(chosen);
     // update ranks
     double newTotalRank = 0.0;
     for (Pair<TDataNodeLocation, Double> location : locations) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
index 9a4fdf3b96f..7d24273023e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/nodesplit/ClusteringMeasurementSplitter.java
@@ -184,22 +184,16 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
         newNodeA.addTsFileData(tsFileData);
       } else {
         ChunkData chunkData = (ChunkData) tsFileData;
-        if (currMeasurement == null || 
currMeasurement.equals(chunkData.firstMeasurement())) {
+        if (currMeasurement == null || 
currMeasurement.equals(chunkData.firstMeasurement()) ||
+            sizeTarget > 0) {
           // the first chunk or chunk of the same series, add it to A
+          // or the chunk of the next series and splitA is not
           currMeasurement = chunkData.firstMeasurement();
           newNodeA.addTsFileData(tsFileData);
           sizeTarget -= chunkData.getDataSize();
-        } else {
-          // chunk of the next series
-          if (sizeTarget < 0) {
-            // splitA is full, break to fill splitB
-            break;
-          } else {
-            // splitA is not full, also add this series to A
-            currMeasurement = chunkData.firstMeasurement();
-            newNodeA.addTsFileData(tsFileData);
-            sizeTarget -= chunkData.getDataSize();
-          }
+        } else if (sizeTarget < 0) {
+          // a new series but splitA is full, break to fill splitB
+          break;
         }
       }
     }
@@ -455,7 +449,7 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
 
   public class KMeans implements Clustering {
 
-    private int k;
+    private int clusterNum;
     private int maxIteration;
     private double[][] centroids;
     private AtomicInteger[] centroidCounters;
@@ -463,11 +457,11 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
     private Map<Entry<String, double[]>, Integer> recordCentroidMapping;
     private int vecLength = 0;
 
-    public KMeans(int k, int maxIteration) {
-      this.k = k;
+    public KMeans(int clusterNum, int maxIteration) {
+      this.clusterNum = clusterNum;
       this.maxIteration = maxIteration;
-      this.centroids = new double[k][];
-      this.centroidCounters = new AtomicInteger[k];
+      this.centroids = new double[clusterNum][];
+      this.centroidCounters = new AtomicInteger[clusterNum];
       for (int i = 0; i < centroidCounters.length; i++) {
         centroidCounters[i] = new AtomicInteger();
       }
@@ -484,26 +478,24 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
     @Override
     public List<List<String>> cluster(Map<String, double[]> tagVectorMap, 
VectorDistance distance) {
       recordCentroidMapping.clear();
-      if (k > tagVectorMap.size()) {
-        k = tagVectorMap.size();
-        this.centroids = new double[k][];
+      if (clusterNum > tagVectorMap.size()) {
+        clusterNum = tagVectorMap.size();
+        this.centroids = new double[clusterNum][];
       }
 
       for (Entry<String, double[]> entry : tagVectorMap.entrySet()) {
         vecLength = entry.getValue().length;
       }
 
-      randomCentroid(vecLength, tagVectorMap);
+      randomCentroid(tagVectorMap);
 
-      for (int i = 0; i < maxIteration; i++) {
+      for (int i = 0; i < maxIteration && System.currentTimeMillis() - 
splitStartTime <= splitTimeBudget; i++) {
         if (!assignCentroid(tagVectorMap, distance)) {
+          // centroid not updated, end
           break;
         }
         newCentroid();
         clearCentroidCounter();
-        if (System.currentTimeMillis() - splitStartTime > splitTimeBudget) {
-          break;
-        }
       }
 
       Map<Integer, List<Entry<String, double[]>>> centroidRecordMap =
@@ -542,9 +534,9 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
                 List<Entry<String, double[]>> records = e.getValue();
                 int recordNum = records.size();
                 double[] sumVec = new double[vecLength];
-                for (Entry<String, double[]> record : records) {
+                for (Entry<String, double[]> rec : records) {
                   for (int i = 0; i < sumVec.length; i++) {
-                    sumVec[i] += record.getValue()[i];
+                    sumVec[i] += rec.getValue()[i];
                   }
                 }
                 for (int i = 0; i < sumVec.length; i++) {
@@ -590,27 +582,16 @@ public class ClusteringMeasurementSplitter implements 
PieceNodeSplitter {
       return centroidUpdated.get();
     }
 
-    private void randomCentroid(int vecLength, Map<String, double[]> 
tagVectorMap) {
+    private void randomCentroid(Map<String, double[]> tagVectorMap) {
       pickRandomCentroid(tagVectorMap);
-      // genRandomCentroid(vecLength);
     }
 
     private void pickRandomCentroid(Map<String, double[]> tagVectorMap) {
       List<double[]> recordVectors = new ArrayList<>(tagVectorMap.values());
       Collections.shuffle(recordVectors);
-      for (int i = 0; i < k; i++) {
+      for (int i = 0; i < clusterNum; i++) {
         centroids[i] = recordVectors.get(i);
       }
     }
-
-    private void genRandomCentroid(int vecLength) {
-      for (int i = 0; i < k; i++) {
-        double[] centroid = new double[vecLength];
-        for (int j = 0; j < vecLength; j++) {
-          centroid[j] = random.nextDouble();
-        }
-        centroids[i] = centroid;
-      }
-    }
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
index 98e50f2720b..6f7da416cb9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/BasicPartitionFetcher.java
@@ -60,6 +60,9 @@ import java.util.Set;
 
 public abstract class BasicPartitionFetcher implements IPartitionFetcher {
 
+  private static final String ERR_GET_DATA_PARTITION = "An error occurred when 
executing getDataPartition():";
+  private static final String ERR_GET_OR_CREATE_DATA_PARTITION = "An error 
occurred when executing getOrCreateDataPartition():";
+
   protected static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
 
   protected final SeriesPartitionExecutor partitionExecutor;
@@ -92,7 +95,7 @@ public abstract class BasicPartitionFetcher implements 
IPartitionFetcher {
           partitionCache.updateSchemaPartitionCache(
               schemaPartitionTableResp.getSchemaPartitionTable());
         } else {
-          throw new RuntimeException(
+          throw new IllegalStateException(
               new IoTDBException(
                   schemaPartitionTableResp.getStatus().getMessage(),
                   schemaPartitionTableResp.getStatus().getCode()));
@@ -122,7 +125,7 @@ public abstract class BasicPartitionFetcher implements 
IPartitionFetcher {
           partitionCache.updateSchemaPartitionCache(
               schemaPartitionTableResp.getSchemaPartitionTable());
         } else {
-          throw new RuntimeException(
+          throw new IllegalStateException(
               new IoTDBException(
                   schemaPartitionTableResp.getStatus().getMessage(),
                   schemaPartitionTableResp.getStatus().getCode()));
@@ -165,12 +168,12 @@ public abstract class BasicPartitionFetcher implements 
IPartitionFetcher {
           
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
         } else {
           throw new StatementAnalyzeException(
-              "An error occurred when executing getDataPartition():"
+              ERR_GET_DATA_PARTITION
                   + dataPartitionTableResp.getStatus().getMessage());
         }
       } catch (ClientManagerException | TException e) {
         throw new StatementAnalyzeException(
-            "An error occurred when executing getDataPartition():" + 
e.getMessage());
+            ERR_GET_DATA_PARTITION + e.getMessage());
       }
     }
     return dataPartition;
@@ -190,12 +193,12 @@ public abstract class BasicPartitionFetcher implements 
IPartitionFetcher {
         return parseDataPartitionResp(dataPartitionTableResp);
       } else {
         throw new StatementAnalyzeException(
-            "An error occurred when executing getDataPartition():"
+            ERR_GET_DATA_PARTITION
                 + dataPartitionTableResp.getStatus().getMessage());
       }
     } catch (ClientManagerException | TException e) {
       throw new StatementAnalyzeException(
-          "An error occurred when executing getDataPartition():" + 
e.getMessage());
+          ERR_GET_DATA_PARTITION + e.getMessage());
     }
   }
 
@@ -214,12 +217,12 @@ public abstract class BasicPartitionFetcher implements 
IPartitionFetcher {
           
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
         } else {
           throw new StatementAnalyzeException(
-              "An error occurred when executing getOrCreateDataPartition():"
+              ERR_GET_OR_CREATE_DATA_PARTITION
                   + dataPartitionTableResp.getStatus().getMessage());
         }
       } catch (ClientManagerException | TException e) {
         throw new StatementAnalyzeException(
-            "An error occurred when executing getOrCreateDataPartition():" + 
e.getMessage());
+            ERR_GET_OR_CREATE_DATA_PARTITION + e.getMessage());
       }
     }
     return dataPartition;
@@ -243,14 +246,14 @@ public abstract class BasicPartitionFetcher implements 
IPartitionFetcher {
           dataPartition = parseDataPartitionResp(dataPartitionTableResp);
           
partitionCache.updateDataPartitionCache(dataPartitionTableResp.getDataPartitionTable());
         } else {
-          throw new RuntimeException(
+          throw new IllegalStateException(
               new IoTDBException(
                   dataPartitionTableResp.getStatus().getMessage(),
                   dataPartitionTableResp.getStatus().getCode()));
         }
       } catch (ClientManagerException | TException e) {
         throw new StatementAnalyzeException(
-            "An error occurred when executing getOrCreateDataPartition():" + 
e.getMessage());
+            ERR_GET_OR_CREATE_DATA_PARTITION + e.getMessage());
       }
     }
     return dataPartition;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
index b38346c150c..c7f6a7bde03 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/partition/ClusterPartitionFetcher.java
@@ -27,6 +27,7 @@ import 
org.apache.iotdb.db.protocol.client.ConfigNodeClientManager;
 import org.apache.iotdb.db.protocol.client.ConfigNodeInfo;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.partition.PartitionCache;
 
+@SuppressWarnings("java:S6548")
 public class ClusterPartitionFetcher extends BasicPartitionFetcher {
   private final IClientManager<ConfigRegionId, ConfigNodeClient> 
configNodeClientManager =
       ConfigNodeClientManager.getInstance();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
index 78bf3023943..81b295eb066 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/schema/SchemaValidator.java
@@ -55,6 +55,7 @@ public class SchemaValidator {
     }
   }
 
+  @SuppressWarnings("java:S107")
   public static ISchemaTree validate(
       ISchemaFetcher schemaFetcher,
       List<PartialPath> devicePaths,
@@ -71,22 +72,66 @@ public class SchemaValidator {
   public static ISchemaTree validate(
       ISchemaFetcher schemaFetcher, ValidatingSchema validatingSchema, 
MPPQueryContext context) {
     return schemaFetcher.fetchSchemaListWithAutoCreate(
-        validatingSchema.devicePaths,
-        validatingSchema.measurements,
-        validatingSchema.dataTypes,
-        validatingSchema.encodings,
-        validatingSchema.compressionTypes,
-        validatingSchema.isAlignedList,
+        validatingSchema.getDevicePaths(),
+        validatingSchema.getMeasurements(),
+        validatingSchema.getDataTypes(),
+        validatingSchema.getEncodings(),
+        validatingSchema.getCompressionTypes(),
+        validatingSchema.getIsAlignedList(),
         context);
   }
 
   public static class ValidatingSchema {
 
-    public List<PartialPath> devicePaths = new ArrayList<>();
-    public List<String[]> measurements = new ArrayList<>();
-    public List<TSDataType[]> dataTypes = new ArrayList<>();
-    public List<TSEncoding[]> encodings = new ArrayList<>();
-    public List<CompressionType[]> compressionTypes = new ArrayList<>();
-    public List<Boolean> isAlignedList = new ArrayList<>();
+    private List<PartialPath> devicePaths = new ArrayList<>();
+    private List<String[]> measurements = new ArrayList<>();
+    private List<TSDataType[]> dataTypes = new ArrayList<>();
+    private List<TSEncoding[]> encodings = new ArrayList<>();
+    private List<CompressionType[]> compressionTypes = new ArrayList<>();
+    private List<Boolean> isAlignedList = new ArrayList<>();
+
+    public List<PartialPath> getDevicePaths() {
+      return devicePaths;
+    }
+
+    public List<String[]> getMeasurements() {
+      return measurements;
+    }
+
+    public void setMeasurements(List<String[]> measurements) {
+      this.measurements = measurements;
+    }
+
+    public List<TSDataType[]> getDataTypes() {
+      return dataTypes;
+    }
+
+    public void setDataTypes(
+        List<TSDataType[]> dataTypes) {
+      this.dataTypes = dataTypes;
+    }
+
+    public List<TSEncoding[]> getEncodings() {
+      return encodings;
+    }
+
+    public void setEncodings(
+        List<TSEncoding[]> encodings) {
+      this.encodings = encodings;
+    }
+
+    public List<CompressionType[]> getCompressionTypes() {
+      return compressionTypes;
+    }
+
+    public void setCompressionTypes(
+        List<CompressionType[]> compressionTypes) {
+      this.compressionTypes = compressionTypes;
+    }
+
+    public List<Boolean> getIsAlignedList() {
+      return isAlignedList;
+    }
+
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
index 3d42a401b70..b39a174d99c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadCommandNode.java
@@ -62,11 +62,14 @@ public class LoadCommandNode extends PlanNode {
   }
 
   @Override
-  public void addChild(PlanNode child) {}
+  public void addChild(PlanNode child) {
+    // LoadCommandNode is a leaf plan node; adding children is intentionally a no-op.
+  }
 
+  @SuppressWarnings({"java:S2975", "java:S1182"})
   @Override
   public PlanNode clone() {
-    return null;
+    return new LoadCommandNode(getPlanNodeId(), loadCommand, uuid, 
consensusGroupId, isGeneratedByPipe);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
index a759829024b..32a83c0761b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/StorageEngine.java
@@ -101,6 +101,7 @@ import static 
org.apache.iotdb.commons.conf.IoTDBConstant.FILE_NAME_SEPARATOR;
 
 public class StorageEngine implements IService {
 
+  private static final String MSG_NO_SUCH_UUID = "No load TsFile uuid %s 
recorded for execute load command %s.";
   private static final Logger logger = 
LoggerFactory.getLogger(StorageEngine.class);
 
   private static final IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
@@ -807,7 +808,7 @@ public class StorageEngine implements IService {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
             status.setMessage(
                 String.format(
-                    "No load TsFile uuid %s recorded for execute load command 
%s.",
+                    MSG_NO_SUCH_UUID,
                     loadCommandNode.getUuid(), 
loadCommandNode.getLoadCommand()));
           }
           break;
@@ -819,7 +820,7 @@ public class StorageEngine implements IService {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
             status.setMessage(
                 String.format(
-                    "No load TsFile uuid %s recorded for execute load command 
%s.",
+                    MSG_NO_SUCH_UUID,
                     loadCommandNode.getUuid(), 
loadCommandNode.getLoadCommand()));
           }
           break;
@@ -851,7 +852,7 @@ public class StorageEngine implements IService {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
             status.setMessage(
                 String.format(
-                    "No load TsFile uuid %s recorded for execute load command 
%s.",
+                    MSG_NO_SUCH_UUID,
                     uuid, loadCommand));
           }
           break;
@@ -862,7 +863,7 @@ public class StorageEngine implements IService {
             status.setCode(TSStatusCode.LOAD_FILE_ERROR.getStatusCode());
             status.setMessage(
                 String.format(
-                    "No load TsFile uuid %s recorded for execute load command 
%s.",
+                    MSG_NO_SUCH_UUID,
                     uuid, loadCommand));
           }
           break;
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
index 26b7745a451..38084a86c6d 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@ -136,7 +136,7 @@ public class LoadTsFileManagerTest extends TestBase {
     }
   }
 
-  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint)
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndPoint)
       throws TException, IOException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
@@ -165,7 +165,7 @@ public class LoadTsFileManagerTest extends TestBase {
           .forEach(
               dataNodeLocation -> {
                 TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
-                if (!otherPoint.equals(tEndpoint)) {
+                if (!otherPoint.equals(tEndPoint)) {
                   try {
                     handleTsFilePieceNode(req, otherPoint);
                   } catch (TException | IOException e) {
@@ -180,7 +180,7 @@ public class LoadTsFileManagerTest extends TestBase {
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
-  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint)
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndPoint)
       throws LoadFileException, IOException {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
@@ -191,7 +191,7 @@ public class LoadTsFileManagerTest extends TestBase {
       TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
       for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
         TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
-        if (!otherPoint.equals(tEndpoint)) {
+        if (!otherPoint.equals(tEndPoint)) {
           handleTsLoadCommand(req, otherPoint);
         }
       }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
index 9e226eb2dca..9ae155d18c0 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileSchedulerTest.java
@@ -109,7 +109,7 @@ public class LoadTsFileSchedulerTest extends TestBase {
     System.out.printf("Split ends after %dms", timeConsumption);
   }
 
-  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint) {
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndPoint) {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
     LoadTsFilePieceNode pieceNode =
@@ -117,7 +117,7 @@ public class LoadTsFileSchedulerTest extends TestBase {
     Set<Integer> splitIds =
         phaseOneResults
             .computeIfAbsent(
-                tEndpoint,
+                tEndPoint,
                 e -> new 
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
             .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
             .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
@@ -132,9 +132,9 @@ public class LoadTsFileSchedulerTest extends TestBase {
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
-  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint) {
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndPoint) {
     phaseTwoResults
-        .computeIfAbsent(tEndpoint, e -> new ConcurrentSkipListMap<>())
+        .computeIfAbsent(tEndPoint, e -> new ConcurrentSkipListMap<>())
         .computeIfAbsent(req.uuid, id -> req.commandType);
 
     return new TLoadResp()
@@ -145,16 +145,16 @@ public class LoadTsFileSchedulerTest extends TestBase {
   public void printPhaseResult() {
     System.out.print("Phase one:\n");
     for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, 
Set<Integer>>>>>
-        tEndPointMapEntry : phaseOneResults.entrySet()) {
-      TEndPoint endPoint = tEndPointMapEntry.getKey();
+        entry : phaseOneResults.entrySet()) {
+      TEndPoint endPoint = entry.getKey();
       for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>> 
consensusGroupIdMapEntry :
-          tEndPointMapEntry.getValue().entrySet()) {
+          entry.getValue().entrySet()) {
         ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
         int chunkNum = 0;
         int fileNum = 0;
         int taskNum = 0;
         for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
-            consensusGroupIdMapEntry.getValue().entrySet()) {;
+            consensusGroupIdMapEntry.getValue().entrySet()) {
           taskNum += 1;
           for (Entry<File, Set<Integer>> fileListEntry : 
stringMapEntry.getValue().entrySet()) {
             Set<Integer> chunks = fileListEntry.getValue();
@@ -165,20 +165,13 @@ public class LoadTsFileSchedulerTest extends TestBase {
         System.out.printf(
             "%s - %s - %s tasks - %s files - %s chunks\n",
             endPoint, consensusGroupId, taskNum, fileNum, chunkNum);
-        //        if (consensusGroupId.getId() == 0) {
-        //          // d1, non-aligned series
-        //          assertEquals(expectedChunkNum() / 2, chunkNum);
-        //        } else {
-        //          // d2, aligned series
-        //          assertEquals(expectedChunkNum() / 2 / seriesNum, chunkNum);
-        //        }
       }
     }
 
     System.out.print("Phase two:\n");
-    for (Entry<TEndPoint, Map<String, Integer>> tEndPointMapEntry : 
phaseTwoResults.entrySet()) {
-      TEndPoint endPoint = tEndPointMapEntry.getKey();
-      for (Entry<String, Integer> stringMapEntry : 
tEndPointMapEntry.getValue().entrySet()) {
+    for (Entry<TEndPoint, Map<String, Integer>> entry : 
phaseTwoResults.entrySet()) {
+      TEndPoint endPoint = entry.getKey();
+      for (Entry<String, Integer> stringMapEntry : 
entry.getValue().entrySet()) {
         String uuid = stringMapEntry.getKey();
         int command = stringMapEntry.getValue();
         System.out.printf("%s - %s - %s\n", endPoint, uuid, 
LoadCommand.values()[command]);
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index 3ef4c756386..ea8d526ba5a 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -164,14 +164,14 @@ public class TestBase {
     return (splitChunkNum + fileNum) * seriesNum * deviceNum;
   }
 
-  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint)
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndPoint)
       throws TException, IOException {
     return new TLoadResp()
         .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
-  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint)
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndPoint)
       throws LoadFileException, IOException {
     return new TLoadResp()
         .setAccepted(true)
@@ -420,12 +420,12 @@ public class TestBase {
 
   public static String getTestTsFilePath(
       String logicalStorageGroupName,
-      long VirtualStorageGroupId,
-      long TimePartitionId,
+      long virtualStorageGroupId,
+      long timePartitionId,
       long tsFileVersion) {
     String filePath =
         String.format(
-            TEST_TSFILE_PATH, logicalStorageGroupName, VirtualStorageGroupId, 
TimePartitionId);
+            TEST_TSFILE_PATH, logicalStorageGroupName, virtualStorageGroupId, 
timePartitionId);
     return TsFileGeneratorUtils.getTsFilePath(filePath, tsFileVersion);
   }
 }
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index bd903c45548..a682263e742 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -60,6 +60,7 @@ import java.util.stream.Collectors;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.MB;
 import static org.junit.Assert.assertEquals;
 
+@SuppressWarnings("java:S2925")
 public class TsFileSplitSenderTest extends TestBase {
 
   private static final Logger logger = 
LoggerFactory.getLogger(TsFileSplitSenderTest.class);
@@ -128,7 +129,7 @@ public class TsFileSplitSenderTest extends TestBase {
     thread.interrupt();
 
     printPhaseResult();
-    long transmissionTime = splitSender.getStatistic().compressedSize.get() / 
nodeThroughput;
+    long transmissionTime = 
splitSender.getStatistic().getCompressedSize().get() / nodeThroughput;
     System.out.printf(
         "Split ends after %dms + %dms (Transmission) = %dms\n",
         timeConsumption, transmissionTime, timeConsumption + transmissionTime);
@@ -139,30 +140,30 @@ public class TsFileSplitSenderTest extends TestBase {
     System.out.printf("Memory usage %dMB\n", maxMemoryUsage.get() / MB);
   }
 
-  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint)
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndPoint)
       throws TException, IOException {
-    long handleStart = System.nanoTime();
-    if ((tEndpoint.getPort() - 10000) % 3 == 0
+    final long handleStart = System.nanoTime();
+    if ((tEndPoint.getPort() - 10000) % 3 == 0
         && random.nextDouble() < packetLossRatio
         && req.relayTargets != null) {
       throw new TException("Packet lost");
     }
-    if ((tEndpoint.getPort() - 10000) % 3 == 1
+    if ((tEndPoint.getPort() - 10000) % 3 == 1
         && random.nextDouble() < packetLossRatio / 2
         && req.relayTargets != null) {
       throw new TException("Packet lost");
     }
 
-    if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null && 
stuckDurationMS > 0) {
+    if ((tEndPoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null && 
stuckDurationMS > 0) {
       Pair<Long, Long> nextStuckTime =
           nextStuckTimeMap.computeIfAbsent(
-              tEndpoint,
+              tEndPoint,
               e ->
                   new Pair<>(
                       System.currentTimeMillis(), System.currentTimeMillis() + 
stuckDurationMS));
       long currTime = System.currentTimeMillis();
       if (currTime >= nextStuckTime.left && currTime < nextStuckTime.right) {
-        logger.debug("Node{} stalls", tEndpoint.getPort() - 10000);
+        logger.debug("Node{} stalls", tEndPoint.getPort() - 10000);
         try {
           Thread.sleep(nextStuckTime.right - currTime);
         } catch (InterruptedException e) {
@@ -170,7 +171,7 @@ public class TsFileSplitSenderTest extends TestBase {
         }
       } else if (currTime > nextStuckTime.right) {
         nextStuckTimeMap.compute(
-            tEndpoint,
+            tEndPoint,
             (endPoint, newInterval) -> {
               if (newInterval != null && currTime < newInterval.right) {
                 return newInterval;
@@ -185,8 +186,6 @@ public class TsFileSplitSenderTest extends TestBase {
     }
 
     long decompressStart = System.nanoTime();
-    ConsensusGroupId groupId =
-        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
     ByteBuffer buf = req.body.slice();
     if (req.isSetCompressionType()) {
       CompressionType compressionType = 
CompressionType.deserialize(req.compressionType);
@@ -203,10 +202,13 @@ public class TsFileSplitSenderTest extends TestBase {
     long deserializeStart = System.nanoTime();
     LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) 
PlanNodeType.deserialize(buf);
     deserializeTime.addAndGet(System.nanoTime() - deserializeStart);
+
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
     Set<Integer> splitIds =
         phaseOneResults
             .computeIfAbsent(
-                tEndpoint,
+                tEndPoint,
                 e -> new 
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
             .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
             .computeIfAbsent(req.uuid, id -> new ConcurrentSkipListMap<>())
@@ -217,14 +219,14 @@ public class TsFileSplitSenderTest extends TestBase {
             .collect(Collectors.toList()));
 
     if (dummyDelayMS > 0) {
-      if ((tEndpoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null) {
+      if ((tEndPoint.getPort() - 10000) % 3 == 0 && req.relayTargets != null) {
         try {
           Thread.sleep(dummyDelayMS);
         } catch (InterruptedException e) {
           throw new RuntimeException(e);
         }
       }
-      if ((tEndpoint.getPort() - 10000) % 3 == 1 && req.relayTargets != null) {
+      if ((tEndPoint.getPort() - 10000) % 3 == 1 && req.relayTargets != null) {
         try {
           Thread.sleep(dummyDelayMS / 2);
         } catch (InterruptedException e) {
@@ -243,7 +245,7 @@ public class TsFileSplitSenderTest extends TestBase {
           .forEach(
               dataNodeLocation -> {
                 TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
-                if (!otherPoint.equals(tEndpoint)) {
+                if (!otherPoint.equals(tEndPoint)) {
                   try {
                     handleTsFilePieceNode(req, otherPoint);
                   } catch (TException | IOException e) {
@@ -260,12 +262,12 @@ public class TsFileSplitSenderTest extends TestBase {
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
-  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint) {
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndPoint) {
     ConsensusGroupId groupId =
         
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
     phaseTwoResults
         .computeIfAbsent(
-            tEndpoint,
+            tEndPoint,
             e -> new 
ConcurrentSkipListMap<>(Comparator.comparingInt(ConsensusGroupId::getId)))
         .computeIfAbsent(groupId, g -> new ConcurrentSkipListMap<>())
         .computeIfAbsent(req.uuid, id -> req.commandType);
@@ -276,7 +278,7 @@ public class TsFileSplitSenderTest extends TestBase {
       TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
       for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
         TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
-        if (!otherPoint.equals(tEndpoint)) {
+        if (!otherPoint.equals(tEndPoint)) {
           handleTsLoadCommand(req, otherPoint);
         }
       }
@@ -290,10 +292,10 @@ public class TsFileSplitSenderTest extends TestBase {
   public void printPhaseResult() {
     System.out.print("Phase one:\n");
     for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Map<File, 
Set<Integer>>>>>
-        tEndPointMapEntry : phaseOneResults.entrySet()) {
-      TEndPoint endPoint = tEndPointMapEntry.getKey();
+        endPointMapEntry : phaseOneResults.entrySet()) {
+      TEndPoint endPoint = endPointMapEntry.getKey();
       for (Entry<ConsensusGroupId, Map<String, Map<File, Set<Integer>>>> 
consensusGroupIdMapEntry :
-          tEndPointMapEntry.getValue().entrySet()) {
+          endPointMapEntry.getValue().entrySet()) {
         ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
         for (Entry<String, Map<File, Set<Integer>>> stringMapEntry :
             consensusGroupIdMapEntry.getValue().entrySet()) {
@@ -317,11 +319,11 @@ public class TsFileSplitSenderTest extends TestBase {
     }
 
     System.out.print("Phase two:\n");
-    for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> 
tEndPointMapEntry :
+    for (Entry<TEndPoint, Map<ConsensusGroupId, Map<String, Integer>>> 
endPointMapEntryValue :
         phaseTwoResults.entrySet()) {
-      TEndPoint endPoint = tEndPointMapEntry.getKey();
+      TEndPoint endPoint = endPointMapEntryValue.getKey();
       for (Entry<ConsensusGroupId, Map<String, Integer>> 
consensusGroupIdMapEntry :
-          tEndPointMapEntry.getValue().entrySet()) {
+          endPointMapEntryValue.getValue().entrySet()) {
         ConsensusGroupId consensusGroupId = consensusGroupIdMapEntry.getKey();
         for (Entry<String, Integer> stringMapEntry :
             consensusGroupIdMapEntry.getValue().entrySet()) {
diff --git 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
index c54028d513c..c86d597a734 100644
--- 
a/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
+++ 
b/iotdb-core/tsfile/src/main/java/org/apache/iotdb/tsfile/utils/TsFileUtils.java
@@ -69,7 +69,7 @@ public class TsFileUtils {
       throws IOException {
     if (uncompressedSize == 0 || type == CompressionType.UNCOMPRESSED) {
       return buffer;
-    } // FIXME if the buffer is not array-implemented.
+    }
     IUnCompressor unCompressor = IUnCompressor.getUnCompressor(type);
     ByteBuffer uncompressedBuffer = ByteBuffer.allocate(uncompressedSize);
     unCompressor.uncompress(

Reply via email to