This is an automated email from the ASF dual-hosted git repository.

abstractdog pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new a41e99a  HIVE-23210: Fix shortestjobcomparator when jobs submitted have 1 task in their vertices (Panagiotis Garefalakis via Rajesh Balamohan)
a41e99a is described below

commit a41e99aba29bf2a0a8436546d2ab72b3d4ef6ff5
Author: Panagiotis Garefalakis <panga...@gmail.com>
AuthorDate: Tue Apr 21 11:00:18 2020 +0200

    HIVE-23210: Fix shortestjobcomparator when jobs submitted have 1 task in their vertices (Panagiotis Garefalakis via Rajesh Balamohan)
    
    Signed-off-by: Laszlo Bodor <bodorlaszlo0...@gmail.com>
---
 .../comparator/ShortestJobFirstComparator.java     |  20 ++-
 .../llap/daemon/impl/TaskExecutorTestHelpers.java  |   2 +-
 .../comparator/TestShortestJobFirstComparator.java | 140 ++++++++++++++++++++-
 3 files changed, 157 insertions(+), 5 deletions(-)

diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
index 9d7af7e..c9004d7 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/comparator/ShortestJobFirstComparator.java
@@ -26,7 +26,7 @@ public class ShortestJobFirstComparator extends 
LlapQueueComparatorBase {
     LlapDaemonProtocolProtos.FragmentRuntimeInfo fri1 = 
o1.getFragmentRuntimeInfo();
     LlapDaemonProtocolProtos.FragmentRuntimeInfo fri2 = 
o2.getFragmentRuntimeInfo();
 
-    // Check if these belong to the same task, and work with withinDagPriority
+    // Check if these belong to the same DAG, and work with withinDagPriority
     if (o1.getQueryId().equals(o2.getQueryId())) {
       // Same Query
 
@@ -49,7 +49,20 @@ public class ShortestJobFirstComparator extends 
LlapQueueComparatorBase {
     long waitTime2 = fri2.getCurrentAttemptStartTime() - 
fri2.getFirstAttemptStartTime();
 
     if (waitTime1 == 0 || waitTime2 == 0) {
-      return knownPending1 - knownPending2;
+      // first attempt for one of those
+      if (knownPending1 == knownPending2) {
+        // exactly same number of pending tasks, avoid meddling with FIFO
+        if (waitTime1 == waitTime2) {
+          // first attempt for both
+          return Long.compare(fri1.getCurrentAttemptStartTime(), 
fri2.getCurrentAttemptStartTime());
+        }
+        // pick the one which has waited the longest, since it might have 
other bushy branches in
+        // the query to join with, because pending is only the parent part of 
this node from the DAG
+        return waitTime2 == 0 ? -1 : 1;
+      }
+      // invariant: different number of pending tasks (pending1 != pending2)
+      // if either of them is 1, then other one is greater and this comparison 
is enough
+      return Long.compare(knownPending1, knownPending2);
     }
 
     double ratio1 = (double) knownPending1 / (double) waitTime1;
@@ -60,6 +73,7 @@ public class ShortestJobFirstComparator extends 
LlapQueueComparatorBase {
       return 1;
     }
 
-    return 0;
+    // when ratio is the same, pick the one which has waited the longest
+    return Long.compare(fri1.getCurrentAttemptStartTime(), 
fri2.getCurrentAttemptStartTime());
   }
 }
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
index 69e1d87..50dec47 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java
@@ -131,7 +131,7 @@ public class TaskExecutorTestHelpers {
       int fragmentNumber, int selfAndUpstreamParallelism,
       int selfAndUpstreamComplete, long firstAttemptStartTime,
       long currentAttemptStartTime, int withinDagPriority) {
-    return createSubmitWorkRequestProto(fragmentNumber, 
selfAndUpstreamParallelism, 0, firstAttemptStartTime,
+    return createSubmitWorkRequestProto(fragmentNumber, 
selfAndUpstreamParallelism, selfAndUpstreamComplete, firstAttemptStartTime,
         currentAttemptStartTime, withinDagPriority, "MockDag", false);
   }
 
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
index 048e1d7..4b7ec07 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/comparator/TestShortestJobFirstComparator.java
@@ -18,9 +18,11 @@ import static 
org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr
 import static 
org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import org.apache.hadoop.hive.llap.daemon.impl.EvictingPriorityBlockingQueue;
 import org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorService.TaskWrapper;
+import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.junit.Test;
 
 public class TestShortestJobFirstComparator {
@@ -82,12 +84,15 @@ public class TestShortestJobFirstComparator {
     assertNull(queue.offer(r1, 0));
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r2, 0));
+    // q2 can not finish thus q1 remains in top
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r3, 0));
+    // q1 is waiting longer than q3
     assertEquals(r1, queue.peek());
     assertNull(queue.offer(r4, 0));
+    // q4 can not finish thus q1 remains in top
     assertEquals(r1, queue.peek());
-    // offer accepted and r4 gets evicted
+    // offer accepted and r4 gets evicted (later start-time than q4)
     assertEquals(r4, queue.offer(r5, 0));
     assertEquals(r1, queue.take());
     assertEquals(r3, queue.take());
@@ -278,4 +283,137 @@ public class TestShortestJobFirstComparator {
     assertEquals(r3, queue.take());
     assertEquals(r1, queue.take());
   }
+
+  @Test(timeout = 60000)
+  public void testWaitQueueAging() throws InterruptedException {
+    // Different queries (DAGs) where every fragment has upstream parallelism of 1
+    // and first == current attempt start time (zero wait-time, equal pending), so
+    // the comparator must fall back to FIFO on current attempt start (100 + i).
+    TaskWrapper[] r = new TaskWrapper[50];
+
+    for (int i = 0; i < 50; i++) {
+      LlapDaemonProtocolProtos.SubmitWorkRequestProto proto =
+              createSubmitWorkRequestProto(i, 1, 100 + i, 100 + i, "q" + i, 
true);
+      r[i] = createTaskWrapper(proto, true, 100000);
+    }
+
+    // Make sure we don't have evictions triggered (maxSize == number of tasks)
+    EvictingPriorityBlockingQueue<TaskWrapper> queue =
+            new EvictingPriorityBlockingQueue<>(new 
ShortestJobFirstComparator(), 50);
+
+    for (int i = 0; i < 50; i++) {
+      assertNull(queue.offer(r[i], 0));
+    }
+
+    TaskWrapper prev = queue.take();
+    for (int i = 1; i < 50; i++) {
+      TaskWrapper curr = queue.take();
+      // Tasks must come out in submission order (earlier start time first); NOTE(review): assumes requestIds sort in the same order
+      assertTrue(curr.getRequestId().compareTo(prev.getRequestId()) > 0);
+      prev = curr;
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testWaitQueueEdgeCases() {
+    // Make sure we don't have evictions triggered (maxSize >= number of tasks)
+    EvictingPriorityBlockingQueue<TaskWrapper> queue =
+            new EvictingPriorityBlockingQueue<>(new 
ShortestJobFirstComparator(), 10);
+
+    // same number of pending tasks (longer waitTime has priority)
+    // Single task DAG with same start and attempt time (wait-time zero)
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 
1000, 1000, "q11", true), true, 1000);
+    // Multi task DAG with 11 out of 12 tasks completed (1 pending, wait-time 500)
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 11, 
1000, 1500,1, "q12", true), true, 1000);
+    assertNull(queue.offer(r1, 0));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2, 0));
+    assertEquals(r2, queue.peek());
+
+    queue.remove(r1);
+    queue.remove(r2);
+    assertTrue(queue.isEmpty());
+
+    // Single task DAG with different start and attempt time (1 pending, wait-time 200)
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 800, 1000, 
"q11", true), true, 1000);
+    assertNull(queue.offer(r1, 0));
+    assertEquals(r1, queue.peek()); // r1 ratio = 1/200 = 0.005
+    assertNull(queue.offer(r2, 0));
+    assertEquals(r2, queue.peek()); // r2 ratio = 1/500 = 0.002 (lower ratio wins)
+
+    queue.remove(r1);
+    queue.remove(r2);
+    assertTrue(queue.isEmpty());
+
+    // same waitTime -> lower number of pending has priority
+    // Single task DAG with same start and attempt time (wait-time zero, 1 pending)
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 1000, 1000, 
"q11", true), true, 1000);
+    // Multi-task DAG with 5 out of 12 tasks completed (7 pending, wait-time zero)
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 5, 1000, 1000, 
1, "q12", true), true, 1000);
+
+    // both wait-times are zero, so pending counts decide: r2 (7 pending) has lower priority than r1 (1 pending) because it has more pending 
tasks
+    assertNull(queue.offer(r1, 0));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2, 0));
+    assertEquals(r1, queue.peek());
+
+    queue.remove(r1);
+    queue.remove(r2);
+    assertTrue(queue.isEmpty());
+
+    // waitTime1==waitTime2 AND pending1==pending2 (equal ratio) -> earlier current attempt start time gets 
priority
+    // 5-task DAG, presumably none completed (5 pending), wait-time 200
+    r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 5, 800, 1000, 
"q11", true), true, 1000);
+    // Multi-task DAG with 7 out of 12 tasks completed (5 pending), wait-time 200
+    r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 12, 7, 700, 900, 1, 
"q12", true), true, 1000);
+
+    // same ratio (5/200); r2's current attempt started earlier (900 < 1000), so it gets priority
+    assertNull(queue.offer(r1, 0));
+    assertEquals(r1, queue.peek());
+    assertNull(queue.offer(r2, 0));
+    assertEquals(r2, queue.peek());
+  }
+
+  @Test(timeout = 60000)
+  public void testWaitQueueAgingComplex() throws InterruptedException {
+    // Make sure we don't have evictions triggered (maxSize >= number of tasks)
+    EvictingPriorityBlockingQueue<TaskWrapper> queue =
+            new EvictingPriorityBlockingQueue<>(new 
ShortestJobFirstComparator(), 10);
+
+    // Single-Task DAGs (1 pending each); wait-time = current - first attempt start time
+    TaskWrapper r1 = createTaskWrapper(createSubmitWorkRequestProto(1, 1, 200, 
200, "q1", true), true, 1000);
+    TaskWrapper r2 = createTaskWrapper(createSubmitWorkRequestProto(2, 1, 199, 
199, "q2", true), true, 1000);
+    TaskWrapper r3 = createTaskWrapper(createSubmitWorkRequestProto(3, 1, 300, 
310, "q3", true), true, 1000);
+    TaskWrapper r4 = createTaskWrapper(createSubmitWorkRequestProto(4, 1, 400, 
420, "q4", true), true, 1000);
+    TaskWrapper r5 = createTaskWrapper(createSubmitWorkRequestProto(5, 1, 500, 
521, "q5", true), true, 1000);
+
+    assertNull(queue.offer(r1, 0));
+    assertEquals(r1, queue.peek()); // only element
+    assertNull(queue.offer(r2, 0));
+    assertEquals(r2, queue.peek()); // both zero wait-time and 1 pending: earlier start (199 < 200) wins
+    assertNull(queue.offer(r3, 0));
+    assertEquals(r3, queue.peek()); // r3 has waited (10) while r1/r2 have zero wait-time
+    assertNull(queue.offer(r4, 0));
+    assertEquals(r4, queue.peek()); // r4: 1/20 (0.05) < r3: 1/10 (0.1)
+    assertNull(queue.offer(r5, 0));
+    assertEquals(r5, queue.peek()); // r5: 1/21 (0.047) < r4: 1/20 (0.05)
+
+    // Multi-Task DAGs (presumably none completed -> 10 pending each)
+    TaskWrapper r6 = createTaskWrapper(createSubmitWorkRequestProto(6, 10, 
100, 200, "q6", true), true, 1000);
+    TaskWrapper r7 = createTaskWrapper(createSubmitWorkRequestProto(7, 10, 
200, 400, "q7", true), true, 1000);
+    TaskWrapper r8 = createTaskWrapper(createSubmitWorkRequestProto(8, 10, 
300, 600, "q8", true), true, 1000);
+    TaskWrapper r9 = createTaskWrapper(createSubmitWorkRequestProto(9, 10, 
400, 800, "q9", true), true, 1000);
+    TaskWrapper r10 = createTaskWrapper(createSubmitWorkRequestProto(10, 10, 
500, 1000, "q10", true), true, 1000);
+
+    assertNull(queue.offer(r6, 0));
+    assertEquals(r5, queue.peek()); // r6: 10/100 (0.1) > r5: 1/21 (0.047)
+    assertNull(queue.offer(r7, 0));
+    assertEquals(r5, queue.peek()); // r7: 10/200 (0.05) > r5: 1/21 (0.047)
+    assertNull(queue.offer(r8, 0));
+    assertEquals(r8, queue.peek()); // r5: 1/21 (0.047) -> r8: 10/300 (0.033)
+    assertNull(queue.offer(r9, 0));
+    assertEquals(r9, queue.peek()); // r9: 10/400 (0.025)
+    assertNull(queue.offer(r10, 0));
+    assertEquals(r10, queue.peek()); // r10: 10/500 (0.02)
+  }
 }

Reply via email to