This is an automated email from the ASF dual-hosted git repository. abstractdog pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new a41e99a HIVE-23210: Fix shortestjobcomparator when jobs submitted have 1 task their vertices (Panagiotis Garefalakis via Rajesh Balamohan) a41e99a is described below commit a41e99aba29bf2a0a8436546d2ab72b3d4ef6ff5 Author: Panagiotis Garefalakis <panga...@gmail.com> AuthorDate: Tue Apr 21 11:00:18 2020 +0200 HIVE-23210: Fix shortestjobcomparator when jobs submitted have 1 task their vertices (Panagiotis Garefalakis via Rajesh Balamohan) Signed-off-by: Laszlo Bodor <bodorlaszlo0...@gmail.com> --- .../comparator/ShortestJobFirstComparator.java | 20 ++- .../llap/daemon/impl/TaskExecutorTestHelpers.java | 2 +- .../comparator/TestShortestJobFirstComparator.java | 140 ++++++++++++++++++++- 3 files changed, 157 insertions(+), 5 deletions(-) diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java index 9d7af7e..c9004d7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java @@ -26,7 +26,7 @@ public class ShortestJobFirstComparator extends LlapQueueComparatorBase { LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = o1.getFragmentRuntimeInfo(); LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = o2.getFragmentRuntimeInfo(); - // Check if these belong to the same task, and work with withinDagPriority + // Check if these belong to the same DAG, and work with withinDagPriority if (o1.getQueryId().equals(o2.getQueryId())) { // Same Query @@ -49,7 +49,20 @@ public class ShortestJobFirstComparator extends LlapQueueComparatorBase { long waitTime2 = fri2.getCurrentAttemptStartTime() - fri2.getFirstAttemptStartTime(); if (waitTime1 == 0 || waitTime2 == 0) { - return knownPending1 - knownPending2; + // first attempt for one of those + if (knownPending1 == knownPending2) { + // exactly same number of pending tasks, avoid meddling with FIFO + if (waitTime1 == waitTime2) { + // first attempt for both + return Long.compare(fri1.getCurrentAttemptStartTime(), fri2.getCurrentAttemptStartTime()); + } + // pick the one which has waited the longest, since it might have other bushy branches in + // the query to join with, because pending is only the parent part of this node from the DAG + return waitTime2 == 0 ? -1 : 1; + } + // invariant: different number of pending tasks (pending1 != pending2) + // if either of them is 1, then other one is greater and this comparison is enough + return Long.compare(knownPending1, knownPending2); } double ratio1 = (double) knownPending1 / (double) waitTime1; @@ -60,6 +73,7 @@ public class ShortestJobFirstComparator extends LlapQueueComparatorBase { return 1; } - return 0; + // when ratio is the same, pick the one which has waited the longest + return Long.compare(fri1.getCurrentAttemptStartTime(), fri2.getCurrentAttemptStartTime()); } } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 69e1d87..50dec47 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -131,7 +131,7 @@ public class TaskExecutorTestHelpers { int fragmentNumber, int selfAndUpstreamParallelism, int selfAndUpstreamComplete, long firstAttemptStartTime, long currentAttemptStartTime, int withinDagPriority) { - return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, 0, firstAttemptStartTime, + return createSubmitWorkRequestProto(fragmentNumber, selfAndUpstreamParallelism, selfAndUpstreamComplete, firstAttemptStartTime, currentAttemptStartTime, withinDagPriority, "MockDag", false); } diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java index 048e1d7..4b7ec07 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java @@ -18,9 +18,11 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue; import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper; +import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.junit.Test; public class TestShortestJobFirstComparator { @@ -82,12 +84,15 @@ public class TestShortestJobFirstComparator { assertNull(queue.offer(r1, 0)); assertEquals(r1, queue.peek()); assertNull(queue.offer(r2, 0)); + // q2 can not finish thus q1 remains in top assertEquals(r1, queue.peek()); assertNull(queue.offer(r3, 0)); + // q1 is waiting longer than q3 assertEquals(r1, queue.peek()); assertNull(queue.offer(r4, 0)); + // q4 can not finish thus q1 remains in top assertEquals(r1, queue.peek()); - // offer accepted and r4 gets evicted + // offer accepted and r4 gets evicted (later start-time than q4) assertEquals(r4, queue.offer(r5, 0)); assertEquals(r1, queue.take()); assertEquals(r3, queue.take()); @@ -278,4 +283,137 @@ public class TestShortestJobFirstComparator { assertEquals(r3, queue.take()); assertEquals(r1, queue.take()); } + + @Test(timeout = 60000) + public void testWaitQueueAging() throws InterruptedException { + // Different Queries (DAGs) where all (different) fragments have + // upstream parallelism of 1. They also have 1 task, which means first + // & current attempt time would be the same. + TaskWrapper[] r = new TaskWrapper[50]; + + for (int i = 0; i < 50; i++) { + LlapDaemonProtocolProtos.SubmitWorkRequestProto proto = + createSubmitWorkRequestProto(i, 1, 100 + i, 100 + i, "q" + i, true); + r[i] = createTaskWrapper(proto, true, 100000); + } + + // Make sure we dont have evictions triggered (maxSize = taskSize) + EvictingPriorityBlockingQueue<TaskWrapper> queue = + new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 50); + + for (int i = 0; i < 50; i++) { + assertNull(queue.offer(r[i], 0)); + } + + TaskWrapper prev = queue.take(); + for (int i = 1; i < 50; i++) { + TaskWrapper curr = queue.take(); + // Make sure order is respected (earlier requestStartTime first) + assertTrue(curr.getRequestId().compareTo(prev.getRequestId()) > 0); + prev = curr; + } + } + + @Test(timeout = 60000) + public void testWaitQueueEdgeCases() { + // Make sure we dont have evictions triggered (maxSize = taskSize) + EvictingPriorityBlockingQueue<TaskWrapper> queue = + new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 10); + + // same number of pending tasks (longer waitTime has priority) + // Single task DAG with same start and attempt time (wait-time zero) + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 1000, 1000, "q11", true), true, 1000); + // Multi task DAG with 11 out of 12 task completed + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 11, 1000, 1500,1, "q12", true), true, 1000); + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2, 0)); + assertEquals(r2, queue.peek()); + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + // Single task DAG with different start and attempt time + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 800, 1000, "q11", true), true, 1000); + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); // ratio = 1/200 = 0.005 + assertNull(queue.offer(r2, 0)); + assertEquals(r2, queue.peek()); // ratio = 1/500 = 0.002 + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + // same waitTime -> lower number of pending has priority + // Single task DAG with different start and attempt time + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 1000, 1000, "q11", true), true, 1000); + // Multi-task DAG with 5 out of 12 + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 5, 1000, 1000, 1, "q12", true), true, 1000); + + // pending/wait-time -> r2 has lower priority because it has more pending tasks + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2, 0)); + assertEquals(r1, queue.peek()); + + queue.remove(r1); + queue.remove(r2); + assertTrue(queue.isEmpty()); + + // waitTime1==waitTime2 AND pending1==pending2 -> earlier startTime gets priority + // Single task DAG with different start and attempt time + r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 5, 800, 1000, "q11", true), true, 1000); + // Multi-task DAG with 5 out of 12 + r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 7, 700, 900, 1, "q12", true), true, 1000); + + // r2 started earlier it should thus receive priority + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2, 0)); + assertEquals(r2, queue.peek()); + } + + @Test(timeout = 60000) + public void testWaitQueueAgingComplex() throws InterruptedException { + // Make sure we dont have evictions triggered (maxSize = taskSize) + EvictingPriorityBlockingQueue<TaskWrapper> queue = + new EvictingPriorityBlockingQueue<>(new ShortestJobFirstComparator(), 10); + + // Single-Task DAGs + TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 200, 200, "q1", true), true, 1000); + TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 199, 199, "q2", true), true, 1000); + TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 310, "q3", true), true, 1000); + TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 420, "q4", true), true, 1000); + TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 1, 500, 521, "q5", true), true, 1000); + + assertNull(queue.offer(r1, 0)); + assertEquals(r1, queue.peek()); + assertNull(queue.offer(r2, 0)); + assertEquals(r2, queue.peek()); + assertNull(queue.offer(r3, 0)); + assertEquals(r3, queue.peek()); + assertNull(queue.offer(r4, 0)); + assertEquals(r4, queue.peek()); + assertNull(queue.offer(r5, 0)); + assertEquals(r5, queue.peek()); + + // Multi-Task DAGs + TaskWrapper r6 = createTaskWrapper(createSubmitWorkRequestProto(6, 10, 100, 200, "q6", true), true, 1000); + TaskWrapper r7 = createTaskWrapper(createSubmitWorkRequestProto(7, 10, 200, 400, "q7", true), true, 1000); + TaskWrapper r8 = createTaskWrapper(createSubmitWorkRequestProto(8, 10, 300, 600, "q8", true), true, 1000); + TaskWrapper r9 = createTaskWrapper(createSubmitWorkRequestProto(9, 10, 400, 800, "q9", true), true, 1000); + TaskWrapper r10 = createTaskWrapper(createSubmitWorkRequestProto(10, 10, 500, 1000, "q10", true), true, 1000); + + assertNull(queue.offer(r6, 0)); + assertEquals(r5, queue.peek()); + assertNull(queue.offer(r7, 0)); + assertEquals(r5, queue.peek()); + assertNull(queue.offer(r8, 0)); + assertEquals(r8, queue.peek()); // r5: 1/21 (0.047) -> r8: 10/300 (0.033) + assertNull(queue.offer(r9, 0)); + assertEquals(r9, queue.peek()); // r9: 10/400 (0.025) + assertNull(queue.offer(r10, 0)); + assertEquals(r10, queue.peek()); // r10: 10/500 (0.02) + } }