This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new d7c05fa432d [to dev/1.3] Parallel disptach read fragment instance
d7c05fa432d is described below

commit d7c05fa432d472d1219c4135bc4b7bc4e727b4a1
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 1 09:45:11 2025 +0800

    [to dev/1.3] Parallel disptach read fragment instance
---
 .../iotdb/db/queryengine/plan/Coordinator.java     |  13 +++
 .../queryengine/plan/planner/TreeModelPlanner.java |   2 +-
 .../SimpleFragmentParallelPlanner.java             |   1 +
 .../plan/planner/plan/PlanFragment.java            |   9 ++
 .../plan/scheduler/ClusterScheduler.java           |  11 ++-
 .../scheduler/FragmentInstanceDispatcherImpl.java  | 101 ++++++++++++++++++++-
 .../plan/scheduler/IFragInstanceDispatcher.java    |   6 +-
 .../scheduler/load/LoadTsFileDispatcherImpl.java   |   4 +-
 .../plan/scheduler/load/LoadTsFileScheduler.java   |   2 +-
 .../commons/concurrent/IoTDBThreadPoolFactory.java |  28 ++++++
 .../iotdb/commons/concurrent/ThreadName.java       |   1 +
 11 files changed, 165 insertions(+), 13 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 8561a6e4188..ed76b837e80 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.function.BiFunction;
 
 import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
@@ -95,6 +96,7 @@ public class Coordinator {
   private final ExecutorService executor;
   private final ExecutorService writeOperationExecutor;
   private final ScheduledExecutorService scheduledExecutor;
+  private final ExecutorService dispatchExecutor;
 
   private final QueryIdGenerator queryIdGenerator =
       new 
QueryIdGenerator(IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
@@ -108,6 +110,13 @@ public class Coordinator {
     this.executor = getQueryExecutor();
     this.writeOperationExecutor = getWriteExecutor();
     this.scheduledExecutor = getScheduledExecutor();
+    int dispatchThreadNum = Math.max(20, 
Runtime.getRuntime().availableProcessors() * 2);
+    this.dispatchExecutor =
+        IoTDBThreadPoolFactory.newCachedThreadPool(
+            ThreadName.FRAGMENT_INSTANCE_DISPATCH.getName(),
+            dispatchThreadNum,
+            dispatchThreadNum,
+            new ThreadPoolExecutor.CallerRunsPolicy());
   }
 
   private ExecutionResult execution(
@@ -318,4 +327,8 @@ public class Coordinator {
     }
     return -1L;
   }
+
+  public ExecutorService getDispatchExecutor() {
+    return dispatchExecutor;
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 843374ec49c..6ec87d08918 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -132,7 +132,7 @@ public class TreeModelPlanner implements IPlanner {
           new ClusterScheduler(
               context,
               stateMachine,
-              distributedPlan.getInstances(),
+              distributedPlan,
               context.getQueryType(),
               executor,
               writeOperationExecutor,
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 484a558f2f4..d9e91787ad4 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -191,6 +191,7 @@ public class SimpleFragmentParallelPlanner implements 
IFragmentParallelPlaner {
       
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
     }
     instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
+    fragment.setIndexInFragmentInstanceList(fragmentInstanceList.size());
     fragmentInstanceList.add(fragmentInstance);
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 3b32f2932cb..bf3d06b4186 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -49,6 +49,7 @@ public class PlanFragment {
 
   // indicate whether this PlanFragment is the root of the whole 
Fragment-Plan-Tree or not
   private boolean isRoot;
+  private int indexInFragmentInstanceList = -1;
 
   public PlanFragment(PlanFragmentId id, PlanNode planNodeTree) {
     this.id = id;
@@ -56,6 +57,14 @@ public class PlanFragment {
     this.isRoot = false;
   }
 
+  public int getIndexInFragmentInstanceList() {
+    return indexInFragmentInstanceList;
+  }
+
+  public void setIndexInFragmentInstanceList(int indexInFragmentInstanceList) {
+    this.indexInFragmentInstanceList = indexInFragmentInstanceList;
+  }
+
   public PlanFragmentId getId() {
     return id;
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index 448f0829b5a..33f1bd3a194 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -28,7 +28,9 @@ import 
org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
 import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.rpc.TSStatusCode;
 
 import io.airlift.units.Duration;
@@ -56,6 +58,7 @@ public class ClusterScheduler implements IScheduler {
   // The stateMachine of the QueryExecution owned by this QueryScheduler
   private final QueryStateMachine stateMachine;
   private final QueryType queryType;
+  private final SubPlan rootSubPlan;
   // The fragment instances which should be sent to corresponding Nodes.
   private final List<FragmentInstance> instances;
 
@@ -69,7 +72,7 @@ public class ClusterScheduler implements IScheduler {
   public ClusterScheduler(
       MPPQueryContext queryContext,
       QueryStateMachine stateMachine,
-      List<FragmentInstance> instances,
+      DistributedQueryPlan distributedQueryPlan,
       QueryType queryType,
       ExecutorService executor,
       ExecutorService writeOperationExecutor,
@@ -78,7 +81,8 @@ public class ClusterScheduler implements IScheduler {
       IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
           asyncInternalServiceClientManager) {
     this.stateMachine = stateMachine;
-    this.instances = instances;
+    this.rootSubPlan = distributedQueryPlan.getRootSubPlan();
+    this.instances = distributedQueryPlan.getInstances();
     this.queryType = queryType;
     this.dispatcher =
         new FragmentInstanceDispatcherImpl(
@@ -114,7 +118,8 @@ public class ClusterScheduler implements IScheduler {
   public void start() {
     stateMachine.transitionToDispatching();
     long startTime = System.nanoTime();
-    Future<FragInstanceDispatchResult> dispatchResultFuture = 
dispatcher.dispatch(instances);
+    Future<FragInstanceDispatchResult> dispatchResultFuture =
+        dispatcher.dispatch(rootSubPlan, instances);
 
     // NOTICE: the FragmentInstance may be dispatched to another Host due to 
consensus redirect.
     // So we need to start the state fetcher after the dispatching stage.
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 8dbfc148bb6..fce62782abc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -38,8 +38,10 @@ import 
org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
 import org.apache.iotdb.db.queryengine.execution.executor.RegionReadExecutor;
 import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
 import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
 import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.utils.SetThreadName;
 import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
@@ -54,6 +56,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
 import org.apache.thrift.TException;
+import org.apache.tsfile.utils.Pair;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -62,6 +65,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
 import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -113,17 +117,104 @@ public class FragmentInstanceDispatcherImpl implements 
IFragInstanceDispatcher {
   }
 
   @Override
-  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances) {
+  public Future<FragInstanceDispatchResult> dispatch(
+      SubPlan root, List<FragmentInstance> instances) {
     if (type == QueryType.READ) {
-      return dispatchRead(instances);
+      return instances.size() == 1 || root == null
+          ? dispatchRead(instances)
+          : topologicalParallelDispatchRead(root, instances);
     } else {
       return dispatchWriteAsync(instances);
     }
   }
 
-  // TODO: (xingtanzjr) currently we use a sequential dispatch policy for 
READ, which is
-  //  unsafe for current FragmentInstance scheduler framework. We need to 
implement the
-  //  topological dispatch according to dependency relations between 
FragmentInstances
+  private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+      SubPlan root, List<FragmentInstance> instances) {
+    long startTime = System.nanoTime();
+    LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new 
LinkedBlockingQueue<>(instances.size());
+    List<Future<FragInstanceDispatchResult>> futures = new 
ArrayList<>(instances.size());
+    queue.add(new Pair<>(root, true));
+    try {
+      while (futures.size() < instances.size()) {
+        Pair<SubPlan, Boolean> pair = queue.take();
+        SubPlan next = pair.getLeft();
+        boolean previousSuccess = pair.getRight();
+        if (!previousSuccess) {
+          break;
+        }
+        FragmentInstance fragmentInstance =
+            
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+        futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+      }
+      for (Future<FragInstanceDispatchResult> future : futures) {
+        FragInstanceDispatchResult result = future.get();
+        if (!result.isSuccessful()) {
+          return immediateFuture(result);
+        }
+      }
+    } catch (InterruptedException e) {
+      Thread.currentThread().interrupt();
+      LOGGER.warn("Interrupted when dispatching read async", e);
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " + 
e.getMessage())));
+    } catch (Throwable t) {
+      LOGGER.warn(DISPATCH_FAILED, t);
+      return immediateFuture(
+          new FragInstanceDispatchResult(
+              RpcUtils.getStatus(
+                  TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS + 
t.getMessage())));
+    } finally {
+      long queryDispatchReadTime = System.nanoTime() - startTime;
+      QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ, 
queryDispatchReadTime);
+      queryContext.recordDispatchCost(queryDispatchReadTime);
+    }
+    return immediateFuture(new FragInstanceDispatchResult(true));
+  }
+
+  private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
+      SubPlan plan, FragmentInstance instance, 
LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) {
+    return Coordinator.getInstance()
+        .getDispatchExecutor()
+        .submit(
+            () -> {
+              boolean success = false;
+              try (SetThreadName threadName = new 
SetThreadName(instance.getId().getFullId())) {
+                dispatchOneInstance(instance);
+                success = true;
+              } catch (FragmentInstanceDispatchException e) {
+                return new FragInstanceDispatchResult(e.getFailureStatus());
+              } catch (Throwable t) {
+                LOGGER.warn(DISPATCH_FAILED, t);
+                return new FragInstanceDispatchResult(
+                    RpcUtils.getStatus(
+                        TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS 
+ t.getMessage()));
+              } finally {
+                if (!success) {
+                  // In case of failure, only one notification needs to be 
sent to the outer layer.
+                  // If the failure is not notified and the child FI is not 
sent, the outer loop
+                  // won't be able to exit.
+                  queue.add(new Pair<>(null, false));
+                } else {
+                  for (SubPlan child : plan.getChildren()) {
+                    queue.add(new Pair<>(child, true));
+                  }
+                }
+                // friendly for gc, clear the plan node tree, for some queries 
select all devices,
+                // it will release lots of memory
+                if (!queryContext.isExplainAnalyze()) {
+                  // EXPLAIN ANALYZE will use these instances, so we can't 
clear them
+                  instance.getFragment().clearUselessField();
+                } else {
+                  // TypeProvider is not used in EXPLAIN ANALYZE, so we can 
clear it
+                  instance.getFragment().clearTypeProvider();
+                }
+              }
+              return new FragInstanceDispatchResult(true);
+            });
+  }
+
   private Future<FragInstanceDispatchResult> 
dispatchRead(List<FragmentInstance> instances) {
     long startTime = System.nanoTime();
     for (FragmentInstance instance : instances) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
index 97363e44097..5552063a5d7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
@@ -20,6 +20,7 @@
 package org.apache.iotdb.db.queryengine.plan.scheduler;
 
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 
 import java.util.List;
 import java.util.concurrent.Future;
@@ -28,10 +29,11 @@ public interface IFragInstanceDispatcher {
   /**
    * Dispatch all Fragment instances asynchronously
    *
+   * @param root the root SubPlan
    * @param instances Fragment instance list
-   * @return Boolean.
+   * @return Future<FragInstanceDispatchResult>
    */
-  Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances);
+  Future<FragInstanceDispatchResult> dispatch(SubPlan root, 
List<FragmentInstance> instances);
 
   void abort();
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 28660fd0e5d..41b003c5b77 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.load.LoadFileException;
 import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
 import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
@@ -102,7 +103,8 @@ public class LoadTsFileDispatcherImpl implements 
IFragInstanceDispatcher {
   }
 
   @Override
-  public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance> 
instances) {
+  public Future<FragInstanceDispatchResult> dispatch(
+      SubPlan root, List<FragmentInstance> instances) {
     return executor.submit(
         () -> {
           for (FragmentInstance instance : instances) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index fa24b64fc40..59aa57ffd4e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -339,7 +339,7 @@ public class LoadTsFileScheduler implements IScheduler {
             queryContext.getSession());
     instance.setExecutorAndHost(new StorageExecutor(replicaSet));
     Future<FragInstanceDispatchResult> dispatchResultFuture =
-        dispatcher.dispatch(Collections.singletonList(instance));
+        dispatcher.dispatch(null, Collections.singletonList(instance));
 
     try {
       FragInstanceDispatchResult result =
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 1cf5488d917..6234705ed2d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -160,6 +160,34 @@ public class IoTDBThreadPoolFactory {
         poolName);
   }
 
+  /**
+   * see {@link 
Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}.
+   *
+   * @param poolName the name of thread pool.
+   * @param corePoolSize the corePoolSize of thread pool
+   * @param maximumPoolSize the maximumPoolSize of thread pool
+   * @param rejectedExecutionHandler the reject handler
+   * @return thread pool.
+   */
+  public static ExecutorService newCachedThreadPool(
+      String poolName,
+      int corePoolSize,
+      int maximumPoolSize,
+      RejectedExecutionHandler rejectedExecutionHandler) {
+    logger.info(NEW_CACHED_THREAD_POOL_LOGGER_FORMAT, poolName);
+    WrappedThreadPoolExecutor executor =
+        new WrappedThreadPoolExecutor(
+            corePoolSize,
+            maximumPoolSize,
+            60L,
+            TimeUnit.SECONDS,
+            new SynchronousQueue<>(),
+            new IoTThreadFactory(poolName),
+            poolName);
+    executor.setRejectedExecutionHandler(rejectedExecutionHandler);
+    return executor;
+  }
+
   /**
    * see {@link 
Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}.
    *
diff --git 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 4cdb24cd328..19aff15004d 100644
--- 
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++ 
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -33,6 +33,7 @@ public enum ThreadName {
   TIMED_QUERY_SQL_COUNT("Timed-Query-SQL-Count"),
   FRAGMENT_INSTANCE_MANAGEMENT("Fragment-Instance-Management"),
   FRAGMENT_INSTANCE_NOTIFICATION("Fragment-Instance-Notification"),
+  FRAGMENT_INSTANCE_DISPATCH("Fragment-Instance-Dispatch"),
   DRIVER_TASK_SCHEDULER_NOTIFICATION("Driver-Task-Scheduler-Notification"),
   // -------------------------- MPP --------------------------
   MPP_COORDINATOR_SCHEDULED_EXECUTOR("MPP-Coordinator-Scheduled-Executor"),

Reply via email to