This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new d7c05fa432d [to dev/1.3] Parallel disptach read fragment instance
d7c05fa432d is described below
commit d7c05fa432d472d1219c4135bc4b7bc4e727b4a1
Author: shuwenwei <[email protected]>
AuthorDate: Tue Jul 1 09:45:11 2025 +0800
[to dev/1.3] Parallel disptach read fragment instance
---
.../iotdb/db/queryengine/plan/Coordinator.java | 13 +++
.../queryengine/plan/planner/TreeModelPlanner.java | 2 +-
.../SimpleFragmentParallelPlanner.java | 1 +
.../plan/planner/plan/PlanFragment.java | 9 ++
.../plan/scheduler/ClusterScheduler.java | 11 ++-
.../scheduler/FragmentInstanceDispatcherImpl.java | 101 ++++++++++++++++++++-
.../plan/scheduler/IFragInstanceDispatcher.java | 6 +-
.../scheduler/load/LoadTsFileDispatcherImpl.java | 4 +-
.../plan/scheduler/load/LoadTsFileScheduler.java | 2 +-
.../commons/concurrent/IoTDBThreadPoolFactory.java | 28 ++++++
.../iotdb/commons/concurrent/ThreadName.java | 1 +
11 files changed, 165 insertions(+), 13 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
index 8561a6e4188..ed76b837e80 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/Coordinator.java
@@ -57,6 +57,7 @@ import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
import java.util.function.BiFunction;
import static org.apache.iotdb.commons.utils.StatusUtils.needRetry;
@@ -95,6 +96,7 @@ public class Coordinator {
private final ExecutorService executor;
private final ExecutorService writeOperationExecutor;
private final ScheduledExecutorService scheduledExecutor;
+ private final ExecutorService dispatchExecutor;
private final QueryIdGenerator queryIdGenerator =
new
QueryIdGenerator(IoTDBDescriptor.getInstance().getConfig().getDataNodeId());
@@ -108,6 +110,13 @@ public class Coordinator {
this.executor = getQueryExecutor();
this.writeOperationExecutor = getWriteExecutor();
this.scheduledExecutor = getScheduledExecutor();
+ int dispatchThreadNum = Math.max(20,
Runtime.getRuntime().availableProcessors() * 2);
+ this.dispatchExecutor =
+ IoTDBThreadPoolFactory.newCachedThreadPool(
+ ThreadName.FRAGMENT_INSTANCE_DISPATCH.getName(),
+ dispatchThreadNum,
+ dispatchThreadNum,
+ new ThreadPoolExecutor.CallerRunsPolicy());
}
private ExecutionResult execution(
@@ -318,4 +327,8 @@ public class Coordinator {
}
return -1L;
}
+
+ public ExecutorService getDispatchExecutor() {
+ return dispatchExecutor;
+ }
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
index 843374ec49c..6ec87d08918 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TreeModelPlanner.java
@@ -132,7 +132,7 @@ public class TreeModelPlanner implements IPlanner {
new ClusterScheduler(
context,
stateMachine,
- distributedPlan.getInstances(),
+ distributedPlan,
context.getQueryType(),
executor,
writeOperationExecutor,
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
index 484a558f2f4..d9e91787ad4 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/SimpleFragmentParallelPlanner.java
@@ -191,6 +191,7 @@ public class SimpleFragmentParallelPlanner implements
IFragmentParallelPlaner {
fragmentInstance.getFragment().generateTypeProvider(queryContext.getTypeProvider());
}
instanceMap.putIfAbsent(fragment.getId(), fragmentInstance);
+ fragment.setIndexInFragmentInstanceList(fragmentInstanceList.size());
fragmentInstanceList.add(fragmentInstance);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
index 3b32f2932cb..bf3d06b4186 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/PlanFragment.java
@@ -49,6 +49,7 @@ public class PlanFragment {
// indicate whether this PlanFragment is the root of the whole
Fragment-Plan-Tree or not
private boolean isRoot;
+ private int indexInFragmentInstanceList = -1;
public PlanFragment(PlanFragmentId id, PlanNode planNodeTree) {
this.id = id;
@@ -56,6 +57,14 @@ public class PlanFragment {
this.isRoot = false;
}
+ public int getIndexInFragmentInstanceList() {
+ return indexInFragmentInstanceList;
+ }
+
+ public void setIndexInFragmentInstanceList(int indexInFragmentInstanceList) {
+ this.indexInFragmentInstanceList = indexInFragmentInstanceList;
+ }
+
public PlanFragmentId getId() {
return id;
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
index 448f0829b5a..33f1bd3a194 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/ClusterScheduler.java
@@ -28,7 +28,9 @@ import
org.apache.iotdb.db.queryengine.execution.QueryStateMachine;
import org.apache.iotdb.db.queryengine.execution.fragment.FragmentInfo;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.DistributedQueryPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.rpc.TSStatusCode;
import io.airlift.units.Duration;
@@ -56,6 +58,7 @@ public class ClusterScheduler implements IScheduler {
// The stateMachine of the QueryExecution owned by this QueryScheduler
private final QueryStateMachine stateMachine;
private final QueryType queryType;
+ private final SubPlan rootSubPlan;
// The fragment instances which should be sent to corresponding Nodes.
private final List<FragmentInstance> instances;
@@ -69,7 +72,7 @@ public class ClusterScheduler implements IScheduler {
public ClusterScheduler(
MPPQueryContext queryContext,
QueryStateMachine stateMachine,
- List<FragmentInstance> instances,
+ DistributedQueryPlan distributedQueryPlan,
QueryType queryType,
ExecutorService executor,
ExecutorService writeOperationExecutor,
@@ -78,7 +81,8 @@ public class ClusterScheduler implements IScheduler {
IClientManager<TEndPoint, AsyncDataNodeInternalServiceClient>
asyncInternalServiceClientManager) {
this.stateMachine = stateMachine;
- this.instances = instances;
+ this.rootSubPlan = distributedQueryPlan.getRootSubPlan();
+ this.instances = distributedQueryPlan.getInstances();
this.queryType = queryType;
this.dispatcher =
new FragmentInstanceDispatcherImpl(
@@ -114,7 +118,8 @@ public class ClusterScheduler implements IScheduler {
public void start() {
stateMachine.transitionToDispatching();
long startTime = System.nanoTime();
- Future<FragInstanceDispatchResult> dispatchResultFuture =
dispatcher.dispatch(instances);
+ Future<FragInstanceDispatchResult> dispatchResultFuture =
+ dispatcher.dispatch(rootSubPlan, instances);
// NOTICE: the FragmentInstance may be dispatched to another Host due to
consensus redirect.
// So we need to start the state fetcher after the dispatching stage.
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
index 8dbfc148bb6..fce62782abc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/FragmentInstanceDispatcherImpl.java
@@ -38,8 +38,10 @@ import
org.apache.iotdb.db.queryengine.execution.executor.RegionExecutionResult;
import org.apache.iotdb.db.queryengine.execution.executor.RegionReadExecutor;
import org.apache.iotdb.db.queryengine.execution.executor.RegionWriteExecutor;
import org.apache.iotdb.db.queryengine.metric.QueryExecutionMetricSet;
+import org.apache.iotdb.db.queryengine.plan.Coordinator;
import org.apache.iotdb.db.queryengine.plan.analyze.QueryType;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.utils.SetThreadName;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstance;
@@ -54,6 +56,7 @@ import org.apache.iotdb.rpc.TSStatusCode;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.apache.thrift.TException;
+import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,6 +65,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import static com.google.common.util.concurrent.Futures.immediateFuture;
@@ -113,17 +117,104 @@ public class FragmentInstanceDispatcherImpl implements
IFragInstanceDispatcher {
}
@Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ public Future<FragInstanceDispatchResult> dispatch(
+ SubPlan root, List<FragmentInstance> instances) {
if (type == QueryType.READ) {
- return dispatchRead(instances);
+ return instances.size() == 1 || root == null
+ ? dispatchRead(instances)
+ : topologicalParallelDispatchRead(root, instances);
} else {
return dispatchWriteAsync(instances);
}
}
- // TODO: (xingtanzjr) currently we use a sequential dispatch policy for
READ, which is
- // unsafe for current FragmentInstance scheduler framework. We need to
implement the
- // topological dispatch according to dependency relations between
FragmentInstances
+ private Future<FragInstanceDispatchResult> topologicalParallelDispatchRead(
+ SubPlan root, List<FragmentInstance> instances) {
+ long startTime = System.nanoTime();
+ LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue = new
LinkedBlockingQueue<>(instances.size());
+ List<Future<FragInstanceDispatchResult>> futures = new
ArrayList<>(instances.size());
+ queue.add(new Pair<>(root, true));
+ try {
+ while (futures.size() < instances.size()) {
+ Pair<SubPlan, Boolean> pair = queue.take();
+ SubPlan next = pair.getLeft();
+ boolean previousSuccess = pair.getRight();
+ if (!previousSuccess) {
+ break;
+ }
+ FragmentInstance fragmentInstance =
+
instances.get(next.getPlanFragment().getIndexInFragmentInstanceList());
+ futures.add(asyncDispatchOneInstance(next, fragmentInstance, queue));
+ }
+ for (Future<FragInstanceDispatchResult> future : futures) {
+ FragInstanceDispatchResult result = future.get();
+ if (!result.isSuccessful()) {
+ return immediateFuture(result);
+ }
+ }
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ LOGGER.warn("Interrupted when dispatching read async", e);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, "Interrupted errors: " +
e.getMessage())));
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return immediateFuture(
+ new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS +
t.getMessage())));
+ } finally {
+ long queryDispatchReadTime = System.nanoTime() - startTime;
+ QUERY_EXECUTION_METRIC_SET.recordExecutionCost(DISPATCH_READ,
queryDispatchReadTime);
+ queryContext.recordDispatchCost(queryDispatchReadTime);
+ }
+ return immediateFuture(new FragInstanceDispatchResult(true));
+ }
+
+ private Future<FragInstanceDispatchResult> asyncDispatchOneInstance(
+ SubPlan plan, FragmentInstance instance,
LinkedBlockingQueue<Pair<SubPlan, Boolean>> queue) {
+ return Coordinator.getInstance()
+ .getDispatchExecutor()
+ .submit(
+ () -> {
+ boolean success = false;
+ try (SetThreadName threadName = new
SetThreadName(instance.getId().getFullId())) {
+ dispatchOneInstance(instance);
+ success = true;
+ } catch (FragmentInstanceDispatchException e) {
+ return new FragInstanceDispatchResult(e.getFailureStatus());
+ } catch (Throwable t) {
+ LOGGER.warn(DISPATCH_FAILED, t);
+ return new FragInstanceDispatchResult(
+ RpcUtils.getStatus(
+ TSStatusCode.INTERNAL_SERVER_ERROR, UNEXPECTED_ERRORS
+ t.getMessage()));
+ } finally {
+ if (!success) {
+ // In case of failure, only one notification needs to be
sent to the outer layer.
+ // If the failure is not notified and the child FI is not
sent, the outer loop
+ // won't be able to exit.
+ queue.add(new Pair<>(null, false));
+ } else {
+ for (SubPlan child : plan.getChildren()) {
+ queue.add(new Pair<>(child, true));
+ }
+ }
+ // friendly for gc, clear the plan node tree, for some queries
select all devices,
+ // it will release lots of memory
+ if (!queryContext.isExplainAnalyze()) {
+ // EXPLAIN ANALYZE will use these instances, so we can't
clear them
+ instance.getFragment().clearUselessField();
+ } else {
+ // TypeProvider is not used in EXPLAIN ANALYZE, so we can
clear it
+ instance.getFragment().clearTypeProvider();
+ }
+ }
+ return new FragInstanceDispatchResult(true);
+ });
+ }
+
private Future<FragInstanceDispatchResult>
dispatchRead(List<FragmentInstance> instances) {
long startTime = System.nanoTime();
for (FragmentInstance instance : instances) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
index 97363e44097..5552063a5d7 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/IFragInstanceDispatcher.java
@@ -20,6 +20,7 @@
package org.apache.iotdb.db.queryengine.plan.scheduler;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import java.util.List;
import java.util.concurrent.Future;
@@ -28,10 +29,11 @@ public interface IFragInstanceDispatcher {
/**
* Dispatch all Fragment instances asynchronously
*
+ * @param root the root SubPlan
* @param instances Fragment instance list
- * @return Boolean.
+ * @return Future<FragInstanceDispatchResult>
*/
- Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances);
+ Future<FragInstanceDispatchResult> dispatch(SubPlan root,
List<FragmentInstance> instances);
void abort();
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
index 28660fd0e5d..41b003c5b77 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileDispatcherImpl.java
@@ -36,6 +36,7 @@ import org.apache.iotdb.db.exception.load.LoadFileException;
import org.apache.iotdb.db.exception.mpp.FragmentInstanceDispatchException;
import org.apache.iotdb.db.pipe.agent.PipeDataNodeAgent;
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.SubPlan;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadSingleTsFileNode;
@@ -102,7 +103,8 @@ public class LoadTsFileDispatcherImpl implements
IFragInstanceDispatcher {
}
@Override
- public Future<FragInstanceDispatchResult> dispatch(List<FragmentInstance>
instances) {
+ public Future<FragInstanceDispatchResult> dispatch(
+ SubPlan root, List<FragmentInstance> instances) {
return executor.submit(
() -> {
for (FragmentInstance instance : instances) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
index fa24b64fc40..59aa57ffd4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/scheduler/load/LoadTsFileScheduler.java
@@ -339,7 +339,7 @@ public class LoadTsFileScheduler implements IScheduler {
queryContext.getSession());
instance.setExecutorAndHost(new StorageExecutor(replicaSet));
Future<FragInstanceDispatchResult> dispatchResultFuture =
- dispatcher.dispatch(Collections.singletonList(instance));
+ dispatcher.dispatch(null, Collections.singletonList(instance));
try {
FragInstanceDispatchResult result =
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
index 1cf5488d917..6234705ed2d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/IoTDBThreadPoolFactory.java
@@ -160,6 +160,34 @@ public class IoTDBThreadPoolFactory {
poolName);
}
+ /**
+ * see {@link
Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}.
+ *
+ * @param poolName the name of thread pool.
+ * @param corePoolSize the corePoolSize of thread pool
+ * @param maximumPoolSize the maximumPoolSize of thread pool
+ * @param rejectedExecutionHandler the reject handler
+ * @return thread pool.
+ */
+ public static ExecutorService newCachedThreadPool(
+ String poolName,
+ int corePoolSize,
+ int maximumPoolSize,
+ RejectedExecutionHandler rejectedExecutionHandler) {
+ logger.info(NEW_CACHED_THREAD_POOL_LOGGER_FORMAT, poolName);
+ WrappedThreadPoolExecutor executor =
+ new WrappedThreadPoolExecutor(
+ corePoolSize,
+ maximumPoolSize,
+ 60L,
+ TimeUnit.SECONDS,
+ new SynchronousQueue<>(),
+ new IoTThreadFactory(poolName),
+ poolName);
+ executor.setRejectedExecutionHandler(rejectedExecutionHandler);
+ return executor;
+ }
+
/**
* see {@link
Executors#newCachedThreadPool(java.util.concurrent.ThreadFactory)}.
*
diff --git
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
index 4cdb24cd328..19aff15004d 100644
---
a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
+++
b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/concurrent/ThreadName.java
@@ -33,6 +33,7 @@ public enum ThreadName {
TIMED_QUERY_SQL_COUNT("Timed-Query-SQL-Count"),
FRAGMENT_INSTANCE_MANAGEMENT("Fragment-Instance-Management"),
FRAGMENT_INSTANCE_NOTIFICATION("Fragment-Instance-Notification"),
+ FRAGMENT_INSTANCE_DISPATCH("Fragment-Instance-Dispatch"),
DRIVER_TASK_SCHEDULER_NOTIFICATION("Driver-Task-Scheduler-Notification"),
// -------------------------- MPP --------------------------
MPP_COORDINATOR_SCHEDULED_EXECUTOR("MPP-Coordinator-Scheduled-Executor"),