Repository: hive
Updated Branches:
  refs/heads/llap 41d123412 -> f0175bc8d
HIVE-10756. LLAP: Misc changes to daemon scheduling. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f0175bc8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f0175bc8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f0175bc8 Branch: refs/heads/llap Commit: f0175bc8de1a585903fee71f280812983117f299 Parents: 41d1234 Author: Siddharth Seth <ss...@apache.org> Authored: Tue May 19 14:08:13 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Tue May 19 14:08:13 2015 -0700 ---------------------------------------------------------------------- .../llap/daemon/impl/ContainerRunnerImpl.java | 2 - .../impl/EvictingPriorityBlockingQueue.java | 4 +- .../llap/daemon/impl/TaskExecutorService.java | 60 +++++++++++-------- .../llap/daemon/impl/TaskRunnerCallable.java | 62 ++++++++++---------- 4 files changed, 70 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index e544789..3fd7920 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -67,7 +67,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun private final QueryTracker queryTracker; private final Scheduler<TaskRunnerCallable> executorService; private final AtomicReference<InetSocketAddress> localAddress; - private final String[] localDirsBase; private final Map<String, String> localEnv = new 
HashMap<>(); private final long memoryPerExecutor; private final LlapDaemonExecutorMetrics metrics; @@ -87,7 +86,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun this.conf = conf; Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); - this.localDirsBase = localDirsBase; this.localAddress = localAddress; this.queryTracker = new QueryTracker(conf, localDirsBase); http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java index e8d789b..101a69c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java @@ -27,8 +27,8 @@ import java.util.Comparator; * returned back. If the queue is not full, new element will be added to queue and null is returned. 
*/ public class EvictingPriorityBlockingQueue<E> { - private PriorityBlockingDeque<E> deque; - private Comparator<E> comparator; + private final PriorityBlockingDeque<E> deque; + private final Comparator<E> comparator; public EvictingPriorityBlockingQueue(Comparator<E> comparator, int maxSize) { this.deque = new PriorityBlockingDeque<>(comparator, maxSize); http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 08af1e2..5323f05 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -71,9 +71,13 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { // some object to lock upon. Used by task scheduler to notify wait queue scheduler of new items // to wait queue private final Object waitLock; + // Thread pool for actual execution of work. private final ListeningExecutorService executorService; private final EvictingPriorityBlockingQueue<TaskRunnerCallable> waitQueue; + // Thread pool for taking entities off the wait queue. private final ListeningExecutorService waitQueueExecutorService; + // Thread pool for callbacks on completion of execution of a work unit. 
+ private final ListeningExecutorService executionCompletionExecutorService; private final BlockingQueue<TaskRunnerCallable> preemptionQueue; private final boolean enablePreemption; private final ThreadPoolExecutor threadPoolExecutor; @@ -94,9 +98,14 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { this.numSlotsAvailable = new AtomicInteger(numExecutors); // single threaded scheduler for tasks from wait queue to executor threads - ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + ExecutorService wes = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder().setDaemon(true) .setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build()); this.waitQueueExecutorService = MoreExecutors.listeningDecorator(wes); + + ExecutorService executionCompletionExecutorServiceRaw = Executors.newFixedThreadPool(1, + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("ExecutionCompletionThread #%d") + .build()); + executionCompletionExecutorService = MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw); ListenableFuture<?> future = waitQueueExecutorService.submit(new WaitQueueWorker()); Futures.addCallback(future, new WaitQueueWorkerCallback()); } @@ -125,8 +134,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { // if the task cannot finish and if no slots are available then don't schedule it. // TODO: Event notifications that change canFinish state should notify waitLock synchronized (waitLock) { + // KKK Is this a tight loop when there's only finishable tasks available ? if (!task.canFinish() && numSlotsAvailable.get() == 0) { waitLock.wait(); + // Another task at a higher priority may have come in during the wait. Lookup the + // queue again to pick up the task at the highest priority. 
+ continue; } } @@ -190,7 +203,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { try { ListenableFuture<TaskRunner2Result> future = executorService.submit(task); FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task); - Futures.addCallback(future, wrappedCallback); + // Callback on a separate thread so that when a task completes, the thread in the main queue + // is actually available for execution and will not potentially result in a RejectedExecution + Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService); if (isInfoEnabled) { LOG.info(task.getRequestId() + " scheduled for execution."); @@ -216,13 +231,15 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } TaskRunnerCallable pRequest = preemptionQueue.remove(); - if (pRequest != null && !pRequest.isCompleted() && !pRequest.isKillInvoked()) { + if (pRequest != null) { if (isInfoEnabled) { - LOG.info("Kill task invoked for " + pRequest.getRequestId() + " due to pre-emption"); + LOG.info("Invoking kill task for {} due to pre-emption.", pRequest.getRequestId()); } - pRequest.setKillInvoked(); + // The task will either be killed or is already in the process of completing, which will + // trigger the next scheduling run, or result in available slots being higher than 0, + // which will cause the scheduler loop to continue. 
pRequest.killTask(); } } @@ -241,14 +258,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { @Override public void onSuccess(TaskRunner2Result result) { - task.setCompleted(); task.getCallback().onSuccess(result); updatePreemptionListAndNotify(result.getEndReason()); } @Override public void onFailure(Throwable t) { - task.setCompleted(); task.getCallback().onFailure(t); updatePreemptionListAndNotify(null); LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t)); @@ -282,23 +297,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + " service gracefully"); } - executorService.shutdown(); - try { - if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { - executorService.shutdownNow(); - } - } catch (InterruptedException e) { - executorService.shutdownNow(); - } - - waitQueueExecutorService.shutdown(); - try { - if (!waitQueueExecutorService.awaitTermination(1, TimeUnit.MINUTES)) { - waitQueueExecutorService.shutdownNow(); - } - } catch (InterruptedException e) { - waitQueueExecutorService.shutdownNow(); - } + shutdownExecutor(waitQueueExecutorService); + shutdownExecutor(executorService); + shutdownExecutor(executionCompletionExecutorService); } else { if (isDebugEnabled) { LOG.debug("awaitTermination: " + awaitTermination + " shutting down task executor" + @@ -309,6 +310,17 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } } + private void shutdownExecutor(ExecutorService executorService) { + executorService.shutdown(); + try { + if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) { + executorService.shutdownNow(); + } + } catch (InterruptedException e) { + executorService.shutdownNow(); + } + } + @VisibleForTesting public static class WaitQueueComparator implements Comparator<TaskRunnerCallable> { 
http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index e505070..d1b1c61 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -102,8 +102,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private boolean shouldRunTask = true; final Stopwatch runtimeWatch = new Stopwatch(); final Stopwatch killtimerWatch = new Stopwatch(); - private AtomicBoolean isCompleted; - private AtomicBoolean killInvoked; + private final AtomicBoolean isCompleted = new AtomicBoolean(false); + private final AtomicBoolean killInvoked = new AtomicBoolean(false); TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, @@ -133,24 +133,6 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.metrics = metrics; this.requestId = getTaskAttemptId(request); this.killedTaskHandler = killedTaskHandler; - this.isCompleted = new AtomicBoolean(false); - this.killInvoked = new AtomicBoolean(false); - } - - public void setCompleted() { - isCompleted.set(true); - } - - public boolean isCompleted() { - return isCompleted.get(); - } - - public boolean isKillInvoked() { - return killInvoked.get(); - } - - public void setKillInvoked() { - killInvoked.set(true); } @Override @@ -226,6 +208,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { if (result.isContainerShutdownRequested()) { LOG.warn("Unexpected container shutdown requested while 
running task. Ignoring"); } + isCompleted.set(true); return result; } finally { @@ -242,21 +225,38 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { /** * Attempt to kill a running task. If the task has not started running, it will not start. * If it's already running, a kill request will be sent to it. - * + * <p/> * The AM will be informed about the task kill. */ public void killTask() { - synchronized (this) { - LOG.info("Killing task with id {}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), (taskRunner != null)); - if (taskRunner != null) { - killtimerWatch.start(); - LOG.info("Issuing kill to task {}" + taskSpec.getTaskAttemptID()); - taskRunner.killTask(); - shouldRunTask = false; + if (!isCompleted.get()) { + if (!killInvoked.getAndSet(true)) { + synchronized (this) { + LOG.info("Kill task requested for id={}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), + (taskRunner != null)); + if (taskRunner != null) { + killtimerWatch.start(); + LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID()); + boolean killed = taskRunner.killTask(); + if (killed) { + // Sending a kill message to the AM right here. Don't need to wait for the task to complete. + reportTaskKilled(); + } else { + LOG.info("Kill request for task {} did not complete because the task is already complete", + taskSpec.getTaskAttemptID()); + } + shouldRunTask = false; + } + } + } else { + // This should not happen. + LOG.warn("Ignoring kill request for task {} since a previous kill request was processed", + taskSpec.getTaskAttemptID()); } + } else { + LOG.info("Ignoring kill request for task {} since it's already complete", + taskSpec.getTaskAttemptID()); } - // Sending a kill message to the AM right here. Don't need to wait for the task to complete. - reportTaskKilled(); } /** @@ -382,6 +382,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // via a kill message when a task kill is requested by the daemon. 
@Override public void onSuccess(TaskRunner2Result result) { + isCompleted.set(true); switch(result.getEndReason()) { // Only the KILLED case requires a message to be sent out to the AM. case SUCCESS: @@ -424,6 +425,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { @Override public void onFailure(Throwable t) { + isCompleted.set(true); LOG.error("TezTaskRunner execution failed for : " + getTaskIdentifierString(request), t); // TODO HIVE-10236 Report a fatal error over the umbilical taskRunnerCallable.shutdown();