This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch ty/sonar
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/ty/sonar by this push:
     new 0bb317ebc49 server/src/main/java/org/apache/iotdb/db/mpp/execution 
done except for operator package
0bb317ebc49 is described below

commit 0bb317ebc49a6a3ddf764a8a5fe291bc7675d74a
Author: JackieTien97 <[email protected]>
AuthorDate: Tue Jun 20 17:01:59 2023 +0800

    server/src/main/java/org/apache/iotdb/db/mpp/execution done except for 
operator package
---
 .../iotdb/db/mpp/execution/FutureStateChange.java  |   1 +
 .../iotdb/db/mpp/execution/QueryIdGenerator.java   |   7 +-
 .../iotdb/db/mpp/execution/QueryStateMachine.java  |   1 +
 .../iotdb/db/mpp/execution/StateMachine.java       |  51 ++++---
 .../iotdb/db/mpp/execution/driver/DataDriver.java  |  15 +-
 .../db/mpp/execution/driver/DataDriverContext.java |   2 +-
 .../iotdb/db/mpp/execution/driver/Driver.java      |  23 +--
 .../db/mpp/execution/driver/DriverContext.java     |   1 +
 .../iotdb/db/mpp/execution/driver/IDriver.java     |  16 ++-
 .../db/mpp/execution/driver/SchemaDriver.java      |   1 +
 .../mpp/execution/driver/SchemaDriverContext.java  |   2 +-
 .../mpp/execution/executor/RegionReadExecutor.java |  13 +-
 .../execution/executor/RegionWriteExecutor.java    | 155 ++++++++++++---------
 .../db/mpp/execution/fragment/FragmentInfo.java    |  17 +++
 .../fragment/FragmentInstanceContext.java          |  44 +++---
 .../fragment/FragmentInstanceExecution.java        |  64 +++++----
 .../fragment/FragmentInstanceFailureInfo.java      |  36 +++--
 .../fragment/FragmentInstanceFailureListener.java  |   1 +
 .../execution/fragment/FragmentInstanceInfo.java   |   1 +
 .../fragment/FragmentInstanceManager.java          |   4 +
 .../execution/fragment/FragmentInstanceState.java  |   1 +
 .../fragment/FragmentInstanceStateMachine.java     |   6 +-
 .../db/mpp/execution/fragment/FragmentState.java   |   1 +
 .../timer/RuleBasedTimeSliceAllocator.java         |   2 +-
 24 files changed, 280 insertions(+), 185 deletions(-)

diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
index 0d33be9b161..a3261925296 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/FutureStateChange.java
@@ -11,6 +11,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution;
 
 import com.google.common.collect.ImmutableSet;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
index 49ad488d08e..c8a17b6d7a4 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryIdGenerator.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
@@ -56,9 +57,9 @@ public class QueryIdGenerator {
   private int counter;
 
   public QueryIdGenerator() {
-    int dataNodeId = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
-    checkArgument(dataNodeId != -1, "DataNodeId should be init first!");
-    this.dataNodeId = String.valueOf(dataNodeId);
+    int dataNode = IoTDBDescriptor.getInstance().getConfig().getDataNodeId();
+    checkArgument(dataNode != -1, "DataNodeId should be init first!");
+    this.dataNodeId = String.valueOf(dataNode);
   }
 
   public String getCoordinatorId() {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
index ffc174cbce8..e3b39a1c952 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/QueryStateMachine.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
index c4482b87eb7..f1e94464e40 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/StateMachine.java
@@ -11,12 +11,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.util.concurrent.ListenableFuture;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.GuardedBy;
 import javax.annotation.concurrent.ThreadSafe;
@@ -36,6 +39,12 @@ import static java.util.Objects.requireNonNull;
 
 @ThreadSafe
 public class StateMachine<T> {
+
+  private static final String LOCK_HELD_ERROR_MSG = "Cannot set state while 
holding the lock";
+  private static final String STATE_IS_NULL = "newState is null";
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(StateMachine.class);
+
   private final String name;
   private final Executor executor;
   private final Object lock = new Object();
@@ -110,12 +119,12 @@ public class StateMachine<T> {
    * @return the state before the possible state change
    */
   public T trySet(T newState) {
-    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the 
lock");
-    requireNonNull(newState, "newState is null");
+    checkState(!Thread.holdsLock(lock), LOCK_HELD_ERROR_MSG);
+    requireNonNull(newState, STATE_IS_NULL);
 
     T oldState;
-    FutureStateChange<T> futureStateChange;
-    ImmutableList<StateChangeListener<T>> stateChangeListeners;
+    FutureStateChange<T> oldFutureStateChange;
+    ImmutableList<StateChangeListener<T>> curStateChangeListeners;
     synchronized (lock) {
       if (state.equals(newState) || isTerminalState(state)) {
         return state;
@@ -124,8 +133,8 @@ public class StateMachine<T> {
       oldState = state;
       state = newState;
 
-      futureStateChange = this.futureStateChange.getAndSet(new 
FutureStateChange<>());
-      stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+      oldFutureStateChange = this.futureStateChange.getAndSet(new 
FutureStateChange<>());
+      curStateChangeListeners = 
ImmutableList.copyOf(this.stateChangeListeners);
 
       // if we are now in a terminal state, free the listeners since this will 
be the last
       // notification
@@ -134,7 +143,7 @@ public class StateMachine<T> {
       }
     }
 
-    fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    fireStateChanged(newState, oldFutureStateChange, curStateChangeListeners);
     return oldState;
   }
 
@@ -145,8 +154,8 @@ public class StateMachine<T> {
    * @return true if the state is set
    */
   public boolean setIf(T newState, Predicate<T> predicate) {
-    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the 
lock");
-    requireNonNull(newState, "newState is null");
+    checkState(!Thread.holdsLock(lock), LOCK_HELD_ERROR_MSG);
+    requireNonNull(newState, STATE_IS_NULL);
 
     while (true) {
       // check if the current state passes the predicate
@@ -176,12 +185,12 @@ public class StateMachine<T> {
    * @return true if the state is set
    */
   public boolean compareAndSet(T expectedState, T newState) {
-    checkState(!Thread.holdsLock(lock), "Cannot set state while holding the 
lock");
+    checkState(!Thread.holdsLock(lock), LOCK_HELD_ERROR_MSG);
     requireNonNull(expectedState, "expectedState is null");
-    requireNonNull(newState, "newState is null");
+    requireNonNull(newState, STATE_IS_NULL);
 
-    FutureStateChange<T> futureStateChange;
-    ImmutableList<StateChangeListener<T>> stateChangeListeners;
+    FutureStateChange<T> oldFutureStateChange;
+    ImmutableList<StateChangeListener<T>> curStateChangeListeners;
     synchronized (lock) {
       if (!state.equals(expectedState)) {
         return false;
@@ -197,8 +206,8 @@ public class StateMachine<T> {
 
       state = newState;
 
-      futureStateChange = this.futureStateChange.getAndSet(new 
FutureStateChange<>());
-      stateChangeListeners = ImmutableList.copyOf(this.stateChangeListeners);
+      oldFutureStateChange = this.futureStateChange.getAndSet(new 
FutureStateChange<>());
+      curStateChangeListeners = 
ImmutableList.copyOf(this.stateChangeListeners);
 
       // if we are now in a terminal state, free the listeners since this will 
be the last
       // notification
@@ -207,16 +216,17 @@ public class StateMachine<T> {
       }
     }
 
-    fireStateChanged(newState, futureStateChange, stateChangeListeners);
+    fireStateChanged(newState, oldFutureStateChange, curStateChangeListeners);
     return true;
   }
 
+  @SuppressWarnings("squid:S1181")
   private void fireStateChanged(
       T newState,
       FutureStateChange<T> futureStateChange,
       List<StateChangeListener<T>> stateChangeListeners) {
     checkState(!Thread.holdsLock(lock), "Cannot fire state change event while 
holding the lock");
-    requireNonNull(newState, "newState is null");
+    requireNonNull(newState, STATE_IS_NULL);
 
     // always fire listener callbacks from a different thread
     safeExecute(
@@ -225,7 +235,7 @@ public class StateMachine<T> {
           try {
             futureStateChange.complete(newState);
           } catch (Throwable e) {
-            //                log.error(e, "Error setting future state for 
%s", name);
+            LOGGER.error("Error setting future state for {}", name, e);
           }
           for (StateChangeListener<T> stateChangeListener : 
stateChangeListeners) {
             fireStateChangedListener(newState, stateChangeListener);
@@ -233,11 +243,12 @@ public class StateMachine<T> {
         });
   }
 
+  @SuppressWarnings("squid:S1181")
   private void fireStateChangedListener(T newState, StateChangeListener<T> 
stateChangeListener) {
     try {
       stateChangeListener.stateChanged(newState);
     } catch (Throwable e) {
-      //            log.error(e, "Error notifying state change listener for 
%s", name);
+      LOGGER.error("Error notifying state change listener for {}", name, e);
     }
   }
 
@@ -303,12 +314,12 @@ public class StateMachine<T> {
     return get().toString();
   }
 
+  @SuppressWarnings("squid:S112")
   private void safeExecute(Runnable command) {
     try {
       executor.execute(command);
     } catch (RejectedExecutionException e) {
       if ((executor instanceof ExecutorService) && ((ExecutorService) 
executor).isShutdown()) {
-        // TODO: (xingtanzjr) handle the exception
         throw new RuntimeException("Server is shutting down", e);
       }
       throw e;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
index c7ff76b8de2..266e4372bbd 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriver.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.db.engine.querycontext.QueryDataSource;
@@ -48,6 +49,7 @@ public class DataDriver extends Driver {
     this.estimatedMemorySize = estimatedMemorySize;
   }
 
+  @SuppressWarnings("squid:S1181")
   @Override
   protected boolean init(SettableFuture<?> blockedFuture) {
     if (!init) {
@@ -65,8 +67,12 @@ public class DataDriver extends Driver {
   }
 
   /**
-   * init seq file list and unseq file list in QueryDataSource and set it into 
each SourceNode TODO
-   * we should change all the blocked lock operation into tryLock
+   * init seq file list and unseq file list in QueryDataSource and set it into 
each SourceNode.
+   *
+   * @throws QueryProcessException while failed to init query resource, 
QueryProcessException will
+   *     be thrown
+   * @throws IllegalStateException if QueryDataSource is null after 
initialization,
+   *     IllegalStateException will be thrown
    */
   private void initialize() throws QueryProcessException {
     long startTime = System.nanoTime();
@@ -107,7 +113,10 @@ public class DataDriver extends Driver {
 
   /**
    * The method is called in mergeLock() when executing query. This method 
will get all the
-   * QueryDataSource needed for this query
+   * QueryDataSource needed for this query.
+   *
+   * @throws QueryProcessException while failed to init query resource, 
QueryProcessException will
+   *     be thrown
    */
   private QueryDataSource initQueryDataSource() throws QueryProcessException {
     return ((DataDriverContext) driverContext).getSharedQueryDataSource();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
index 0b0ad4332fd..88941ddce56 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DataDriverContext.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.commons.path.PartialPath;
@@ -28,7 +29,6 @@ import 
org.apache.iotdb.db.mpp.execution.operator.source.DataSourceOperator;
 import java.util.ArrayList;
 import java.util.List;
 
-/** TODO Add javadoc for context */
 public class DataDriverContext extends DriverContext {
 
   private final List<PartialPath> paths;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
index c6c7b73cd13..55e8e94ad50 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/Driver.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.commons.utils.FileUtils;
@@ -58,6 +59,9 @@ import static 
org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.DRIVER_INTE
 public abstract class Driver implements IDriver {
 
   protected static final Logger LOGGER = LoggerFactory.getLogger(Driver.class);
+  protected static final QueryMetricsManager QUERY_METRICS = 
QueryMetricsManager.getInstance();
+  protected static final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
+      QueryExecutionMetricSet.getInstance();
 
   protected final DriverContext driverContext;
   protected final Operator root;
@@ -67,10 +71,6 @@ public abstract class Driver implements IDriver {
 
   protected final DriverLock exclusiveLock = new DriverLock();
 
-  protected final QueryMetricsManager QUERY_METRICS = 
QueryMetricsManager.getInstance();
-  protected final QueryExecutionMetricSet QUERY_EXECUTION_METRICS =
-      QueryExecutionMetricSet.getInstance();
-
   protected enum State {
     ALIVE,
     NEED_DESTRUCTION,
@@ -104,13 +104,13 @@ public abstract class Driver implements IDriver {
   }
 
   /**
-   * do initialization
+   * do initialization.
    *
    * @return true if init succeed, false otherwise
    */
   protected abstract boolean init(SettableFuture<?> blockedFuture);
 
-  /** release resource this driver used */
+  /** release resource this driver used. */
   protected abstract void releaseResource();
 
   public int getDependencyDriverIndex() {
@@ -194,6 +194,7 @@ public abstract class Driver implements IDriver {
     return sink;
   }
 
+  @SuppressWarnings("squid:S112")
   @GuardedBy("exclusiveLock")
   private boolean isFinishedInternal() {
     checkLockHeld("Lock must be held to call isFinishedInternal");
@@ -214,6 +215,7 @@ public abstract class Driver implements IDriver {
     return finished;
   }
 
+  @SuppressWarnings({"squid:S1181", "squid:S112"})
   private ListenableFuture<?> processInternal() {
     long startTimeNanos = System.nanoTime();
     try {
@@ -261,7 +263,7 @@ public abstract class Driver implements IDriver {
     driverBlockedFuture.set(newDriverBlockedFuture);
     sourceBlockedFuture.addListener(() -> newDriverBlockedFuture.set(null), 
directExecutor());
 
-    // TODO Although we don't have memory management for operator now, we 
should consider it for
+    // Although we don't have memory management for operator now, we should 
consider it for
     // future
     // it's possible that memory revoking is requested for some operator
     // before we update driverBlockedFuture above and we don't want to miss 
that
@@ -340,6 +342,7 @@ public abstract class Driver implements IDriver {
     return result;
   }
 
+  @SuppressWarnings({"squid:S1181", "squid:S112"})
   @GuardedBy("exclusiveLock")
   private void destroyIfNecessary() {
     checkLockHeld("Lock must be held to call destroyIfNecessary");
@@ -372,6 +375,7 @@ public abstract class Driver implements IDriver {
     }
   }
 
+  @SuppressWarnings("squid:S1181")
   private Throwable closeAndDestroyOperators() {
     // record the current interrupted status (and clear the flag); we'll reset 
it later
     boolean wasInterrupted = Thread.interrupted();
@@ -405,7 +409,6 @@ public abstract class Driver implements IDriver {
       // don't record the stack
       wasInterrupted = true;
     } catch (Throwable t) {
-      // TODO currently, we won't know exact operator which is failed in 
closing
       inFlightException =
           addSuppressedException(
               inFlightException,
@@ -431,7 +434,9 @@ public abstract class Driver implements IDriver {
             + driverContext.getPipelineId()
             + File.separator;
     File tmpPipeLineDir = new File(pipeLineSortDir);
-    if (!tmpPipeLineDir.exists()) return;
+    if (!tmpPipeLineDir.exists()) {
+      return;
+    }
     FileUtils.deleteDirectory(tmpPipeLineDir);
   }
 
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
index 001cc5c4ede..0035932f879 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/DriverContext.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.commons.utils.TestOnly;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
index 24a4ef9d5d7..be015d961aa 100644
--- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
+++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/IDriver.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -27,12 +28,12 @@ import io.airlift.units.Duration;
 
 /**
  * IDriver encapsulates some methods which are necessary for 
FragmentInstanceTaskExecutor to run a
- * fragment instance
+ * fragment instance.
  */
 public interface IDriver {
 
   /**
-   * Used to judge whether this IDriver should be scheduled for execution 
anymore
+   * Used to judge whether this IDriver should be scheduled for execution 
anymore.
    *
    * @return true if the IDriver is done or terminated due to failure, 
otherwise false.
    */
@@ -40,14 +41,15 @@ public interface IDriver {
 
   /**
    * run the IDriver for {@param duration} time slice, the time of this run is 
likely not to be
-   * equal to {@param duration}, the actual run time should be calculated by 
the caller
+   * equal to {@param duration}, the actual run time should be calculated by 
the caller.
    *
    * @param duration how long should this IDriver run
-   * @return the returned ListenableFuture<Void> is used to represent status 
of this processing if
+   * @return the returned ListenableFuture is used to represent status of this 
processing if
    *     isDone() return true, meaning that this IDriver is not blocked and is 
ready for next
    *     processing. Otherwise, meaning that this IDriver is blocked and not 
ready for next
    *     processing.
    */
+  @SuppressWarnings("squid:S1452")
   ListenableFuture<?> processFor(Duration duration);
 
   /**
@@ -63,17 +65,17 @@ public interface IDriver {
 
   void setDriverTaskId(DriverTaskId driverTaskId);
 
-  /** clear resource used by this fragment instance */
+  /** clear resource used by this fragment instance. */
   void close();
 
   /**
-   * fail current driver
+   * fail current driver.
    *
    * @param t reason cause this failure
    */
   void failed(Throwable t);
 
-  /** @return get Sink of current IDriver */
+  /** return get Sink of current IDriver. */
   ISink getSink();
 
   DriverContext getDriverContext();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
index 670670de310..5ad7c764d5c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriver.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.db.mpp.execution.operator.Operator;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
index b93e4101270..fa828d846ba 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/driver/SchemaDriverContext.java
@@ -16,12 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.driver;
 
 import org.apache.iotdb.db.metadata.schemaregion.ISchemaRegion;
 import org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext;
 
-/** TODO Add javadoc for context */
 public class SchemaDriverContext extends DriverContext {
 
   private final ISchemaRegion schemaRegion;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
index 9d15aaa0097..583e522774c 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionReadExecutor.java
@@ -37,6 +37,9 @@ public class RegionReadExecutor {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(RegionReadExecutor.class);
 
+  private static final String ERROR_MSG_FORMAT = "Execute FragmentInstance 
failed: %s";
+
+  @SuppressWarnings("squid:S1181")
   public RegionExecutionResult execute(
       ConsensusGroupId groupId, FragmentInstance fragmentInstance) {
     // execute fragment instance in state machine
@@ -59,8 +62,9 @@ public class RegionReadExecutor {
             readResponse.getException());
         resp.setAccepted(false);
         resp.setMessage(
-            "Execute FragmentInstance failed: "
-                + (readResponse.getException() == null
+            String.format(
+                ERROR_MSG_FORMAT,
+                readResponse.getException() == null
                     ? ""
                     : readResponse.getException().getMessage()));
       } else {
@@ -73,11 +77,12 @@ public class RegionReadExecutor {
       LOGGER.error("Execute FragmentInstance in ConsensusGroup {} failed.", 
groupId, t);
       RegionExecutionResult resp = new RegionExecutionResult();
       resp.setAccepted(false);
-      resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+      resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
       return resp;
     }
   }
 
+  @SuppressWarnings("squid:S1181")
   public RegionExecutionResult execute(FragmentInstance fragmentInstance) {
     // execute fragment instance in state machine
     try (SetThreadName threadName = new 
SetThreadName(fragmentInstance.getId().getFullId())) {
@@ -93,7 +98,7 @@ public class RegionReadExecutor {
       LOGGER.error("Execute FragmentInstance in QueryExecutor failed.", t);
       RegionExecutionResult resp = new RegionExecutionResult();
       resp.setAccepted(false);
-      resp.setMessage("Execute FragmentInstance failed: " + t.getMessage());
+      resp.setMessage(String.format(ERROR_MSG_FORMAT, t.getMessage()));
       return resp;
     }
   }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
index 8c8793d64b4..66d582d8a61 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/executor/RegionWriteExecutor.java
@@ -88,6 +88,9 @@ public class RegionWriteExecutor {
   private static final PerformanceOverviewMetrics PERFORMANCE_OVERVIEW_METRICS 
=
       PerformanceOverviewMetrics.getInstance();
 
+  private static final String METADATA_ERROR_MSG = "Metadata error: ";
+
+  @SuppressWarnings("squid:S1181")
   public RegionExecutionResult execute(ConsensusGroupId groupId, PlanNode 
planNode) {
     try {
       WritePlanNodeExecutionContext context =
@@ -164,7 +167,6 @@ public class RegionWriteExecutor {
 
       ConsensusWriteResponse writeResponse =
           executePlanNodeInConsensusLayer(context.getRegionId(), node);
-      // TODO need consider more status
       if (writeResponse.getStatus() != null) {
         response.setAccepted(
             TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
@@ -231,7 +233,6 @@ public class RegionWriteExecutor {
         ConsensusWriteResponse writeResponse =
             fireTriggerAndInsert(context.getRegionId(), insertNode);
 
-        // TODO need consider more status
         if (writeResponse.getStatus() != null) {
           response.setAccepted(
               TSStatusCode.SUCCESS_STATUS.getStatusCode() == 
writeResponse.getStatus().getCode());
@@ -292,7 +293,7 @@ public class RegionWriteExecutor {
             return super.visitCreateTimeSeries(node, context);
           } else {
             MetadataException metadataException = failingMeasurementMap.get(0);
-            LOGGER.error("Metadata error: ", metadataException);
+            LOGGER.error(METADATA_ERROR_MSG, metadataException);
             result = new RegionExecutionResult();
             result.setAccepted(false);
             result.setMessage(metadataException.getMessage());
@@ -330,7 +331,7 @@ public class RegionWriteExecutor {
             return super.visitCreateAlignedTimeSeries(node, context);
           } else {
             MetadataException metadataException = 
failingMeasurementMap.values().iterator().next();
-            LOGGER.error("Metadata error: ", metadataException);
+            LOGGER.error(METADATA_ERROR_MSG, metadataException);
             result = new RegionExecutionResult();
             result.setAccepted(false);
             result.setMessage(metadataException.getMessage());
@@ -368,52 +369,23 @@ public class RegionWriteExecutor {
           List<TSStatus> failingStatus = new ArrayList<>();
           Map<PartialPath, MeasurementGroup> measurementGroupMap = 
node.getMeasurementGroupMap();
           List<PartialPath> emptyDeviceList = new ArrayList<>();
-          for (Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
-            Map<Integer, MetadataException> failingMeasurementMap =
-                schemaRegion.checkMeasurementExistence(
-                    entry.getKey(),
-                    entry.getValue().getMeasurements(),
-                    entry.getValue().getAliasList());
-            if (failingMeasurementMap.isEmpty()) {
-              continue;
-            }
-
-            for (Map.Entry<Integer, MetadataException> failingMeasurement :
-                failingMeasurementMap.entrySet()) {
-              LOGGER.error("Metadata error: ", failingMeasurement.getValue());
-              failingStatus.add(
-                  RpcUtils.getStatus(
-                      failingMeasurement.getValue().getErrorCode(),
-                      failingMeasurement.getValue().getMessage()));
-            }
-            
entry.getValue().removeMeasurements(failingMeasurementMap.keySet());
 
-            if (entry.getValue().isEmpty()) {
-              emptyDeviceList.add(entry.getKey());
-            }
-          }
+          checkMeasurementExistence(
+              measurementGroupMap, schemaRegion, failingStatus, 
emptyDeviceList);
 
           for (PartialPath emptyDevice : emptyDeviceList) {
             measurementGroupMap.remove(emptyDevice);
           }
 
-          if (!measurementGroupMap.isEmpty()) {
-            // try registering the rest timeseries
-            RegionExecutionResult executionResult = 
super.visitCreateMultiTimeSeries(node, context);
-            if (failingStatus.isEmpty()) {
-              return executionResult;
-            }
+          RegionExecutionResult failingResult =
+              registerTimeSeries(measurementGroupMap, node, context, 
failingStatus);
 
-            TSStatus executionStatus = executionResult.getStatus();
-            if (executionStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
-              failingStatus.addAll(executionStatus.getSubStatus());
-            } else if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-              failingStatus.add(executionStatus);
-            }
+          if (failingResult != null) {
+            return failingResult;
           }
 
           TSStatus status = RpcUtils.getStatus(failingStatus);
-          RegionExecutionResult failingResult = new RegionExecutionResult();
+          failingResult = new RegionExecutionResult();
           failingResult.setAccepted(false);
           failingResult.setMessage(status.getMessage());
           failingResult.setStatus(status);
@@ -426,6 +398,59 @@ public class RegionWriteExecutor {
       }
     }
 
+    private void checkMeasurementExistence(
+        Map<PartialPath, MeasurementGroup> measurementGroupMap,
+        ISchemaRegion schemaRegion,
+        List<TSStatus> failingStatus,
+        List<PartialPath> emptyDeviceList) {
+      for (Map.Entry<PartialPath, MeasurementGroup> entry : 
measurementGroupMap.entrySet()) {
+        Map<Integer, MetadataException> failingMeasurementMap =
+            schemaRegion.checkMeasurementExistence(
+                entry.getKey(),
+                entry.getValue().getMeasurements(),
+                entry.getValue().getAliasList());
+        if (failingMeasurementMap.isEmpty()) {
+          continue;
+        }
+
+        for (Map.Entry<Integer, MetadataException> failingMeasurement :
+            failingMeasurementMap.entrySet()) {
+          LOGGER.error(METADATA_ERROR_MSG, failingMeasurement.getValue());
+          failingStatus.add(
+              RpcUtils.getStatus(
+                  failingMeasurement.getValue().getErrorCode(),
+                  failingMeasurement.getValue().getMessage()));
+        }
+        entry.getValue().removeMeasurements(failingMeasurementMap.keySet());
+
+        if (entry.getValue().isEmpty()) {
+          emptyDeviceList.add(entry.getKey());
+        }
+      }
+    }
+
+    private RegionExecutionResult registerTimeSeries(
+        Map<PartialPath, MeasurementGroup> measurementGroupMap,
+        CreateMultiTimeSeriesNode node,
+        WritePlanNodeExecutionContext context,
+        List<TSStatus> failingStatus) {
+      if (!measurementGroupMap.isEmpty()) {
+        // try registering the rest timeseries
+        RegionExecutionResult executionResult = 
super.visitCreateMultiTimeSeries(node, context);
+        if (failingStatus.isEmpty()) {
+          return executionResult;
+        }
+
+        TSStatus executionStatus = executionResult.getStatus();
+        if (executionStatus.getCode() == 
TSStatusCode.MULTIPLE_ERROR.getStatusCode()) {
+          failingStatus.addAll(executionStatus.getSubStatus());
+        } else if (executionStatus.getCode() != 
TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+          failingStatus.add(executionStatus);
+        }
+      }
+      return null;
+    }
+
     @Override
     public RegionExecutionResult visitInternalCreateTimeSeries(
         InternalCreateTimeSeriesNode node, WritePlanNodeExecutionContext 
context) {
@@ -463,7 +488,7 @@ public class RegionWriteExecutor {
                           ((MeasurementAlreadyExistException) 
metadataException)
                               .getMeasurementPath())));
             } else {
-              LOGGER.warn("Metadata error: ", metadataException);
+              LOGGER.warn(METADATA_ERROR_MSG, metadataException);
               failingStatus.add(
                   RpcUtils.getStatus(
                       metadataException.getErrorCode(), 
metadataException.getMessage()));
@@ -529,7 +554,7 @@ public class RegionWriteExecutor {
                             ((MeasurementAlreadyExistException) 
metadataException)
                                 .getMeasurementPath())));
               } else {
-                LOGGER.warn("Metadata error: ", metadataException);
+                LOGGER.warn(METADATA_ERROR_MSG, metadataException);
                 failingStatus.add(
                     RpcUtils.getStatus(
                         metadataException.getErrorCode(), 
metadataException.getMessage()));
@@ -575,6 +600,31 @@ public class RegionWriteExecutor {
         List<TSStatus> alreadyExistingStatus) {
       TSStatus executionStatus = executionResult.getStatus();
 
+      separateMeasurementAlreadyExistException(
+          failingStatus, executionStatus, alreadyExistingStatus);
+
+      RegionExecutionResult result = new RegionExecutionResult();
+      TSStatus status;
+      if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
+        status = RpcUtils.SUCCESS_STATUS;
+        result.setAccepted(true);
+      } else if (failingStatus.isEmpty()) {
+        status = RpcUtils.getStatus(alreadyExistingStatus);
+        result.setAccepted(true);
+      } else {
+        status = RpcUtils.getStatus(failingStatus);
+        result.setAccepted(false);
+      }
+
+      result.setMessage(status.getMessage());
+      result.setStatus(status);
+      return result;
+    }
+
+    private void separateMeasurementAlreadyExistException(
+        List<TSStatus> failingStatus,
+        TSStatus executionStatus,
+        List<TSStatus> alreadyExistingStatus) {
       // separate the measurement_already_exist exception and other exceptions 
process,
       // measurement_already_exist exception is acceptable due to concurrent 
timeseries creation
       if (failingStatus.isEmpty()) {
@@ -599,23 +649,6 @@ public class RegionWriteExecutor {
           failingStatus.add(executionStatus);
         }
       }
-
-      RegionExecutionResult result = new RegionExecutionResult();
-      TSStatus status;
-      if (failingStatus.isEmpty() && alreadyExistingStatus.isEmpty()) {
-        status = RpcUtils.SUCCESS_STATUS;
-        result.setAccepted(true);
-      } else if (failingStatus.isEmpty()) {
-        status = RpcUtils.getStatus(alreadyExistingStatus);
-        result.setAccepted(true);
-      } else {
-        status = RpcUtils.getStatus(failingStatus);
-        result.setAccepted(false);
-      }
-
-      result.setMessage(status.getMessage());
-      result.setStatus(status);
-      return result;
     }
 
     @Override
@@ -781,7 +814,7 @@ public class RegionWriteExecutor {
           // if there is some exception, handle each exception and return 
first of them.
           if (!failingMetadataException.isEmpty()) {
             MetadataException metadataException = 
failingMetadataException.get(0);
-            LOGGER.error("Metadata error: ", metadataException);
+            LOGGER.error(METADATA_ERROR_MSG, metadataException);
             RegionExecutionResult result = new RegionExecutionResult();
             result.setAccepted(false);
             result.setMessage(metadataException.getMessage());
@@ -791,8 +824,6 @@ public class RegionWriteExecutor {
             return result;
           }
           // step 2. make sure all source paths are existed.
-          // TODO: CRTODO use a more efficient method
-          //                List<PartialPath> sourcePaths = 
node.getAllTimeSeriesPathInSource();
           return super.visitCreateLogicalView(node, context);
         } finally {
           context.getRegionWriteValidationRWLock().writeLock().unlock();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java
index 064897f9c08..babc711b68e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInfo.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.db.mpp.common.PlanFragmentId;
@@ -41,4 +42,20 @@ public class FragmentInfo {
     this.plan = plan;
     this.childrenFragments = childrenFragments;
   }
+
+  public PlanFragmentId getStageId() {
+    return stageId;
+  }
+
+  public FragmentState getState() {
+    return state;
+  }
+
+  public PlanFragment getPlan() {
+    return plan;
+  }
+
+  public List<FragmentInfo> getChildrenFragments() {
+    return childrenFragments;
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
index 6434e8faa06..6549d2501f6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceContext.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.path.PartialPath;
@@ -59,15 +60,13 @@ public class FragmentInstanceContext extends QueryContext {
   private List<PartialPath> sourcePaths;
   // Shared by all scan operators in this fragment instance to avoid memory 
problem
   private QueryDataSource sharedQueryDataSource;
-  /** closed tsfile used in this fragment instance */
+  /** closed tsfile used in this fragment instance. */
   private Set<TsFileResource> closedFilePaths;
-  /** unClosed tsfile used in this fragment instance */
+  /** unClosed tsfile used in this fragment instance. */
   private Set<TsFileResource> unClosedFilePaths;
-  /** check if there is tmp file to be deleted */
+  /** check if there is tmp file to be deleted. */
   private boolean mayHaveTmpFile = false;
 
-  private final long createNanos = System.nanoTime();
-
   private final AtomicLong startNanos = new AtomicLong();
   private final AtomicLong endNanos = new AtomicLong();
 
@@ -80,14 +79,6 @@ public class FragmentInstanceContext extends QueryContext {
   // session info
   private SessionInfo sessionInfo;
 
-  //    private final GcMonitor gcMonitor;
-  //    private final AtomicLong startNanos = new AtomicLong();
-  //    private final AtomicLong startFullGcCount = new AtomicLong(-1);
-  //    private final AtomicLong startFullGcTimeNanos = new AtomicLong(-1);
-  //    private final AtomicLong endNanos = new AtomicLong();
-  //    private final AtomicLong endFullGcCount = new AtomicLong(-1);
-  //    private final AtomicLong endFullGcTimeNanos = new AtomicLong(-1);
-
   public static FragmentInstanceContext createFragmentInstanceContext(
       FragmentInstanceId id, FragmentInstanceStateMachine stateMachine, 
SessionInfo sessionInfo) {
     FragmentInstanceContext instanceContext =
@@ -114,6 +105,17 @@ public class FragmentInstanceContext extends QueryContext {
     return new FragmentInstanceContext(queryId);
   }
 
+  @TestOnly
+  public static FragmentInstanceContext createFragmentInstanceContext(
+      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
+    FragmentInstanceContext instanceContext =
+        new FragmentInstanceContext(
+            id, stateMachine, new SessionInfo(1, "test", 
ZoneId.systemDefault().getId()));
+    instanceContext.initialize();
+    instanceContext.start();
+    return instanceContext;
+  }
+
   private FragmentInstanceContext(
       FragmentInstanceId id,
       FragmentInstanceStateMachine stateMachine,
@@ -136,17 +138,6 @@ public class FragmentInstanceContext extends QueryContext {
     this.sessionInfo = sessionInfo;
   }
 
-  @TestOnly
-  public static FragmentInstanceContext createFragmentInstanceContext(
-      FragmentInstanceId id, FragmentInstanceStateMachine stateMachine) {
-    FragmentInstanceContext instanceContext =
-        new FragmentInstanceContext(
-            id, stateMachine, new SessionInfo(1, "test", 
ZoneId.systemDefault().getId()));
-    instanceContext.initialize();
-    instanceContext.start();
-    return instanceContext;
-  }
-
   @TestOnly
   public void setDataRegion(IDataRegionForQuery dataRegion) {
     this.dataRegion = dataRegion;
@@ -201,7 +192,7 @@ public class FragmentInstanceContext extends QueryContext {
     stateMachine.failed(cause);
   }
 
-  /** @return Message string of all failures */
+  /** return Message string of all failures */
   public String getFailedCause() {
     return stateMachine.getFailureCauses().stream()
         .findFirst()
@@ -209,7 +200,7 @@ public class FragmentInstanceContext extends QueryContext {
         .orElse("");
   }
 
-  /** @return List of specific throwable and stack trace */
+  /** return List of specific throwable and stack trace */
   public List<FragmentInstanceFailureInfo> getFailureInfoList() {
     return stateMachine.getFailureCauses().stream()
         .map(FragmentInstanceFailureInfo::toFragmentInstanceFailureInfo)
@@ -364,6 +355,7 @@ public class FragmentInstanceContext extends QueryContext {
     allDriversClosed.countDown();
   }
 
+  @SuppressWarnings("squid:S2142")
   public void releaseResourceWhenAllDriversAreClosed() {
     while (true) {
       try {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
index 04f6043bdc7..1bcc32d076e 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceExecution.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.utils.FileUtils;
@@ -55,8 +56,7 @@ public class FragmentInstanceExecution {
 
   private final long timeoutInMs;
 
-  private long lastHeartbeat;
-
+  @SuppressWarnings("squid:S107")
   public static FragmentInstanceExecution createFragmentInstanceExecution(
       IDriverScheduler scheduler,
       FragmentInstanceId instanceId,
@@ -90,14 +90,6 @@ public class FragmentInstanceExecution {
     this.timeoutInMs = timeoutInMs;
   }
 
-  public void recordHeartbeat() {
-    lastHeartbeat = System.currentTimeMillis();
-  }
-
-  public void setLastHeartbeat(long lastHeartbeat) {
-    this.lastHeartbeat = lastHeartbeat;
-  }
-
   public FragmentInstanceState getInstanceState() {
     return stateMachine.getState();
   }
@@ -123,6 +115,7 @@ public class FragmentInstanceExecution {
   }
 
   // this is a separate method to ensure that the `this` reference is not 
leaked during construction
+  @SuppressWarnings("squid:S1181")
   private void initialize(CounterStat failedInstances, IDriverScheduler 
scheduler) {
     requireNonNull(failedInstances, "failedInstances is null");
     stateMachine.addStateChangeListener(
@@ -141,39 +134,28 @@ public class FragmentInstanceExecution {
               failedInstances.update(1);
             }
 
-            if (newState.isFailed()) {
-              sink.abort();
-            } else {
-              sink.close();
-            }
-            // help for gc
-            sink = null;
+            clearShuffleSinkHandle(newState);
 
             // delete tmp file if exists
-            if (context.mayHaveTmpFile()) {
-              String tmpFilePath =
-                  IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
-                      + File.separator
-                      + context.getId().getFullId()
-                      + File.separator;
-              File tmpFile = new File(tmpFilePath);
-              if (tmpFile.exists()) {
-                FileUtils.deleteDirectory(tmpFile);
-              }
-            }
+            deleteTmpFile();
 
             // close the driver after sink is aborted or closed because in 
driver.close() it
             // will try to call ISink.setNoMoreTsBlocks()
             for (IDriver driver : drivers) {
               driver.close();
             }
-            context.releaseResourceWhenAllDriversAreClosed();
             // help for gc
             drivers = null;
+
+            // release file handlers
+            context.releaseResourceWhenAllDriversAreClosed();
+
+            // release memory
             MPPDataExchangeService.getInstance()
                 .getMPPDataExchangeManager()
                 .deRegisterFragmentInstanceFromMemoryPool(
                     instanceId.getQueryId().getId(), 
instanceId.getFragmentInstanceId());
+
             if (newState.isFailed()) {
               scheduler.abortFragmentInstance(instanceId);
             }
@@ -185,4 +167,28 @@ public class FragmentInstanceExecution {
           }
         });
   }
+
+  private void clearShuffleSinkHandle(FragmentInstanceState newState) {
+    if (newState.isFailed()) {
+      sink.abort();
+    } else {
+      sink.close();
+    }
+    // help for gc
+    sink = null;
+  }
+
+  private void deleteTmpFile() {
+    if (context.mayHaveTmpFile()) {
+      String tmpFilePath =
+          IoTDBDescriptor.getInstance().getConfig().getSortTmpDir()
+              + File.separator
+              + context.getId().getFullId()
+              + File.separator;
+      File tmpFile = new File(tmpFilePath);
+      if (tmpFile.exists()) {
+        FileUtils.deleteDirectory(tmpFile);
+      }
+    }
+  }
 }
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java
index 3bae6edbe4f..349beb06b6a 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureInfo.java
@@ -41,8 +41,9 @@ import static java.util.Objects.requireNonNull;
 
 /**
  * This class is inspired by Trino <a
- * 
href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/ExecutionFailureInfo.java";>...</a>
+ * 
href="https://github.com/trinodb/trino/blob/master/core/trino-main/src/main/java/io/trino/execution/ExecutionFailureInfo.java";>...</a>.
  */
+@SuppressWarnings("squid:S5852")
 public class FragmentInstanceFailureInfo implements Serializable {
   private static final Pattern STACK_TRACE_PATTERN =
       Pattern.compile("(.*)\\.(.*)\\(([^:]*)(?::(.*))?\\)");
@@ -85,19 +86,6 @@ public class FragmentInstanceFailureInfo implements 
Serializable {
     return toException(this);
   }
 
-  public static FragmentInstanceFailureInfo 
toFragmentInstanceFailureInfo(Throwable throwable) {
-    if (throwable == null) {
-      return null;
-    }
-    return new FragmentInstanceFailureInfo(
-        throwable.getMessage(),
-        toFragmentInstanceFailureInfo(throwable.getCause()),
-        Arrays.stream(throwable.getSuppressed())
-            .map(FragmentInstanceFailureInfo::toFragmentInstanceFailureInfo)
-            .collect(Collectors.toList()),
-        
Arrays.stream(throwable.getStackTrace()).map(Objects::toString).collect(toImmutableList()));
-  }
-
   private static FailureException toException(FragmentInstanceFailureInfo 
failureInfo) {
     if (failureInfo == null) {
       return null;
@@ -116,6 +104,19 @@ public class FragmentInstanceFailureInfo implements 
Serializable {
     return failure;
   }
 
+  public static FragmentInstanceFailureInfo 
toFragmentInstanceFailureInfo(Throwable throwable) {
+    if (throwable == null) {
+      return null;
+    }
+    return new FragmentInstanceFailureInfo(
+        throwable.getMessage(),
+        toFragmentInstanceFailureInfo(throwable.getCause()),
+        Arrays.stream(throwable.getSuppressed())
+            .map(FragmentInstanceFailureInfo::toFragmentInstanceFailureInfo)
+            .collect(Collectors.toList()),
+        
Arrays.stream(throwable.getStackTrace()).map(Objects::toString).collect(toImmutableList()));
+  }
+
   public static StackTraceElement toStackTraceElement(String stack) {
     Matcher matcher = STACK_TRACE_PATTERN.matcher(stack);
     if (matcher.matches()) {
@@ -162,7 +163,7 @@ public class FragmentInstanceFailureInfo implements 
Serializable {
   }
 
   public static FragmentInstanceFailureInfo deserialize(ByteBuffer byteBuffer) 
{
-    String message = ReadWriteIOUtils.readString(byteBuffer);
+    final String message = ReadWriteIOUtils.readString(byteBuffer);
     FragmentInstanceFailureInfo cause;
     List<FragmentInstanceFailureInfo> suppressed = new ArrayList<>();
     List<String> stack = new ArrayList<>();
@@ -185,6 +186,11 @@ public class FragmentInstanceFailureInfo implements 
Serializable {
 
   // end region
 
+  @Override
+  public int hashCode() {
+    return Objects.hash(message, cause, suppressed, stack);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java
index a34f57e66b2..2f041b23b97 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceFailureListener.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
index 97fa83b7eef..26d88c886ab 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceInfo.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.consensus.common.DataSet;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
index 74e2985207d..c983334e247 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceManager.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
@@ -54,6 +55,7 @@ import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceContext
 import static 
org.apache.iotdb.db.mpp.execution.fragment.FragmentInstanceExecution.createFragmentInstanceExecution;
 import static 
org.apache.iotdb.db.mpp.metric.QueryExecutionMetricSet.LOCAL_EXECUTION_PLANNER;
 
+@SuppressWarnings("squid:S6548")
 public class FragmentInstanceManager {
 
   private static final Logger logger = 
LoggerFactory.getLogger(FragmentInstanceManager.class);
@@ -115,6 +117,7 @@ public class FragmentInstanceManager {
     return instanceExecution.size();
   }
 
+  @SuppressWarnings("squid:S1181")
   public FragmentInstanceInfo execDataQueryFragmentInstance(
       FragmentInstance instance, IDataRegionForQuery dataRegion) {
     long startTime = System.nanoTime();
@@ -186,6 +189,7 @@ public class FragmentInstanceManager {
     }
   }
 
+  @SuppressWarnings("squid:S1181")
   public FragmentInstanceInfo execSchemaQueryFragmentInstance(
       FragmentInstance instance, ISchemaRegion schemaRegion) {
     FragmentInstanceId instanceId = instance.getId();
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
index c8f94f143ae..b931ce549c2 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceState.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import java.util.Set;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
index 2071fcc8b25..2c415c5c59d 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentInstanceStateMachine.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import org.apache.iotdb.db.mpp.common.FragmentInstanceId;
@@ -159,10 +160,7 @@ public class FragmentInstanceStateMachine {
       sourceInstanceFailureListeners.add(listener);
       failures = ImmutableMap.copyOf(sourceInstanceFailures);
     }
-    executor.execute(
-        () -> {
-          failures.forEach(listener::onTaskFailed);
-        });
+    executor.execute(() -> failures.forEach(listener::onTaskFailed));
   }
 
   public void sourceTaskFailed(FragmentInstanceId instanceId, Throwable 
failure) {
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java
index a3382fa0415..bd803ebe1ff 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/fragment/FragmentState.java
@@ -16,6 +16,7 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.iotdb.db.mpp.execution.fragment;
 
 import java.util.Set;
diff --git 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java
 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java
index 910b2295bc2..ccc5a22dad6 100644
--- 
a/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java
+++ 
b/server/src/main/java/org/apache/iotdb/db/mpp/execution/timer/RuleBasedTimeSliceAllocator.java
@@ -32,7 +32,7 @@ import static com.google.common.base.Preconditions.checkState;
 
 public class RuleBasedTimeSliceAllocator implements ITimeSliceAllocator {
 
-  private final long EXECUTION_TIME_SLICE_IN_MS =
+  private static final long EXECUTION_TIME_SLICE_IN_MS =
       DriverTaskThread.EXECUTION_TIME_SLICE.roundTo(TimeUnit.MILLISECONDS);
 
   private final Map<OperatorContext, Integer> operatorToWeightMap;

Reply via email to