This is an automated email from the ASF dual-hosted git repository. xiangweiwei pushed a commit to branch multiQueue in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit a736d70950b3163b527c89d06508bbb440d420d8 Author: Alima777 <wxw19981...@gmail.com> AuthorDate: Wed Feb 1 17:17:35 2023 +0800 Allow query scheduler to receive more tasks --- .../db/mpp/execution/schedule/DriverScheduler.java | 51 +++++++++++++++++----- .../schedule/queue/IndexedBlockingQueue.java | 38 ++++++++++++---- 2 files changed, 70 insertions(+), 19 deletions(-) diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java index ddcf072a1b..3175d563bf 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/DriverScheduler.java @@ -40,6 +40,8 @@ import org.apache.iotdb.db.mpp.metric.QueryMetricsManager; import org.apache.iotdb.db.utils.SetThreadName; import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -92,7 +94,8 @@ public class DriverScheduler implements IDriverScheduler, IService { this.readyQueue = new MultilevelPriorityQueue(LEVEL_TIME_MULTIPLIER, MAX_CAPACITY, new DriverTask()); this.timeoutQueue = - new L1PriorityQueue<>(MAX_CAPACITY, new DriverTask.TimeoutComparator(), new DriverTask()); + new L1PriorityQueue<>( + Integer.MAX_VALUE, new DriverTask.TimeoutComparator(), new DriverTask()); this.queryMap = new ConcurrentHashMap<>(); this.blockedTasks = Collections.synchronizedSet(new HashSet<>()); this.scheduler = new Scheduler(); @@ -198,9 +201,18 @@ public class DriverScheduler implements IDriverScheduler, IService { if (task.getStatus() != DriverTaskStatus.READY) { continue; } - timeoutQueue.push(task); - readyQueue.push(task); - task.setLastEnterReadyQueueTime(System.nanoTime()); + SettableFuture<?> isBlocked = readyQueue.push(task); + if (isBlocked.isDone()) { + timeoutQueue.push(task); + task.setLastEnterReadyQueueTime(System.nanoTime()); + } else { + isBlocked.addListener( + () -> { + timeoutQueue.push(task); + task.setLastEnterReadyQueueTime(System.nanoTime()); + }, + MoreExecutors.directExecutor()); + } } finally { task.unlock(); } @@ -362,12 +374,23 @@ public class DriverScheduler implements IDriverScheduler, IService { } task.setStatus(DriverTaskStatus.READY); - QUERY_METRICS.recordTaskQueueTime( - BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime()); - task.setLastEnterReadyQueueTime(System.nanoTime()); - task.resetLevelScheduledTime(); - readyQueue.push(task); blockedTasks.remove(task); + SettableFuture<?> isBlocked = readyQueue.push(task); + if (isBlocked.isDone()) { + QUERY_METRICS.recordTaskQueueTime( + BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime()); + task.setLastEnterReadyQueueTime(System.nanoTime()); + task.resetLevelScheduledTime(); + } else { + isBlocked.addListener( + () -> { + QUERY_METRICS.recordTaskQueueTime( + BLOCK_QUEUED_TIME, System.nanoTime() - task.getLastEnterBlockQueueTime()); + task.setLastEnterReadyQueueTime(System.nanoTime()); + task.resetLevelScheduledTime(); + }, + MoreExecutors.directExecutor()); + } } finally { task.unlock(); } @@ -399,8 +422,14 @@ public class DriverScheduler implements IDriverScheduler, IService { } task.updateSchedulePriority(context); task.setStatus(DriverTaskStatus.READY); - task.setLastEnterReadyQueueTime(System.nanoTime()); - readyQueue.push(task); + SettableFuture<?> isBlocked = readyQueue.push(task); + if (isBlocked.isDone()) { + task.setLastEnterReadyQueueTime(System.nanoTime()); + } else { + isBlocked.addListener( + () -> task.setLastEnterReadyQueueTime(System.nanoTime()), + MoreExecutors.directExecutor()); + } } finally { task.unlock(); } diff --git a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java index 1498bbbc30..1e39eb8f08 100644 --- a/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java +++ b/server/src/main/java/org/apache/iotdb/db/mpp/execution/schedule/queue/IndexedBlockingQueue.java @@ -19,6 +19,10 @@ package org.apache.iotdb.db.mpp.execution.schedule.queue; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.SettableFuture; + +import java.util.Deque; +import java.util.LinkedList; /** * The base class of a special kind of blocking queue, which has these characters: @@ -40,6 +44,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { private final int MAX_CAPACITY; private final E queryHolder; private int size; + private Deque<SettableFuture<?>> blockedTasks; /** * Init the queue with a max capacity. The queryHolder is just a simple reused object in query to @@ -53,6 +58,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { public IndexedBlockingQueue(int maxCapacity, E queryHolder) { this.MAX_CAPACITY = maxCapacity; this.queryHolder = queryHolder; + this.blockedTasks = new LinkedList<>(); } /** @@ -67,31 +73,38 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { } E output = pollFirst(); size--; + tryToAddBlockedTask(); return output; } /** * Push an element to the queue. The new element position is determined by the implementation. If - * the queue size has been reached the maxCapacity, or the queue has already contained an element - * with the same ID, an {@link IllegalStateException} will be thrown. If the element is null, an - * {@link NullPointerException} will be thrown. + * the queue has already contained an element with the same ID, an {@link IllegalStateException} + * will be thrown. If the queue size has been reached the maxCapacity, the element will wait a + * free space. If the element is null, an {@link NullPointerException} will be thrown. * * @param element the element to be pushed. * @throws NullPointerException the pushed element is null. * @throws IllegalStateException the queue size has been reached the maxCapacity, or the queue has * already contained the same ID element . */ - public synchronized void push(E element) { + public synchronized SettableFuture<?> push(E element) { if (element == null) { throw new NullPointerException("pushed element is null"); } Preconditions.checkState( !contains(element), "The queue has already contained the element: " + element.getDriverTaskId()); - Preconditions.checkState(size < MAX_CAPACITY, "The queue is full"); - pushToQueue(element); - size++; - this.notifyAll(); + SettableFuture<?> blockedFuture = SettableFuture.create(); + if (size < MAX_CAPACITY) { + pushToQueue(element); + size++; + this.notifyAll(); + blockedFuture.set(null); + } else { + blockedTasks.add(blockedFuture); + } + return blockedFuture; } /** @@ -107,6 +120,7 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { return null; } size--; + tryToAddBlockedTask(); return output; } @@ -124,9 +138,17 @@ public abstract class IndexedBlockingQueue<E extends IDIndexedAccessible> { /** Clear all the elements in the queue. */ public synchronized void clear() { clearAllElements(); + blockedTasks.clear(); size = 0; } + public synchronized void tryToAddBlockedTask() { + if (!blockedTasks.isEmpty()) { + SettableFuture<?> blockedFuture = blockedTasks.pollFirst(); + blockedFuture.set(null); + } + } + /** * Get the current queue size. *