Repository: hive Updated Branches: refs/heads/branch-2.1 f3a05bd1e -> cb2330d94
HIVE-13599. LLAP: Incorrect handling of the preemption queue on finishable state updates. (Siddharth Seth, reviewed by Prasanth Jayachandran) (cherry picked from commit 5776025c0a18f15f28dfccee24c08a6e951f8e2a) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cb2330d9 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cb2330d9 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cb2330d9 Branch: refs/heads/branch-2.1 Commit: cb2330d94583d459c8041bfa257df3ab3e15daec Parents: f3a05bd Author: Siddharth Seth <ss...@apache.org> Authored: Mon Jun 6 20:36:03 2016 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Mon Jun 6 20:37:35 2016 -0700 ---------------------------------------------------------------------- .../llap/daemon/impl/TaskExecutorService.java | 80 +++++++++---- .../daemon/impl/TestTaskExecutorService.java | 116 ++++++++++++++++++- 2 files changed, 167 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/cb2330d9/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 1e302e8..7744611 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -94,7 +94,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta private final ListeningExecutorService waitQueueExecutorService; // Thread pool for callbacks on completion of execution of a work unit. 
private final ListeningExecutorService executionCompletionExecutorService; - private final BlockingQueue<TaskWrapper> preemptionQueue; + + @VisibleForTesting + final BlockingQueue<TaskWrapper> preemptionQueue; private final boolean enablePreemption; private final ThreadPoolExecutor threadPoolExecutor; private final AtomicInteger numSlotsAvailable; @@ -183,6 +185,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta @Override public Set<String> getExecutorsStatus() { + // TODO Change this method to make the output easier to parse (parse programmatically) Set<String> result = new HashSet<>(); StringBuilder value = new StringBuilder(); for (Map.Entry<String, TaskWrapper> e : knownTasks.entrySet()) { @@ -449,11 +452,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta if (isDebugEnabled) { LOG.debug("Removing {} from preemptionQueue", fragmentId); } - taskWrapper.setIsInPreemptableQueue(false); - preemptionQueue.remove(taskWrapper); - if (metrics != null) { - metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); - } + removeFromPreemptionQueue(taskWrapper); } taskWrapper.getTaskRunnerCallable().killTask(); } else { @@ -463,7 +462,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } } - private void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { + @VisibleForTesting + void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { synchronized (lock) { boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); @@ -508,7 +508,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta LOG.debug("Preemption Queue: " + preemptionQueue); } - TaskWrapper pRequest = removeAndGetFromPreemptionQueue(); + TaskWrapper pRequest = removeAndGetNextFromPreemptionQueue(); // Avoid preempting tasks which are finishable - callback still to be processed. 
if (pRequest != null) { @@ -548,18 +548,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta if (newFinishableState == true && taskWrapper.isInPreemptionQueue()) { LOG.debug("Removing {} from preemption queue because it's state changed to {}", taskWrapper.getRequestId(), newFinishableState); - preemptionQueue.remove(taskWrapper.getTaskRunnerCallable()); - if (metrics != null) { - metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); - } + removeFromPreemptionQueue(taskWrapper); } else if (newFinishableState == false && !taskWrapper.isInPreemptionQueue() && !taskWrapper.isInWaitQueue()) { LOG.debug("Adding {} to preemption queue since finishable state changed to {}", taskWrapper.getRequestId(), newFinishableState); - preemptionQueue.offer(taskWrapper); - if (metrics != null) { - metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); - } + addToPreemptionQueue(taskWrapper); } lock.notify(); } @@ -567,7 +561,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta private void addToPreemptionQueue(TaskWrapper taskWrapper) { synchronized (lock) { - preemptionQueue.add(taskWrapper); + boolean added = preemptionQueue.offer(taskWrapper); + if (!added) { + LOG.warn("Failed to add element {} to preemption queue. Terminating", taskWrapper); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), + new IllegalStateException("Preemption queue full. 
Cannot proceed")); + } taskWrapper.setIsInPreemptableQueue(true); if (metrics != null) { metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); @@ -575,10 +574,30 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } } - private TaskWrapper removeAndGetFromPreemptionQueue() { + /** + * Remove the specified taskWrapper from the preemption queue + * @param taskWrapper the taskWrapper to be removed + * @return true if the element existed in the queue and was removed, false otherwise + */ + private boolean removeFromPreemptionQueue(TaskWrapper taskWrapper) { + synchronized (lock) { + return removeFromPreemptionQueueUnlocked(taskWrapper); + } + } + + private boolean removeFromPreemptionQueueUnlocked(TaskWrapper taskWrapper) { + boolean removed = preemptionQueue.remove(taskWrapper); + taskWrapper.setIsInPreemptableQueue(false); + if (metrics != null) { + metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); + } + return removed; + } + + private TaskWrapper removeAndGetNextFromPreemptionQueue() { TaskWrapper taskWrapper; synchronized (lock) { - taskWrapper = preemptionQueue.remove(); + taskWrapper = preemptionQueue.poll(); if (taskWrapper != null) { taskWrapper.setIsInPreemptableQueue(false); if (metrics != null) { @@ -603,6 +622,24 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta this.taskWrapper = taskWrapper; } + // By the time either success / failed are called, the task itself knows that it has terminated, + // and will ignore subsequent kill requests if they go out. + + // There's a race between removing the current task from the preemption queue and the actual scheduler + // attempting to take an element from the preemption queue to make space for another task. + // If the current element is removed to make space - that is OK, since the current task is completing and + // will end up making space for execution. 
Any kill message sent out by the scheduler to the task will + // be ignored, since the task knows it has completed (otherwise it would not be in this callback). + // + // If the task is removed from the queue as a result of this callback, and the scheduler happens to + // be in the section where it's looking for a preemptible task - the scheduler may end up pulling the + // next pre-emptible task and killing it (an extra preemption). + // TODO: This potential extra preemption can be avoided by synchronizing the entire tryScheduling block. + // This would essentially synchronize all operations - it would be better to see if there's an + // approach where multiple locks could be used to avoid single threaded operation. + // - It checks available and preempts (which could be this task) + // - Or this task completes making space, and removing the need for preemption + @Override public void onSuccess(TaskRunner2Result result) { knownTasks.remove(taskWrapper.getRequestId()); @@ -626,15 +663,12 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta // if this task was added to pre-emption list, remove it if (enablePreemption) { String state = reason == null ? "FAILED" : reason.name(); - boolean removed = preemptionQueue.remove(taskWrapper); + boolean removed = removeFromPreemptionQueueUnlocked(taskWrapper); if (removed && isInfoEnabled) { TaskRunnerCallable trc = taskWrapper.getTaskRunnerCallable(); LOG.info(TaskRunnerCallable.getTaskIdentifierString(trc.getRequest(), trc.getVertexSpec()) + " request " + state + "! 
Removed from preemption list."); } - if (metrics != null) { - metrics.setExecutorNumPreemptableRequests(preemptionQueue.size()); - } } numSlotsAvailable.incrementAndGet(); http://git-wip-us.apache.org/repos/asf/hive/blob/cb2330d9/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 506f611..ac4e5f1 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -21,6 +21,9 @@ import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.cr import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createSubmitWorkRequestProto; import static org.apache.hadoop.hive.llap.daemon.impl.TaskExecutorTestHelpers.createTaskWrapper; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import java.util.HashMap; @@ -29,6 +32,7 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.PriorityBlockingQueue; +import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; @@ -76,9 +80,9 @@ public class TestTaskExecutorService { try { taskExecutorService.schedule(r1); - r1.awaitStart(); + awaitStartAndSchedulerRun(r1, taskExecutorService); taskExecutorService.schedule(r2); - r2.awaitStart(); + 
awaitStartAndSchedulerRun(r2, taskExecutorService); // Verify r1 was preempted. Also verify that it finished (single executor), otherwise // r2 could have run anyway. r1.awaitEnd(); @@ -104,6 +108,73 @@ public class TestTaskExecutorService { } @Test(timeout = 10000) + public void testPreemptionStateOnTaskMoveToFinishableState() throws InterruptedException { + + MockRequest r1 = createMockRequest(1, 1, 100, false, 20000l); + + TaskExecutorServiceForTest taskExecutorService = + new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); + taskExecutorService.init(new Configuration()); + taskExecutorService.start(); + + try { + Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r1); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + awaitStartAndSchedulerRun(r1, taskExecutorService); + + TaskWrapper taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper); + assertTrue(taskWrapper.isInPreemptionQueue()); + + // Now notify the executorService that the task has moved to finishable state. 
+ taskWrapper.finishableStateUpdated(true); + TaskWrapper taskWrapper2 = taskExecutorService.preemptionQueue.peek(); + assertNull(taskWrapper2); + assertFalse(taskWrapper.isInPreemptionQueue()); + + r1.complete(); + r1.awaitEnd(); + } finally { + taskExecutorService.shutDown(false); + } + } + + @Test(timeout = 10000) + public void testPreemptionStateOnTaskMoveToNonFinishableState() throws InterruptedException { + + MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l); + + TaskExecutorServiceForTest taskExecutorService = + new TaskExecutorServiceForTest(1, 2, ShortestJobFirstComparator.class.getName(), true); + taskExecutorService.init(new Configuration()); + taskExecutorService.start(); + + try { + Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r1); + assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); + awaitStartAndSchedulerRun(r1, taskExecutorService); + + TaskWrapper taskWrapper = taskExecutorService.preemptionQueue.peek(); + assertNull(taskWrapper); + assertEquals(1, taskExecutorService.knownTasks.size()); + taskWrapper = taskExecutorService.knownTasks.entrySet().iterator().next().getValue(); + assertFalse(taskWrapper.isInPreemptionQueue()); + + // Now notify the executorService that the task has moved to non-finishable state. + taskWrapper.finishableStateUpdated(false); + TaskWrapper taskWrapper2 = taskExecutorService.preemptionQueue.peek(); + assertNotNull(taskWrapper2); + assertTrue(taskWrapper2.isInPreemptionQueue()); + assertEquals(taskWrapper, taskWrapper2); + + r1.complete(); + r1.awaitEnd(); + } finally { + taskExecutorService.shutDown(false); + } + } + + @Test(timeout = 10000) public void testWaitQueuePreemption() throws InterruptedException { MockRequest r1 = createMockRequest(1, 1, 100, true, 20000l); MockRequest r2 = createMockRequest(2, 1, 200, false, 20000l); @@ -121,7 +192,7 @@ public class TestTaskExecutorService { // TODO HIVE-11687. 
Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots) // This currently serves to allow the task to be removed from the waitQueue. - r1.awaitStart(); + awaitStartAndSchedulerRun(r1, taskExecutorService); Scheduler.SubmissionState submissionState = taskExecutorService.schedule(r2); assertEquals(Scheduler.SubmissionState.ACCEPTED, submissionState); @@ -155,7 +226,7 @@ public class TestTaskExecutorService { assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId())); assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId())); - r5.awaitStart(); + awaitStartAndSchedulerRun(r5, taskExecutorService); TaskExecutorServiceForTest.InternalCompletionListenerForTest icl5 = taskExecutorService.getInternalCompletionListenerForTest(r5.getRequestId()); r5.complete(); @@ -166,7 +237,7 @@ public class TestTaskExecutorService { assertEquals(1, taskExecutorService.knownTasks.size()); assertTrue(taskExecutorService.knownTasks.containsKey(r2.getRequestId())); - r2.awaitStart(); + awaitStartAndSchedulerRun(r2, taskExecutorService); TaskExecutorServiceForTest.InternalCompletionListenerForTest icl2 = taskExecutorService.getInternalCompletionListenerForTest(r2.getRequestId()); r2.complete(); @@ -180,13 +251,22 @@ public class TestTaskExecutorService { } - + private void awaitStartAndSchedulerRun(MockRequest mockRequest, + TaskExecutorServiceForTest taskExecutorServiceForTest) throws + InterruptedException { + mockRequest.awaitStart(); + taskExecutorServiceForTest.awaitTryScheduleIfInProgress(); + } private static class TaskExecutorServiceForTest extends TaskExecutorService { private final Lock iclCreationLock = new ReentrantLock(); private final Map<String, Condition> iclCreationConditions = new HashMap<>(); + private final Lock tryScheduleLock = new ReentrantLock(); + private final Condition tryScheduleCondition = tryScheduleLock.newCondition(); + private boolean isInTrySchedule = false; + public TaskExecutorServiceForTest(int 
numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption, @@ -196,6 +276,30 @@ public class TestTaskExecutorService { private ConcurrentMap<String, InternalCompletionListenerForTest> completionListeners = new ConcurrentHashMap<>(); @Override + void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { + tryScheduleLock.lock(); + try { + isInTrySchedule = true; + super.trySchedule(taskWrapper); + isInTrySchedule = false; + tryScheduleCondition.signal(); + } finally { + tryScheduleLock.unlock(); + } + } + + private void awaitTryScheduleIfInProgress() throws InterruptedException { + tryScheduleLock.lock(); + try { + while (isInTrySchedule) { + tryScheduleCondition.await(); + } + } finally { + tryScheduleLock.unlock(); + } + } + + @Override InternalCompletionListener createInternalCompletionListener(TaskWrapper taskWrapper) { iclCreationLock.lock(); try {