This is an automated email from the ASF dual-hosted git repository. hashutosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new e85731c HIVE-23443 : LLAP speculative task pre-emption seems to be not working (Prasanth J via Gopal V, Panos G) e85731c is described below commit e85731c42b6485412deefccf85f17e3ae9e0f403 Author: Prasanth Jayachandran <prasan...@apache.org> AuthorDate: Sun May 17 09:01:21 2020 -0700 HIVE-23443 : LLAP speculative task pre-emption seems to be not working (Prasanth J via Gopal V, Panos G) Signed-off-by: Ashutosh Chauhan <hashut...@apache.org> --- .../hive/llap/daemon/impl/TaskExecutorService.java | 22 +++- .../llap/daemon/impl/TestTaskExecutorService.java | 134 +++++++++++++++++++++ 2 files changed, 152 insertions(+), 4 deletions(-) diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index d8b517d..1d6e852 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -867,7 +867,8 @@ public class TaskExecutorService extends AbstractService return sc; } - private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) { + @VisibleForTesting + void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) { synchronized (lock) { LOG.debug("Fragment {} guaranteed state changed to {}; finishable {}, in wait queue {}, " + "in preemption queue {}", taskWrapper.getRequestId(), taskWrapper.isGuaranteed(), @@ -884,10 +885,20 @@ public class TaskExecutorService extends AbstractService taskWrapper.updateCanFinishForPriority(newFinishableState); forceReinsertIntoQueue(taskWrapper, isRemoved); } else { - taskWrapper.updateCanFinishForPriority(newFinishableState); - if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) { - // No need to check guaranteed here; if it was false we would already be in the queue. + // if speculative task, any finishable state change should re-order the queue as speculative tasks are always + // not-guaranteed (re-order helps put non-finishable's ahead of finishable) + if (!taskWrapper.isGuaranteed()) { + removeFromPreemptionQueue(taskWrapper); + taskWrapper.updateCanFinishForPriority(newFinishableState); addToPreemptionQueue(taskWrapper); + } else { + // if guaranteed task, if the finishable state changed to non-finishable and if the task doesn't exist + // pre-emption queue, then add it so that it becomes candidate to kill + taskWrapper.updateCanFinishForPriority(newFinishableState); + if (!newFinishableState && !taskWrapper.isInPreemptionQueue()) { + // No need to check guaranteed here; if it was false we would already be in the queue. + addToPreemptionQueue(taskWrapper); + } } } @@ -896,6 +907,9 @@ public class TaskExecutorService extends AbstractService } private void addToPreemptionQueue(TaskWrapper taskWrapper) { + if (taskWrapper.isInPreemptionQueue()) { + return; + } synchronized (lock) { insertIntoPreemptionQueueOrFailUnlocked(taskWrapper); taskWrapper.setIsInPreemptableQueue(true); diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index ce9fce9..ff61fdd 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -236,6 +236,140 @@ public class TestTaskExecutorService { } } + @Test(timeout = 10000) + public void testPreemptionQueueOnFinishableStateUpdates() throws InterruptedException { + + long r1WorkTime = 1000L; + long r2WorkTime = 2000L; + long r3WorkTime = 2000L; + // all tasks start with non-finishable state + MockRequest r1 = createMockRequest(1, 2, 100, 200, false, r1WorkTime, false); + MockRequest r2 = createMockRequest(2, 1, 100, 200, false, r2WorkTime, false); + MockRequest r3 = createMockRequest(3, 3, 50, 200, false, r3WorkTime, false); + + + TaskExecutorServiceForTest taskExecutorService = + new TaskExecutorServiceForTest(4, 2, ShortestJobFirstComparator.class.getName(), true, mockMetrics); + taskExecutorService.init(new Configuration()); + taskExecutorService.start(); + + try { + String fragmentId1 = r1.getRequestId(); + Scheduler.SubmissionState submissionState1 = taskExecutorService.schedule(r1); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState1); + awaitStartAndSchedulerRun(r1, taskExecutorService); + + String fragmentId2 = r2.getRequestId(); + Scheduler.SubmissionState submissionState2 = taskExecutorService.schedule(r2); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState2); + awaitStartAndSchedulerRun(r2, taskExecutorService); + + String fragmentId3 = r3.getRequestId(); + Scheduler.SubmissionState submissionState3 = taskExecutorService.schedule(r3); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState3); + awaitStartAndSchedulerRun(r3, taskExecutorService); + + TaskWrapper taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // all tasks are non-finishables, r2 has min tasks + assertEquals(fragmentId2, taskWrapper.getRequestId()); + assertEquals(3, taskExecutorService.preemptionQueue.size()); + + // to let us set the finishable state for tests + r1.setCanUpdateFinishable(); + r2.setCanUpdateFinishable(); + r3.setCanUpdateFinishable(); + + TaskWrapper taskWrapper1 = taskExecutorService.knownTasks.get(fragmentId1); + TaskWrapper taskWrapper2 = taskExecutorService.knownTasks.get(fragmentId2); + TaskWrapper taskWrapper3 = taskExecutorService.knownTasks.get(fragmentId3); + + // r2 is finishable now, so it should go to back of pre-emption queue. + taskExecutorService.finishableStateUpdated(taskWrapper2, true); + taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // r1 is smallest among non-finishables, so should be first in queue + assertEquals(fragmentId1, taskWrapper.getRequestId()); + assertFalse(taskWrapper.canFinishForPriority()); + assertEquals(3, taskExecutorService.preemptionQueue.size()); + + // r1 is finishable now, so it should go to back of pre-emption queue. + taskExecutorService.finishableStateUpdated(taskWrapper1, true); + taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // r3 is the only non-finishable + assertEquals(fragmentId3, taskWrapper.getRequestId()); + assertFalse(taskWrapper.canFinishForPriority()); + assertEquals(3, taskExecutorService.preemptionQueue.size()); + + // r3 is finishable now, so it should go to back of pre-emption queue. + taskExecutorService.finishableStateUpdated(taskWrapper3, true); + taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // no more non-finishables left, r2 is smallest among the finishables + assertEquals(fragmentId2, taskWrapper.getRequestId()); + assertTrue(taskWrapper.canFinishForPriority()); + assertEquals(3, taskExecutorService.preemptionQueue.size()); + + // double notification test (nothing should change from the above sequence) + taskExecutorService.finishableStateUpdated(taskWrapper3, true);taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // no more non-finishables left, r2 is smallest among the finishables + assertEquals(fragmentId2, taskWrapper.getRequestId()); + assertTrue(taskWrapper.canFinishForPriority()); + assertEquals(3, taskExecutorService.preemptionQueue.size()); + + // remove r2 from scheduler + taskExecutorService.killFragment(fragmentId2); + + taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // no more non-finishables left, r1 is the smallest among the finishables + assertEquals(fragmentId1, taskWrapper.getRequestId()); + assertTrue(taskWrapper.canFinishForPriority()); + assertEquals(2, taskExecutorService.preemptionQueue.size()); + + // make r3 as non-finishable and make sure its at top of queue + taskExecutorService.finishableStateUpdated(taskWrapper3, false); + taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // r3 is non-finishable and should be at top + assertEquals(fragmentId3, taskWrapper.getRequestId()); + assertFalse(taskWrapper.canFinishForPriority()); + assertEquals(2, taskExecutorService.preemptionQueue.size()); + // make sure the task is not added twice to pre-emption queue + taskExecutorService.tryScheduleUnderLock(taskWrapper); + assertEquals(2, taskExecutorService.preemptionQueue.size()); + + // remove r3 from scheduler + taskExecutorService.killFragment(fragmentId3); + + taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + // r1 is the only one left in queue and is finishable + assertEquals(fragmentId1, taskWrapper.getRequestId()); + assertTrue(taskWrapper.canFinishForPriority()); + assertEquals(1, taskExecutorService.preemptionQueue.size()); + + // remove r1 from scheduler + taskExecutorService.killFragment(fragmentId1); + + // no more left in queue + taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNull(taskWrapper); + } finally { + taskExecutorService.shutDown(false); + } + } + // Tests wait queue behaviour for fragments which have reported to the AM, but have not given up their executor slot. @Test (timeout = 10000) public void testWaitQueueAcceptAfterAMTaskReport() throws