Repository: hive
Updated Branches:
  refs/heads/llap 41d123412 -> f0175bc8d


HIVE-10756. LLAP: Misc changes to daemon scheduling. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f0175bc8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f0175bc8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f0175bc8

Branch: refs/heads/llap
Commit: f0175bc8de1a585903fee71f280812983117f299
Parents: 41d1234
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 19 14:08:13 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 19 14:08:13 2015 -0700

----------------------------------------------------------------------
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  2 -
 .../impl/EvictingPriorityBlockingQueue.java     |  4 +-
 .../llap/daemon/impl/TaskExecutorService.java   | 60 +++++++++++--------
 .../llap/daemon/impl/TaskRunnerCallable.java    | 62 ++++++++++----------
 4 files changed, 70 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index e544789..3fd7920 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -67,7 +67,6 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
   private final QueryTracker queryTracker;
   private final Scheduler<TaskRunnerCallable> executorService;
   private final AtomicReference<InetSocketAddress> localAddress;
-  private final String[] localDirsBase;
   private final Map<String, String> localEnv = new HashMap<>();
   private final long memoryPerExecutor;
   private final LlapDaemonExecutorMetrics metrics;
@@ -87,7 +86,6 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
     this.conf = conf;
     Preconditions.checkState(numExecutors > 0,
         "Invalid number of executors: " + numExecutors + ". Must be > 0");
-    this.localDirsBase = localDirsBase;
     this.localAddress = localAddress;
 
     this.queryTracker = new QueryTracker(conf, localDirsBase);

http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index e8d789b..101a69c 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -27,8 +27,8 @@ import java.util.Comparator;
  * returned back. If the queue is not full, new element will be added to queue 
and null is returned.
  */
 public class EvictingPriorityBlockingQueue<E> {
-  private PriorityBlockingDeque<E> deque;
-  private Comparator<E> comparator;
+  private final PriorityBlockingDeque<E> deque;
+  private final Comparator<E> comparator;
 
   public EvictingPriorityBlockingQueue(Comparator<E> comparator, int maxSize) {
     this.deque = new PriorityBlockingDeque<>(comparator, maxSize);

http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 08af1e2..5323f05 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -71,9 +71,13 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
   // some object to lock upon. Used by task scheduler to notify wait queue 
scheduler of new items
   // to wait queue
   private final Object waitLock;
+  // Thread pool for actual execution of work.
   private final ListeningExecutorService executorService;
   private final EvictingPriorityBlockingQueue<TaskRunnerCallable> waitQueue;
+  // Thread pool for taking entities off the wait queue.
   private final ListeningExecutorService waitQueueExecutorService;
+  // Thread pool for callbacks on completion of execution of a work unit.
+  private final ListeningExecutorService executionCompletionExecutorService;
   private final BlockingQueue<TaskRunnerCallable> preemptionQueue;
   private final boolean enablePreemption;
   private final ThreadPoolExecutor threadPoolExecutor;
@@ -94,9 +98,14 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
     this.numSlotsAvailable = new AtomicInteger(numExecutors);
 
     // single threaded scheduler for tasks from wait queue to executor threads
-    ExecutorService wes = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder()
+    ExecutorService wes = Executors.newFixedThreadPool(1, new 
ThreadFactoryBuilder().setDaemon(true)
         .setNameFormat(WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT).build());
     this.waitQueueExecutorService = MoreExecutors.listeningDecorator(wes);
+
+    ExecutorService executionCompletionExecutorServiceRaw = 
Executors.newFixedThreadPool(1,
+        new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("ExecutionCompletionThread 
#%d")
+            .build());
+    executionCompletionExecutorService = 
MoreExecutors.listeningDecorator(executionCompletionExecutorServiceRaw);
     ListenableFuture<?> future = waitQueueExecutorService.submit(new 
WaitQueueWorker());
     Futures.addCallback(future, new WaitQueueWorkerCallback());
   }
@@ -125,8 +134,12 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
           // if the task cannot finish and if no slots are available then 
don't schedule it.
           // TODO: Event notifications that change canFinish state should 
notify waitLock
           synchronized (waitLock) {
+            // KKK Is this a tight loop when there's only finishable tasks 
available ?
             if (!task.canFinish() && numSlotsAvailable.get() == 0) {
               waitLock.wait();
+              // Another task at a higher priority may have come in during the 
wait. Lookup the
+              // queue again to pick up the task at the highest priority.
+              continue;
             }
           }
 
@@ -190,7 +203,9 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
     try {
       ListenableFuture<TaskRunner2Result> future = 
executorService.submit(task);
       FutureCallback<TaskRunner2Result> wrappedCallback = new 
InternalCompletionListener(task);
-      Futures.addCallback(future, wrappedCallback);
+      // Callback on a separate thread so that when a task completes, the 
thread in the main queue
+      // is actually available for execution and will not potentially result 
in a RejectedExecution
+      Futures.addCallback(future, wrappedCallback, 
executionCompletionExecutorService);
 
       if (isInfoEnabled) {
         LOG.info(task.getRequestId() + " scheduled for execution.");
@@ -216,13 +231,15 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
         }
 
         TaskRunnerCallable pRequest = preemptionQueue.remove();
-        if (pRequest != null && !pRequest.isCompleted() && 
!pRequest.isKillInvoked()) {
+        if (pRequest != null) {
 
           if (isInfoEnabled) {
-            LOG.info("Kill task invoked for " + pRequest.getRequestId() + " 
due to pre-emption");
+            LOG.info("Invoking kill task for {} due to pre-emption.", 
pRequest.getRequestId());
           }
 
-          pRequest.setKillInvoked();
+          // The task will either be killed or is already in the process of 
completing, which will
+          // trigger the next scheduling run, or result in available slots 
being higher than 0,
+          // which will cause the scheduler loop to continue.
           pRequest.killTask();
         }
       }
@@ -241,14 +258,12 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
 
     @Override
     public void onSuccess(TaskRunner2Result result) {
-      task.setCompleted();
       task.getCallback().onSuccess(result);
       updatePreemptionListAndNotify(result.getEndReason());
     }
 
     @Override
     public void onFailure(Throwable t) {
-      task.setCompleted();
       task.getCallback().onFailure(t);
       updatePreemptionListAndNotify(null);
       LOG.error("Failed notification received: Stacktrace: " + 
ExceptionUtils.getStackTrace(t));
@@ -282,23 +297,9 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
         LOG.debug("awaitTermination: " + awaitTermination + " shutting down 
task executor" +
             " service gracefully");
       }
-      executorService.shutdown();
-      try {
-        if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
-          executorService.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        executorService.shutdownNow();
-      }
-
-      waitQueueExecutorService.shutdown();
-      try {
-        if (!waitQueueExecutorService.awaitTermination(1, TimeUnit.MINUTES)) {
-          waitQueueExecutorService.shutdownNow();
-        }
-      } catch (InterruptedException e) {
-        waitQueueExecutorService.shutdownNow();
-      }
+      shutdownExecutor(waitQueueExecutorService);
+      shutdownExecutor(executorService);
+      shutdownExecutor(executionCompletionExecutorService);
     } else {
       if (isDebugEnabled) {
         LOG.debug("awaitTermination: " + awaitTermination + " shutting down 
task executor" +
@@ -309,6 +310,17 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
     }
   }
 
+  private void shutdownExecutor(ExecutorService executorService) {
+    executorService.shutdown();
+    try {
+      if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
+        executorService.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      executorService.shutdownNow();
+    }
+  }
+
   @VisibleForTesting
   public static class WaitQueueComparator implements 
Comparator<TaskRunnerCallable> {
 

http://git-wip-us.apache.org/repos/asf/hive/blob/f0175bc8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index e505070..d1b1c61 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -102,8 +102,8 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   private boolean shouldRunTask = true;
   final Stopwatch runtimeWatch = new Stopwatch();
   final Stopwatch killtimerWatch = new Stopwatch();
-  private AtomicBoolean isCompleted;
-  private AtomicBoolean killInvoked;
+  private final AtomicBoolean isCompleted = new AtomicBoolean(false);
+  private final AtomicBoolean killInvoked = new AtomicBoolean(false);
 
   TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, 
Configuration conf,
       ExecutionContext executionContext, Map<String, String> envMap,
@@ -133,24 +133,6 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     this.metrics = metrics;
     this.requestId = getTaskAttemptId(request);
     this.killedTaskHandler = killedTaskHandler;
-    this.isCompleted = new AtomicBoolean(false);
-    this.killInvoked = new AtomicBoolean(false);
-  }
-
-  public void setCompleted() {
-    isCompleted.set(true);
-  }
-
-  public boolean isCompleted() {
-    return isCompleted.get();
-  }
-
-  public boolean isKillInvoked() {
-    return killInvoked.get();
-  }
-
-  public void setKillInvoked() {
-    killInvoked.set(true);
   }
 
   @Override
@@ -226,6 +208,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
       if (result.isContainerShutdownRequested()) {
         LOG.warn("Unexpected container shutdown requested while running task. 
Ignoring");
       }
+      isCompleted.set(true);
       return result;
 
     } finally {
@@ -242,21 +225,38 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   /**
    * Attempt to kill a running task. If the task has not started running, it 
will not start.
    * If it's already running, a kill request will be sent to it.
-   *
+   * <p/>
    * The AM will be informed about the task kill.
    */
   public void killTask() {
-    synchronized (this) {
-      LOG.info("Killing task with id {}, taskRunnerSetup={}", 
taskSpec.getTaskAttemptID(), (taskRunner != null));
-      if (taskRunner != null) {
-        killtimerWatch.start();
-        LOG.info("Issuing kill to task {}" + taskSpec.getTaskAttemptID());
-        taskRunner.killTask();
-        shouldRunTask = false;
+    if (!isCompleted.get()) {
+      if (!killInvoked.getAndSet(true)) {
+        synchronized (this) {
+          LOG.info("Kill task requested for id={}, taskRunnerSetup={}", 
taskSpec.getTaskAttemptID(),
+              (taskRunner != null));
+          if (taskRunner != null) {
+            killtimerWatch.start();
+            LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
+            boolean killed = taskRunner.killTask();
+            if (killed) {
+              // Sending a kill message to the AM right here. Don't need to 
wait for the task to complete.
+              reportTaskKilled();
+            } else {
+              LOG.info("Kill request for task {} did not complete because the 
task is already complete",
+                  taskSpec.getTaskAttemptID());
+            }
+            shouldRunTask = false;
+          }
+        }
+      } else {
+        // This should not happen.
+        LOG.warn("Ignoring kill request for task {} since a previous kill 
request was processed",
+            taskSpec.getTaskAttemptID());
       }
+    } else {
+      LOG.info("Ignoring kill request for task {} since it's already complete",
+          taskSpec.getTaskAttemptID());
     }
-    // Sending a kill message to the AM right here. Don't need to wait for the 
task to complete.
-    reportTaskKilled();
   }
 
   /**
@@ -382,6 +382,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     // via a kill message when a task kill is requested by the daemon.
     @Override
     public void onSuccess(TaskRunner2Result result) {
+      isCompleted.set(true);
       switch(result.getEndReason()) {
         // Only the KILLED case requires a message to be sent out to the AM.
         case SUCCESS:
@@ -424,6 +425,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
 
     @Override
     public void onFailure(Throwable t) {
+      isCompleted.set(true);
       LOG.error("TezTaskRunner execution failed for : " + 
getTaskIdentifierString(request), t);
       // TODO HIVE-10236 Report a fatal error over the umbilical
       taskRunnerCallable.shutdown();

Reply via email to