This is an automated email from the ASF dual-hosted git repository.

lancelly pushed a commit to branch queryPriority
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 8e747b982857d2afa4d4afeadac6207f4d2b564a
Author: lancelly <[email protected]>
AuthorDate: Thu Nov 16 23:22:01 2023 +0800

    Make all FIs of one query be managed together by one DriverTaskHandle in 
MultilevelPriorityQueue
---
 .../execution/schedule/DriverScheduler.java        | 56 +++++++++++++---------
 .../queue/multilevelqueue/DriverTaskHandle.java    |  5 ++
 .../multilevelqueue/MultilevelPriorityQueue.java   |  2 +-
 3 files changed, 40 insertions(+), 23 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
index 1ad010eead7..45796fa3223 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/DriverScheduler.java
@@ -65,6 +65,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static com.google.common.base.Preconditions.checkState;
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.BLOCK_QUEUED_TIME;
 import static 
org.apache.iotdb.db.queryengine.metric.DriverSchedulerMetricSet.READY_QUEUED_TIME;
 
@@ -86,6 +87,9 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
   private final IndexedBlockingQueue<DriverTask> timeoutQueue;
   private final Set<DriverTask> blockedTasks;
   private final Map<QueryId, Map<FragmentInstanceId, Set<DriverTask>>> 
queryMap;
+  /** All FIs of one query dispatched to this Node share one DriverTaskHandle. 
*/
+  private final Map<QueryId, DriverTaskHandle> queryIdToDriverTaskHandleMap;
+
   private final ITaskScheduler scheduler;
 
   private final AtomicInteger nextDriverTaskHandleId = new AtomicInteger(0);
@@ -105,6 +109,7 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
         new L1PriorityQueue<>(
             QUERY_MAX_CAPACITY, new DriverTask.TimeoutComparator(), new 
DriverTask());
     this.queryMap = new ConcurrentHashMap<>();
+    this.queryIdToDriverTaskHandleMap = new ConcurrentHashMap<>();
     this.blockedTasks = Collections.synchronizedSet(new HashSet<>());
     this.scheduler = new Scheduler();
     this.workerGroups = new ThreadGroup("ScheduleThreads");
@@ -183,10 +188,13 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
       QueryId queryId, List<IDriver> drivers, long timeOut, SessionInfo 
sessionInfo)
       throws CpuNotEnoughException, MemoryNotEnoughException {
     DriverTaskHandle driverTaskHandle =
-        new DriverTaskHandle(
-            getNextDriverTaskHandleId(),
-            (MultilevelPriorityQueue) readyQueue,
-            OptionalInt.of(Integer.MAX_VALUE));
+        queryIdToDriverTaskHandleMap.computeIfAbsent(
+            queryId,
+            k ->
+                new DriverTaskHandle(
+                    getNextDriverTaskHandleId(),
+                    (MultilevelPriorityQueue) readyQueue,
+                    OptionalInt.of(Integer.MAX_VALUE)));
     List<DriverTask> tasks = new ArrayList<>();
     drivers.forEach(
         driver ->
@@ -293,21 +301,6 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
     }
   }
 
-  @Override
-  public void abortQuery(QueryId queryId) {
-    Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = 
queryMap.remove(queryId);
-    if (queryRelatedTasks != null) {
-      for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
-        if (fragmentRelatedTasks != null) {
-          for (DriverTask task : fragmentRelatedTasks) {
-            
task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
-            clearDriverTask(task);
-          }
-        }
-      }
-    }
-  }
-
   @Override
   public void abortFragmentInstance(FragmentInstanceId instanceId) {
     Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
@@ -362,8 +355,8 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
       }
 
       timeoutQueue.remove(task.getDriverTaskId());
-      Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks =
-          queryMap.get(task.getDriverTaskId().getQueryId());
+      QueryId queryId = task.getDriverTaskId().getQueryId();
+      Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = 
queryMap.get(queryId);
       if (queryRelatedTasks != null) {
         Set<DriverTask> instanceRelatedTasks =
             
queryRelatedTasks.get(task.getDriverTaskId().getFragmentInstanceId());
@@ -374,7 +367,8 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
           }
         }
         if (queryRelatedTasks.isEmpty()) {
-          queryMap.remove(task.getDriverTaskId().getQueryId());
+          queryMap.remove(queryId);
+          queryIdToDriverTaskHandleMap.remove(queryId);
         }
       }
       try {
@@ -455,6 +449,24 @@ public class DriverScheduler implements IDriverScheduler, 
IService {
     this.blockManager = blockManager;
   }
 
+  @TestOnly
+  @Override
+  public void abortQuery(QueryId queryId) {
+    Map<FragmentInstanceId, Set<DriverTask>> queryRelatedTasks = 
queryMap.remove(queryId);
+    if (queryRelatedTasks != null) {
+      for (Set<DriverTask> fragmentRelatedTasks : queryRelatedTasks.values()) {
+        if (fragmentRelatedTasks != null) {
+          for (DriverTask task : fragmentRelatedTasks) {
+            
task.setAbortCause(DriverTaskAbortedException.BY_QUERY_CASCADING_ABORTED);
+            clearDriverTask(task);
+          }
+        }
+      }
+    }
+    DriverTaskHandle taskHandle = queryIdToDriverTaskHandleMap.remove(queryId);
+    checkState(taskHandle == null, "taskHandle must be removed when 
clearDriverTasks are done.");
+  }
+
   private static class InstanceHolder {
 
     private InstanceHolder() {}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
index d30784dcd6f..c1ec657fd29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/DriverTaskHandle.java
@@ -27,6 +27,11 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static java.util.Objects.requireNonNull;
 
+/**
+ * This class is used to ensure that all the DriverTasks of all the FIs of one 
query accumulate
+ * scheduling time together, to avoid a single query occupying too much Level0 
time due to an excess
+ * of FIs.
+ */
 public class DriverTaskHandle {
 
   private final int driverTaskHandleId;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
index 82c4f774f0e..e70e588956e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/schedule/queue/multilevelqueue/MultilevelPriorityQueue.java
@@ -37,7 +37,7 @@ import static java.util.concurrent.TimeUnit.SECONDS;
  */
 public class MultilevelPriorityQueue extends 
IndexedBlockingReserveQueue<DriverTask> {
   /** Scheduled time threshold of TASK in each level. */
-  static final int[] LEVEL_THRESHOLD_SECONDS = {0, 1, 10, 60, 300};
+  static final int[] LEVEL_THRESHOLD_SECONDS = {0, 3, 30, 120, 600};
 
   /** the upper limit one Task can contribute to its level in one scheduled 
time. */
   static final long LEVEL_CONTRIBUTION_CAP = SECONDS.toNanos(30);

Reply via email to