Repository: hive Updated Branches: refs/heads/llap 025765382 -> 2faf01eed
HIVE-11660. LLAP: TestTaskExecutorService, TestLlapTaskSchedulerService are flaky. Also fixes a real scheduling issue in LlapTaskSchedulerService. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2faf01ee Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2faf01ee Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2faf01ee Branch: refs/heads/llap Commit: 2faf01eedaa32469f06d5408f599823e18096e28 Parents: 0257653 Author: Siddharth Seth <ss...@apache.org> Authored: Mon Aug 31 14:54:35 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Mon Aug 31 14:54:35 2015 -0700 ---------------------------------------------------------------------- .../llap/daemon/impl/TaskExecutorService.java | 73 +++++++++++--------- .../dag/app/rm/LlapTaskSchedulerService.java | 12 +++- .../daemon/impl/TaskExecutorTestHelpers.java | 7 +- .../daemon/impl/TestTaskExecutorService.java | 48 +++++++++++-- .../app/rm/TestLlapTaskSchedulerService.java | 19 ++--- 5 files changed, 110 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index badeb63..875aef6 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -223,6 +223,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta while (!isShutdown.get()) { + RejectedExecutionException rejectedException = null; synchronized (lock) { // Since schedule() can be called from multiple threads, we peek the wait queue, // try scheduling the task and then remove the task if scheduling is successful. @@ -259,17 +260,21 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta // queue again to pick up the task at the highest priority. continue; } - } - - boolean scheduled = trySchedule(task); - if (scheduled) { - // wait queue could have been re-ordered in the mean time because of concurrent task - // submission. So remove the specific task instead of the head task. - synchronized (lock) { + try { + trySchedule(task); + // wait queue could have been re-ordered in the mean time because of concurrent task + // submission. So remove the specific task instead of the head task. waitQueue.remove(task); + } catch (RejectedExecutionException e) { + rejectedException = e; } } + // Handle the rejection outside of the lock + if (rejectedException !=null) { + handleScheduleAttemptedRejection(task); + } + synchronized (lock) { while (waitQueue.isEmpty()) { if (!isShutdown.get()) { @@ -318,6 +323,10 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta synchronized (lock) { // If the queue does not have capacity, it does not throw a Rejection. Instead it will // return the task with the lowest priority, which could be the task which is currently being processed. + + // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the + // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks. + // The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots) evictedTask = waitQueue.offer(taskWrapper); if (evictedTask != taskWrapper) { knownTasks.put(taskWrapper.getRequestId(), taskWrapper); @@ -392,10 +401,8 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } } - private boolean trySchedule(final TaskWrapper taskWrapper) { + private void trySchedule(final TaskWrapper taskWrapper) throws RejectedExecutionException { - boolean scheduled = false; - try { synchronized (lock) { boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); LOG.info("Attempting to execute {}", taskWrapper); @@ -423,37 +430,35 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } } numSlotsAvailable.decrementAndGet(); - scheduled = true; - } catch (RejectedExecutionException e) { - if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) { + } - if (isDebugEnabled) { - LOG.debug("Preemption Queue: " + preemptionQueue); - } + private void handleScheduleAttemptedRejection(TaskWrapper taskWrapper) { + if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) { - TaskWrapper pRequest = removeAndGetFromPreemptionQueue(); + if (isDebugEnabled) { + LOG.debug("Preemption Queue: " + preemptionQueue); + } - // Avoid preempting tasks which are finishable - callback still to be processed. - if (pRequest != null) { - if (pRequest.getTaskRunnerCallable().canFinish()) { - LOG.info( - "Removed {} from preemption queue, but not preempting since it's now finishable", - pRequest.getRequestId()); - } else { - if (isInfoEnabled) { - LOG.info("Invoking kill task for {} due to pre-emption to run {}", - pRequest.getRequestId(), taskWrapper.getRequestId()); - } - // The task will either be killed or is already in the process of completing, which will - // trigger the next scheduling run, or result in available slots being higher than 0, - // which will cause the scheduler loop to continue. - pRequest.getTaskRunnerCallable().killTask(); + TaskWrapper pRequest = removeAndGetFromPreemptionQueue(); + + // Avoid preempting tasks which are finishable - callback still to be processed. + if (pRequest != null) { + if (pRequest.getTaskRunnerCallable().canFinish()) { + LOG.info( + "Removed {} from preemption queue, but not preempting since it's now finishable", + pRequest.getRequestId()); + } else { + if (isInfoEnabled) { + LOG.info("Invoking kill task for {} due to pre-emption to run {}", + pRequest.getRequestId(), taskWrapper.getRequestId()); } + // The task will either be killed or is already in the process of completing, which will + // trigger the next scheduling run, or result in available slots being higher than 0, + // which will cause the scheduler loop to continue. + pRequest.getTaskRunnerCallable().killTask(); } } } - - return scheduled; } private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) { http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java index 38d42b9..7fb9a99 100644 --- a/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java +++ b/llap-server/src/java/org/apache/tez/dag/app/rm/LlapTaskSchedulerService.java @@ -123,6 +123,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private final Lock scheduleLock = new ReentrantLock(); private final Condition scheduleCondition = scheduleLock.newCondition(); + private final AtomicBoolean pendingScheduleInvodations = new AtomicBoolean(false); private final ListeningExecutorService schedulerExecutor; private final SchedulerCallable schedulerCallable = new SchedulerCallable(); @@ -910,6 +911,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private void trySchedulingPendingTasks() { scheduleLock.lock(); try { + pendingScheduleInvodations.set(true); scheduleCondition.signal(); } finally { scheduleLock.unlock(); @@ -924,7 +926,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { scheduleLock.lock(); try { - scheduleCondition.await(); + while (!pendingScheduleInvodations.get()) { + scheduleCondition.await(); + } } catch (InterruptedException e) { if (isShutdown.get()) { LOG.info("Scheduler thread interrupted after shutdown"); @@ -936,6 +940,12 @@ public class LlapTaskSchedulerService extends TaskScheduler { } finally { scheduleLock.unlock(); } + + // Set pending to false since scheduling is about to run. Any triggers up to this point + // will be handled in the next run. + // A new request may come in right after this is set to false, but before the actual scheduling. + // This will be handled in this run, but will cause an immediate run after, which is harmless. + pendingScheduleInvodations.set(false); // Schedule outside of the scheduleLock - which should only be used to wait on the condition. schedulePendingTasks(); } http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index ec1ffcf..38af07e 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -116,6 +116,7 @@ public class TaskExecutorTestHelpers { private final ReentrantLock lock = new ReentrantLock(); private final Condition startedCondition = lock.newCondition(); private final Condition sleepCondition = lock.newCondition(); + private boolean shouldSleep = true; private final Condition finishedCondition = lock.newCondition(); public MockRequest(SubmitWorkRequestProto requestProto, @@ -143,7 +144,9 @@ public class TaskExecutorTestHelpers { lock.lock(); try { - sleepCondition.await(workTime, TimeUnit.MILLISECONDS); + if (shouldSleep) { + sleepCondition.await(workTime, TimeUnit.MILLISECONDS); + } } catch (InterruptedException e) { wasInterrupted.set(true); return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); @@ -171,6 +174,7 @@ public class TaskExecutorTestHelpers { lock.lock(); try { wasKilled.set(true); + shouldSleep = false; sleepCondition.signal(); } finally { lock.unlock(); @@ -192,6 +196,7 @@ public class TaskExecutorTestHelpers { void complete() { lock.lock(); try { + shouldSleep = false; sleepCondition.signal(); } finally { lock.unlock(); http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 34ab40a..cb2d0e9 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -24,6 +24,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -81,6 +83,7 @@ public class TestTaskExecutorService { r2.awaitStart(); // Verify r1 was preempted. Also verify that it finished (single executor), otherwise // r2 could have run anyway. + r1.awaitEnd(); assertTrue(r1.wasPreempted()); assertTrue(r1.hasFinished()); @@ -117,6 +120,9 @@ public class TestTaskExecutorService { try { taskExecutorService.schedule(r1); + + // TODO HIVE-11687. Remove the awaitStart once offer can handle (waitQueueSize + numFreeExecutionSlots) + // This currently serves to allow the task to be removed from the waitQueue. r1.awaitStart(); try { taskExecutorService.schedule(r2); @@ -154,6 +160,7 @@ public class TestTaskExecutorService { assertTrue(taskExecutorService.knownTasks.containsKey(r5.getRequestId())); r1.complete(); + r1.awaitEnd(); icl1.awaitCompletion(); // Two known tasks left. r2 and r5. (r1 complete, r3 evicted, r4 rejected) @@ -165,6 +172,7 @@ public class TestTaskExecutorService { TaskExecutorServiceForTest.InternalCompletionListenerForTest icl5 = taskExecutorService.getInternalCompletionListenerForTest(r5.getRequestId()); r5.complete(); + r5.awaitEnd(); icl5.awaitCompletion(); // 1 Pending task which is not finishable @@ -175,6 +183,7 @@ public class TestTaskExecutorService { TaskExecutorServiceForTest.InternalCompletionListenerForTest icl2 = taskExecutorService.getInternalCompletionListenerForTest(r2.getRequestId()); r2.complete(); + r2.awaitEnd(); icl2.awaitCompletion(); // 0 Pending task which is not finishable assertEquals(0, taskExecutorService.knownTasks.size()); @@ -187,6 +196,10 @@ public class TestTaskExecutorService { private static class TaskExecutorServiceForTest extends TaskExecutorService { + + private final Lock iclCreationLock = new ReentrantLock(); + private final Map<String, Condition> iclCreationConditions = new HashMap<>(); + public TaskExecutorServiceForTest(int numExecutors, int waitQueueSize, String waitQueueComparatorClassName, boolean enablePreemption) { super(numExecutors, waitQueueSize, waitQueueComparatorClassName, enablePreemption); @@ -196,13 +209,38 @@ public class TestTaskExecutorService { @Override InternalCompletionListener createInternalCompletionListener(TaskWrapper taskWrapper) { - InternalCompletionListenerForTest icl = new InternalCompletionListenerForTest(taskWrapper); - completionListeners.put(taskWrapper.getRequestId(), icl); - return icl; + iclCreationLock.lock(); + try { + InternalCompletionListenerForTest icl = new InternalCompletionListenerForTest(taskWrapper); + completionListeners.put(taskWrapper.getRequestId(), icl); + Condition condition = iclCreationConditions.get(taskWrapper.getRequestId()); + if (condition == null) { + condition = iclCreationLock.newCondition(); + iclCreationConditions.put(taskWrapper.getRequestId(), condition); + } + condition.signalAll(); + return icl; + } finally { + iclCreationLock.unlock(); + } } - InternalCompletionListenerForTest getInternalCompletionListenerForTest(String requestId) { - return completionListeners.get(requestId); + InternalCompletionListenerForTest getInternalCompletionListenerForTest(String requestId) throws + InterruptedException { + iclCreationLock.lock(); + try { + Condition condition = iclCreationConditions.get(requestId); + if (condition == null) { + condition = iclCreationLock.newCondition(); + iclCreationConditions.put(requestId, condition); + } + while (completionListeners.get(requestId) == null) { + condition.await(); + } + return completionListeners.get(requestId); + } finally { + iclCreationLock.unlock(); + } } private class InternalCompletionListenerForTest extends TaskExecutorService.InternalCompletionListener { http://git-wip-us.apache.org/repos/asf/hive/blob/2faf01ee/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java index ce60e6e..2f93266 100644 --- a/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java +++ b/llap-server/src/test/org/apache/tez/dag/app/rm/TestLlapTaskSchedulerService.java @@ -343,9 +343,12 @@ public class TestLlapTaskSchedulerService { ts = new LlapTaskSchedulerServiceForTest(mockAppCallback, clock); + controlScheduler(true); ts.initialize(); ts.start(); // One scheduler pass from the nodes that are added at startup + signalSchedulerRun(); + controlScheduler(false); awaitSchedulerRun(); } @@ -386,9 +389,9 @@ public class TestLlapTaskSchedulerService { private AtomicBoolean controlScheduling = new AtomicBoolean(false); private final Lock testLock = new ReentrantLock(); private final Condition schedulingCompleteCondition = testLock.newCondition(); - private final AtomicBoolean schedulingComplete = new AtomicBoolean(false); + private boolean schedulingComplete = false; private final Condition triggerSchedulingCondition = testLock.newCondition(); - private final AtomicBoolean schedulingTriggered = new AtomicBoolean(false); + private boolean schedulingTriggered = false; private final AtomicInteger numSchedulerRuns = new AtomicInteger(0); @@ -402,7 +405,7 @@ public class TestLlapTaskSchedulerService { testLock.lock(); try { if (controlScheduling.get()) { - while (!schedulingTriggered.get()) { + while (!schedulingTriggered) { try { triggerSchedulingCondition.await(); } catch (InterruptedException e) { @@ -412,8 +415,8 @@ public class TestLlapTaskSchedulerService { } numSchedulerRuns.incrementAndGet(); super.schedulePendingTasks(); - schedulingTriggered.set(false); - schedulingComplete.set(true); + schedulingTriggered = false; + schedulingComplete = true; schedulingCompleteCondition.signal(); } finally { testLock.unlock(); @@ -428,7 +431,7 @@ public class TestLlapTaskSchedulerService { void forTestSignalSchedulingRun() throws InterruptedException { testLock.lock(); try { - schedulingTriggered.set(true); + schedulingTriggered = true; triggerSchedulingCondition.signal(); } finally { testLock.unlock(); @@ -438,10 +441,10 @@ public class TestLlapTaskSchedulerService { void forTestAwaitSchedulingRun() throws InterruptedException { testLock.lock(); try { - while (!schedulingComplete.get()) { + while (!schedulingComplete) { schedulingCompleteCondition.await(); } - schedulingComplete.set(false); + schedulingComplete = false; } finally { testLock.unlock(); }