Repository: hive Updated Branches: refs/heads/llap 3bf0a45f8 -> a8ac648c8
HIVE-11272. LLAP: Execution order within LLAP daemons should consider query-specific priority assigned to fragments. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/a8ac648c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/a8ac648c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/a8ac648c Branch: refs/heads/llap Commit: a8ac648c86f11f856b1a307aa0e545482618b769 Parents: 3bf0a45 Author: Siddharth Seth <ss...@apache.org> Authored: Thu Aug 13 18:01:52 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Thu Aug 13 18:01:52 2015 -0700 ---------------------------------------------------------------------- .../llap/daemon/impl/TaskExecutorService.java | 94 +++++++++++++++----- .../llap/daemon/impl/TaskRunnerCallable.java | 36 ++++---- .../daemon/impl/TestTaskExecutorService.java | 50 ++++++++++- .../daemon/impl/TestTaskExecutorService2.java | 52 ++++++++++- 4 files changed, 187 insertions(+), 45 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 5099a5c..f99c05d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; +import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.service.AbstractService; import org.apache.tez.runtime.task.EndReason; @@ -380,6 +381,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta try { synchronized (lock) { boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); + LOG.info("Attempting to execute {}", taskWrapper); ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable()); taskWrapper.setIsInWaitQueue(false); FutureCallback<TaskRunner2Result> wrappedCallback = createInternalCompletionListener(taskWrapper); @@ -584,23 +586,41 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta public int compare(TaskWrapper t1, TaskWrapper t2) { TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); - boolean newCanFinish = o1.canFinish(); - boolean oldCanFinish = o2.canFinish(); - if (newCanFinish == true && oldCanFinish == false) { + boolean o1CanFinish = o1.canFinish(); + boolean o2CanFinish = o2.canFinish(); + if (o1CanFinish == true && o2CanFinish == false) { return -1; - } else if (newCanFinish == false && oldCanFinish == true) { + } else if (o1CanFinish == false && o2CanFinish == true) { return 1; } - if (o1.getVertexParallelism() < o2.getVertexParallelism()) { + FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); + FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); + + // Check if these belong to the same query, and work with withinDagPriority + if (o1.getQueryId().equals(o2.getQueryId())) { + // Same Query + // Within dag priority - lower values indicate higher priority.
+ if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { + return -1; + } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ + return 1; + } + } + + // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and + // its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed. + int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); + int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); + if (knownPending1 < knownPending2) { return -1; - } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) { + } else if (knownPending1 > knownPending2) { return 1; } - if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) { + if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { return -1; - } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) { + } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) { return 1; } return 0; @@ -610,8 +630,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta // if map tasks and reduce tasks are in finishable state then priority is given to the task in // the following order // 1) Dag start time - // 2) Attempt start time - // 3) Vertex parallelism + // 2) Within dag priority + // 3) Attempt start time + // 4) Vertex parallelism @VisibleForTesting public static class FirstInFirstOutComparator implements Comparator<TaskWrapper> { @@ -619,29 +640,47 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta public int compare(TaskWrapper t1, TaskWrapper t2) { TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); - boolean newCanFinish = o1.canFinish(); - boolean oldCanFinish = o2.canFinish(); - if (newCanFinish == true && oldCanFinish == false) { + boolean o1CanFinish =
o1.canFinish(); + boolean o2CanFinish = o2.canFinish(); + if (o1CanFinish == true && o2CanFinish == false) { return -1; - } else if (newCanFinish == false && oldCanFinish == true) { + } else if (o1CanFinish == false && o2CanFinish == true) { return 1; } - if (o1.getDagStartTime() < o2.getDagStartTime()) { + FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); + FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); + + if (fri1.getDagStartTime() < fri2.getDagStartTime()) { return -1; - } else if (o1.getDagStartTime() > o2.getDagStartTime()) { + } else if (fri1.getDagStartTime() > fri2.getDagStartTime()) { return 1; } - if (o1.getFirstAttemptStartTime() < o2.getFirstAttemptStartTime()) { + // Check if these belong to the same query, and work with withinDagPriority + if (o1.getQueryId().equals(o2.getQueryId())) { + // Same Query + // Within dag priority - lower values indicate higher priority. + if (fri1.getWithinDagPriority() < fri2.getWithinDagPriority()) { + return -1; + } else if (fri1.getWithinDagPriority() > fri2.getWithinDagPriority()){ + return 1; + } + } + + if (fri1.getFirstAttemptStartTime() < fri2.getFirstAttemptStartTime()) { return -1; - } else if (o1.getFirstAttemptStartTime() > o2.getFirstAttemptStartTime()) { + } else if (fri1.getFirstAttemptStartTime() > fri2.getFirstAttemptStartTime()) { return 1; } - if (o1.getVertexParallelism() < o2.getVertexParallelism()) { + // Compute knownPending tasks. selfAndUpstream indicates task counts for current vertex and + // its parent hierarchy. selfAndUpstreamComplete indicates how many of these have completed.
+ int knownPending1 = fri1.getNumSelfAndUpstreamTasks() - fri1.getNumSelfAndUpstreamCompletedTasks(); + int knownPending2 = fri2.getNumSelfAndUpstreamTasks() - fri2.getNumSelfAndUpstreamCompletedTasks(); + if (knownPending1 < knownPending2) { return -1; - } else if (o1.getVertexParallelism() > o2.getVertexParallelism()) { + } else if (knownPending1 > knownPending2) { return 1; } @@ -656,9 +695,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta public int compare(TaskWrapper t1, TaskWrapper t2) { TaskRunnerCallable o1 = t1.getTaskRunnerCallable(); TaskRunnerCallable o2 = t2.getTaskRunnerCallable(); - if (o1.getVertexParallelism() > o2.getVertexParallelism()) { + FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); + FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); + + if (fri1.getNumSelfAndUpstreamTasks() > fri2.getNumSelfAndUpstreamTasks()) { return 1; - } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) { + } else if (fri1.getNumSelfAndUpstreamTasks() < fri2.getNumSelfAndUpstreamTasks()) { return -1; } return 0; @@ -738,8 +780,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta ", inPreemptionQueue=" + inPreemptionQueue + ", registeredForNotifications=" + registeredForNotifications + ", canFinish=" + taskRunnerCallable.canFinish() + - ", firstAttemptStartTime=" + taskRunnerCallable.getFirstAttemptStartTime() + - ", vertexParallelism=" + taskRunnerCallable.getVertexParallelism() + + ", firstAttemptStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() + + ", dagStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() + + ", withinDagPriority=" + taskRunnerCallable.getFragmentRuntimeInfo().getWithinDagPriority() + + ", vertexParallelism= " + taskRunnerCallable.getFragmentSpec().getVertexParallelism() + + ", selfAndUpstreamParallelism= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() + + ", 
selfAndUpstreamComplete= " + taskRunnerCallable.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() + '}'; } http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 52f21d9..6ceb2e5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWorkRequestProto; @@ -96,6 +97,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private volatile String threadName; private final LlapDaemonExecutorMetrics metrics; private final String requestId; + private final String queryId; private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); @@ -129,7 +131,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { request.getUser(), jobToken, null, request.getFragmentSpec().getDagName()); } this.metrics = metrics; - this.requestId = getRequestId(request); + this.requestId = 
request.getFragmentSpec().getFragmentIdentifierString(); + // TODO Change this to the queryId/Name when that's available. + this.queryId = request.getFragmentSpec().getDagName(); this.killedTaskHandler = killedTaskHandler; this.fragmentCompletionHanler = fragmentCompleteHandler; } @@ -330,8 +334,13 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { @Override public String toString() { return requestId + " {canFinish: " + canFinish() + - " vertexParallelism: " + getVertexParallelism() + - " firstAttemptStartTime: " + getFirstAttemptStartTime() + "}"; + ", vertexParallelism: " + request.getFragmentSpec().getVertexParallelism() + + ", selfAndUpstreamParallelism: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamTasks() + + ", selfAndUpstreamComplete: " + request.getFragmentRuntimeInfo().getNumSelfAndUpstreamCompletedTasks() + + ", firstAttemptStartTime: " + getFragmentRuntimeInfo().getFirstAttemptStartTime() + + ", dagStartTime:" + getFragmentRuntimeInfo().getDagStartTime() + + ", withinDagPriority: " + getFragmentRuntimeInfo().getWithinDagPriority() + + "}"; } @Override @@ -347,14 +356,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { return requestId.equals(((TaskRunnerCallable) obj).getRequestId()); } - public int getVertexParallelism() { - return request.getFragmentSpec().getVertexParallelism(); - } - public String getRequestId() { return requestId; } + public String getQueryId() { + return queryId; + } + public QueryFragmentInfo getFragmentInfo() { return fragmentInfo; } @@ -470,16 +479,11 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { return sb.toString(); } - private static String getRequestId(SubmitWorkRequestProto request) { - return request.getFragmentSpec().getFragmentIdentifierString(); + public FragmentRuntimeInfo getFragmentRuntimeInfo() { + return request.getFragmentRuntimeInfo(); } - public long getFirstAttemptStartTime() { - return 
request.getFragmentRuntimeInfo().getFirstAttemptStartTime(); + public FragmentSpecProto getFragmentSpec() { + return request.getFragmentSpec(); } - - public long getDagStartTime() { - return request.getFragmentRuntimeInfo().getDagStartTime(); - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 25f7a81..7a01b39 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -67,6 +67,7 @@ public class TestTaskExecutorService { conf = new Configuration(); } + @Test(timeout = 5000) public void testWaitQueueComparator() throws InterruptedException { TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); @@ -203,6 +204,42 @@ public class TestTaskExecutorService { } @Test(timeout = 5000) + public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 0, 100, 10), false, 100000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 0, 100, 1), false, 100000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 0, 100, 5), false, 100000); + + EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>( + new TaskExecutorService.ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } + + 
@Test(timeout = 5000) + public void testWaitQueueComparatorParallelism() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 10, 3, 100, 1), false, 100000); // 7 pending + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 10, 7, 100, 1), false, 100000); // 3 pending + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 10, 5, 100, 1), false, 100000); // 5 pending + + EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>( + new TaskExecutorService.ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } + + @Test(timeout = 5000) public void testPreemptionQueueComparator() throws InterruptedException { TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 2, 100), false, 100000); TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 4, 200), false, 100000); @@ -344,8 +381,15 @@ public class TestTaskExecutorService { // ----------- Helper classes and methods go after this point. 
Tests above this ----------- - private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int parallelism, + // Create requests with the same within dag priority + private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism, long attemptStartTime) { + return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, attemptStartTime, 1); + } + + private SubmitWorkRequestProto createSubmitWorkRequestProto(int fragmentNumber, int selfAndUpstreamParallelism, + int selfAndUpstreamComplete, + long attemptStartTime, int withinDagPriority) { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); @@ -360,7 +404,6 @@ public class TestTaskExecutorService { .setDagName("MockDag") .setFragmentNumber(fragmentNumber) .setVertexName("MockVertex") - .setVertexParallelism(parallelism) .setProcessorDescriptor( EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") @@ -371,6 +414,9 @@ public class TestTaskExecutorService { .FragmentRuntimeInfo .newBuilder() .setFirstAttemptStartTime(attemptStartTime) + .setNumSelfAndUpstreamTasks(selfAndUpstreamParallelism) + .setNumSelfAndUpstreamCompletedTasks(selfAndUpstreamComplete) + .setWithinDagPriority(withinDagPriority) .build()) .build(); } http://git-wip-us.apache.org/repos/asf/hive/blob/a8ac648c/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java index ad2a15b..1929439 100644 --- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService2.java @@ -81,8 +81,15 @@ public class TestTaskExecutorService2 { conf = new Configuration(); } - private SubmitWorkRequestProto createRequest(int fragmentNumber, int parallelism, int dagStartTime, - int attemptStartTime) { + private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, int dagStartTime, + int attemptStartTime) { + // Same priority for all tasks. + return createRequest(fragmentNumber, numSelfAndUpstreamTasks, 0, dagStartTime, attemptStartTime, 1); + } + + private SubmitWorkRequestProto createRequest(int fragmentNumber, int numSelfAndUpstreamTasks, + int numSelfAndUpstreamComplete, int dagStartTime, + int attemptStartTime, int withinDagPriority) { ApplicationId appId = ApplicationId.newInstance(9999, 72); TezDAGID dagId = TezDAGID.getInstance(appId, 1); TezVertexID vId = TezVertexID.getInstance(dagId, 35); @@ -97,7 +104,6 @@ public class TestTaskExecutorService2 { .setDagName("MockDag") .setFragmentNumber(fragmentNumber) .setVertexName("MockVertex") - .setVertexParallelism(parallelism) .setProcessorDescriptor( EntityDescriptorProto.newBuilder().setClassName("MockProcessor").build()) .setFragmentIdentifierString(taId.toString()).build()).setAmHost("localhost") @@ -109,6 +115,9 @@ public class TestTaskExecutorService2 { .newBuilder() .setDagStartTime(dagStartTime) .setFirstAttemptStartTime(attemptStartTime) + .setNumSelfAndUpstreamTasks(numSelfAndUpstreamTasks) + .setNumSelfAndUpstreamCompletedTasks(numSelfAndUpstreamComplete) + .setWithinDagPriority(withinDagPriority) .build()) .build(); } @@ -270,6 +279,43 @@ public class TestTaskExecutorService2 { assertEquals(r2, queue.take()); } + @Test(timeout = 5000) + public void testWaitQueueComparatorWithinDagPriority() throws InterruptedException { + TaskWrapper r1 = 
createTaskWrapper(createRequest(1, 1, 0, 100, 100, 10), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 1, 0, 100, 100, 1), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 1, 0, 100, 100, 5), false, 100000); + + EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>( + new TaskExecutorService.ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } + + @Test(timeout = 5000) + public void testWaitQueueComparatorParallelism() throws InterruptedException { + TaskWrapper r1 = createTaskWrapper(createRequest(1, 10, 3, 100, 100, 1), false, 100000); + TaskWrapper r2 = createTaskWrapper(createRequest(2, 10, 7, 100, 100, 1), false, 100000); + TaskWrapper r3 = createTaskWrapper(createRequest(3, 10, 5, 100, 100, 1), false, 100000); + + EvictingPriorityBlockingQueue<TaskWrapper> queue = new EvictingPriorityBlockingQueue<>( + new TaskExecutorService.ShortestJobFirstComparator(), 4); + + assertNull(queue.offer(r1)); + assertNull(queue.offer(r2)); + assertNull(queue.offer(r3)); + + assertEquals(r2, queue.take()); + assertEquals(r3, queue.take()); + assertEquals(r1, queue.take()); + } + + private TaskWrapper createTaskWrapper(SubmitWorkRequestProto request, boolean canFinish, int workTime) { MockRequest mockRequest = new MockRequest(request, canFinish, workTime); TaskWrapper taskWrapper = new TaskWrapper(mockRequest, null);