This is an automated email from the ASF dual-hosted git repository. lancelly pushed a commit to branch queryPriority in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit 8e747b982857d2afa4d4afeadac6207f4d2b564a Author: lancelly <[email protected]> AuthorDate: Thu Nov 16 23:22:01 2023 +0800 make FIs of one query managed together by TaskHandle in MultiLevelPriorityQueue --- .../execution/schedule/DriverScheduler.java | 56 +++++++++++++--------- .../queue/multilevelqueue/DriverTaskHandle.java | 5 ++ .../multilevelqueue/MultilevelPriorityQueue.java | 2 +- 3 files changed, 40 insertions(+), 23 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java index 1ad010eead7..45796fa3223 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java @@ -65,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import static com.google.common.base.Preconditions.checkState; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME; import static org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME; @@ -86,6 +87,9 @@ public class DriverScheduler implements IDriverScheduler, IService { private final IndexedBlockingQueue<DriverTask> timeoutQueue; private final Set<DriverTask> blockedTasks; private final Map<QueryId, Map<FragmentInstanceId, Set<DriverTask>>> queryMap; + /** All FIs of one query dispatched to this Node share one DriverTaskHandle */ + private final Map<QueryId, DriverTaskHandle> queryIdToDriverTaskHandleMap; + private final ITaskScheduler scheduler; private final AtomicInteger nextDriverTaskHandleId = new AtomicInteger(0); @@ -105,6 +109,7 @@ public class DriverScheduler implements IDriverScheduler, IService { new 
L1PriorityQueue<>( QUERY_MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask()); this.queryMap = new ConcurrentHashMap<>(); + this.queryIdToDriverTaskHandleMap = new ConcurrentHashMap<>(); this.blockedTasks = Collections.synchronizedSet(new HashSet<>()); this.scheduler = new Scheduler(); this.workerGroups = new ThreadGroup("ScheduleThreads"); @@ -183,10 +188,13 @@ public class DriverScheduler implements IDriverScheduler, IService { QueryId queryId, List<IDriver> drivers, long timeOut, SessionInfo sessionInfo) throws CpuNotEnoughException, MemoryNotEnoughException { DriverTaskHandle driverTaskHandle = - new DriverTaskHandle( - getNextDriverTaskHandleId(), - (MultilevelPriorityQueue) readyQueue, - OptionalInt.of(Integer.MAX_VALUE)); + queryIdToDriverTaskHandleMap.computeIfAbsent( + queryId, + k -> + new DriverTaskHandle( + getNextDriverTaskHandleId(), + (MultilevelPriorityQueue) readyQueue, + OptionalInt.of(Integer.MAX_VALUE))); List<DriverTask> tasks = new ArrayList<>(); drivers.forEach( driver -> @@ -293,21 +301,6 @@ public class DriverScheduler implements IDriverScheduler, IService { } } - @Override - public void abortQuery(QueryId queryId) { - Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId); - if (queryRelatedTasks != null) { - for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) { - if (fragmentRelatedTasks != null) { - for (DriverTask task : fragmentRelatedTasks) { - task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED); - clearDriverTask(task); - } - } - } - } - } - @Override public void abortFragmentInstance(FragmentInstanceId instanceId) { Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = @@ -362,8 +355,8 @@ public class DriverScheduler implements IDriverScheduler, IService { } timeoutQueue.remove(task.getDriverTaskId()); - Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = - queryMap.get(task.getDriverTaskId().getQueryId()); + QueryId queryId = 
task.getDriverTaskId().getQueryId(); + Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.get(queryId); if (queryRelatedTasks != null) { Set<DriverTask> instanceRelatedTasks = queryRelatedTasks.get(task.getDriverTaskId().getFragmentInstanceId()); @@ -374,7 +367,8 @@ public class DriverScheduler implements IDriverScheduler, IService { } } if (queryRelatedTasks.isEmpty()) { - queryMap.remove(task.getDriverTaskId().getQueryId()); + queryMap.remove(queryId); + queryIdToDriverTaskHandleMap.remove(queryId); } } try { @@ -455,6 +449,24 @@ public class DriverScheduler implements IDriverScheduler, IService { this.blockManager = blockManager; } + @TestOnly + @Override + public void abortQuery(QueryId queryId) { + Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = queryMap.remove(queryId); + if (queryRelatedTasks != null) { + for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) { + if (fragmentRelatedTasks != null) { + for (DriverTask task : fragmentRelatedTasks) { + task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED); + clearDriverTask(task); + } + } + } + } + DriverTaskHandle taskHandle = queryIdToDriverTaskHandleMap.remove(queryId); + checkState(taskHandle == null, "taskHandle must be removed when clearDriverTasks are done."); + } + private static class InstanceHolder { private InstanceHolder() {} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java index d30784dcd6f..c1ec657fd29 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java @@ -27,6 +27,11 @@ import 
java.util.concurrent.atomic.AtomicReference; import static java.util.Objects.requireNonNull; +/** + * This class is used to ensure that all the DriverTasks of all the FIs of one query accumulate + * scheduling time together, to avoid a single query occupying too much Level0 time due to an excess + * of FIs. + */ public class DriverTaskHandle { private final int driverTaskHandleId; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java index 82c4f774f0e..e70e588956e 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java @@ -37,7 +37,7 @@ import static java.util.concurrent.TimeUnit.SECONDS; */ public class MultilevelPriorityQueue extends IndexedBlockingReserveQueue<DriverTask> { /** Scheduled time threshold of TASK in each level. */ - static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300}; + static final int[] LEVEL_THRESHOLD_SECONDS = {0, 3, 30, 120, 600}; /** the upper limit one Task can contribute to its level in one scheduled time. */ static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);
