Repository: hive
Updated Branches:
  refs/heads/llap 1d0881e04 -> baddfa5cc


HIVE-10758. LLAP: Modify running / wait queues on fragment finishable
state changes. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/baddfa5c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/baddfa5c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/baddfa5c

Branch: refs/heads/llap
Commit: baddfa5cca4bf8ccdd9133a63b4e9409ce520459
Parents: 1d0881e
Author: Siddharth Seth <ss...@apache.org>
Authored: Fri May 22 10:58:58 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Fri May 22 10:58:58 2015 -0700

----------------------------------------------------------------------
 .../daemon/FinishableStateUpdateHandler.java    |  21 ++
 .../impl/EvictingPriorityBlockingQueue.java     |   4 +-
 .../llap/daemon/impl/QueryFragmentInfo.java     |  28 ++
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 114 +++++++
 .../hive/llap/daemon/impl/QueryTracker.java     |   8 +-
 .../llap/daemon/impl/TaskExecutorService.java   | 305 +++++++++++++++----
 .../llap/daemon/impl/TaskRunnerCallable.java    |   4 +
 7 files changed, 414 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java
new file mode 100644
index 0000000..8d40ce9
--- /dev/null
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+/**
+ * Receives notifications when the finishable state of a fragment it registered for changes.
+ */
+public interface FinishableStateUpdateHandler {
+
+  /**
+   * Called when the finishable state of the registered fragment changes.
+   *
+   * @param finishableState the new finishable state
+   */
+  void finishableStateUpdated(boolean finishableState);
+
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index ab3a130..4ea3b0b 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -61,8 +61,8 @@ public class EvictingPriorityBlockingQueue<E> {
     return deque.take();
   }
 
-  public synchronized void remove(E e) {
-    deque.remove(e);
+  /**
+   * Removes {@code e} from the queue if it is present.
+   *
+   * @param e the element to remove
+   * @return the result of {@code deque.remove(e)}; per the Collection contract, true if an
+   *         element was removed
+   */
+  public synchronized boolean remove(E e) {
+    return deque.remove(e);
   }
 
   public synchronized int size() {

http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index f6cd8ab..554864e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -15,9 +15,11 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
@@ -111,6 +113,32 @@ public class QueryFragmentInfo {
     return queryInfo.getLocalDirs();
   }
 
+  /**
+   * Registers {@code handler} for changes of this fragment's finishable state. Only inputs
+   * accepted by {@code isSourceOfInterest} are passed on as sources to watch.
+   *
+   * @param handler the handler to notify
+   * @param lastFinishableState the finishable state last observed by the caller
+   * @return true if the current finishable state still equals {@code lastFinishableState};
+   *         false if it has already changed
+   */
+  public boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler,
+                                                   boolean lastFinishableState) {
+    List<IOSpecProto> inputSpecs = fragmentSpec.getInputSpecsList();
+    List<String> interestingSources = new LinkedList<>();
+    if (inputSpecs != null) {
+      for (IOSpecProto spec : inputSpecs) {
+        if (isSourceOfInterest(spec)) {
+          interestingSources.add(spec.getConnectedVertexName());
+        }
+      }
+    }
+    return queryInfo.registerForFinishableStateUpdates(handler, interestingSources, this,
+        lastFinishableState);
+  }
+
+
+  /**
+   * Removes a registration previously made through registerForFinishableStateUpdates.
+   *
+   * @param handler the handler that was registered
+   */
+  public void unregisterForFinishableStateUpdates(FinishableStateUpdateHandler handler) {
+    queryInfo.unregisterFinishableStateUpdate(handler);
+  }
+
   private boolean isSourceOfInterest(IOSpecProto inputSpec) {
     String inputClassName = inputSpec.getIoDescriptor().getClassName();
     // MRInput is not of interest since it'll always be ready.

http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index efa18cd..3487e19 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -16,13 +16,22 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 
@@ -42,6 +51,7 @@ public class QueryInfo {
 
   private final ConcurrentMap<String, SourceStateProto> sourceStateMap;
 
+  private final FinishableStateTracker finishableStateTracker = new 
FinishableStateTracker();
 
   public QueryInfo(String queryId, String appIdString, String dagName, int 
dagIdentifier,
                    String user, ConcurrentMap<String, SourceStateProto> 
sourceStateMap,
@@ -125,4 +135,108 @@ public class QueryInfo {
     return baseDir + File.separator + "usercache" + File.separator + user + 
File.separator +
         "appcache" + File.separator + applicationIdString + File.separator + 
dagIdentifier;
   }
+
+  /**
+   * Registers {@code handler} to be notified when {@code fragmentInfo}'s finishable state changes
+   * in response to a state update for any of {@code sources}.
+   *
+   * @param handler the handler to notify
+   * @param sources source names whose updates trigger a re-evaluation
+   * @param fragmentInfo the fragment whose {@code canFinish()} is re-evaluated
+   * @param lastFinishableState the finishable state last observed by the caller
+   * @return true if the current state equals {@code lastFinishableState}; false if it has
+   *         already changed
+   */
+  boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler,
+                                            List<String> sources, QueryFragmentInfo fragmentInfo,
+                                            boolean lastFinishableState) {
+    return finishableStateTracker.registerForUpdates(
+        handler, sources, fragmentInfo, lastFinishableState);
+  }
+
+  /** Removes the registration made for {@code handler}; delegates to the tracker. */
+  void unregisterFinishableStateUpdate(FinishableStateUpdateHandler handler) {
+    this.finishableStateTracker.unregisterForUpdates(handler);
+  }
+
+  /** Forwards a state change of source {@code sourceName} to the finishable-state tracker. */
+  void sourceStateUpdated(String sourceName) {
+    this.finishableStateTracker.sourceStateUpdated(sourceName);
+  }
+
+
+  /**
+   * Per-query registry of handlers interested in finishable-state changes, indexed by the source
+   * names the tracked fragments depend on.
+   *
+   * <p>Callbacks are never invoked while holding this tracker's monitor. The task executor's
+   * handler takes the executor lock inside the callback, and the executor registers tasks here
+   * while holding that same lock. Invoking callbacks under this monitor would therefore be a
+   * lock-ordering deadlock. Callback delivery is serialized through {@code notificationLock},
+   * so notifications from successive source updates are not interleaved.
+   */
+  private static class FinishableStateTracker {
+
+    private final Map<FinishableStateUpdateHandler, EntityInfo> trackedEntities = new HashMap<>();
+    private final Multimap<String, EntityInfo> sourceToEntity = HashMultimap.create();
+    // Serializes callback delivery. Never acquired while holding this tracker's monitor.
+    private final Object notificationLock = new Object();
+
+    /**
+     * @return true if {@code fragmentInfo.canFinish()} currently equals
+     *         {@code lastFinishableState}
+     * @throws IllegalStateException if {@code handler} is already registered
+     */
+    synchronized boolean registerForUpdates(FinishableStateUpdateHandler handler,
+                                            List<String> sources, QueryFragmentInfo fragmentInfo,
+                                            boolean lastFinishableState) {
+      // Check before inserting: put() would replace the existing registration and leave its
+      // source mappings behind with no way to unregister them.
+      if (trackedEntities.containsKey(handler)) {
+        throw new IllegalStateException(
+            "Only a single registration allowed per entity. Duplicate for " + handler.toString());
+      }
+      EntityInfo entityInfo =
+          new EntityInfo(handler, sources, fragmentInfo, lastFinishableState);
+      trackedEntities.put(handler, entityInfo);
+      for (String source : sources) {
+        sourceToEntity.put(source, entityInfo);
+      }
+      return lastFinishableState == fragmentInfo.canFinish();
+    }
+
+    synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) {
+      EntityInfo info = trackedEntities.remove(handler);
+      Preconditions.checkState(info != null,
+          "Cannot invoke unregister on an entity which has not been registered");
+      for (String source : info.getSources()) {
+        sourceToEntity.remove(source, info);
+      }
+    }
+
+    /**
+     * Re-evaluates every fragment depending on {@code sourceName} and notifies the handlers whose
+     * finishable state changed. The state changes are computed under the tracker monitor; the
+     * callbacks run after it is released.
+     */
+    void sourceStateUpdated(String sourceName) {
+      synchronized (notificationLock) {
+        Map<EntityInfo, Boolean> changes = collectStateChanges(sourceName);
+        for (Map.Entry<EntityInfo, Boolean> change : changes.entrySet()) {
+          EntityInfo entityInfo = change.getKey();
+          // Skip handlers that unregistered after the change was computed. A notification can
+          // still race with a concurrent unregistration, so handlers must tolerate late calls.
+          if (isRegistered(entityInfo)) {
+            entityInfo.getHandler().finishableStateUpdated(change.getValue());
+          }
+        }
+      }
+    }
+
+    /** Records, and returns, the new state of every entity on sourceName whose state changed. */
+    private synchronized Map<EntityInfo, Boolean> collectStateChanges(String sourceName) {
+      Map<EntityInfo, Boolean> changes = new HashMap<>();
+      // Multimap.get never returns null; the collection is empty if nothing is registered.
+      Collection<EntityInfo> interestedEntityInfos = sourceToEntity.get(sourceName);
+      for (EntityInfo entityInfo : interestedEntityInfos) {
+        boolean newFinishState = entityInfo.getFragmentInfo().canFinish();
+        if (newFinishState != entityInfo.getLastFinishableState()) {
+          entityInfo.setLastFinishableState(newFinishState);
+          changes.put(entityInfo, newFinishState);
+        }
+      }
+      return changes;
+    }
+
+    /** @return true if {@code entityInfo} is still the active registration for its handler */
+    private synchronized boolean isRegistered(EntityInfo entityInfo) {
+      return trackedEntities.get(entityInfo.getHandler()) == entityInfo;
+    }
+  }
+
+  private static class EntityInfo {
+    final FinishableStateUpdateHandler handler;
+    final List<String> sources;
+    final QueryFragmentInfo fragmentInfo;
+    boolean lastFinishableState;
+
+    public EntityInfo(FinishableStateUpdateHandler handler, List<String> 
sources, QueryFragmentInfo fragmentInfo, boolean lastFinishableState) {
+      this.handler = handler;
+      this.sources = sources;
+      this.fragmentInfo = fragmentInfo;
+      this.lastFinishableState = lastFinishableState;
+    }
+
+    public FinishableStateUpdateHandler getHandler() {
+      return handler;
+    }
+
+    public QueryFragmentInfo getFragmentInfo() {
+      return fragmentInfo;
+    }
+
+    public boolean getLastFinishableState() {
+      return lastFinishableState;
+    }
+
+    public List<String> getSources() {
+      return sources;
+    }
+
+    public void setLastFinishableState(boolean lastFinishableState) {
+      this.lastFinishableState = lastFinishableState;
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 90ad923..d796b24 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -178,7 +178,13 @@ public class QueryTracker extends CompositeService {
    */
   void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
     getSourceCompletionMap(dagName).put(sourceName, sourceState);
-    // TODO HIVE-10758 source completion notifications
+    QueryInfo queryInfo = queryInfoMap.get(dagName);
+    if (queryInfo == null) {
+      // Could be null if there's a race between the threads processing requests, with a
+      // dag finish processed earlier. There is no QueryInfo to notify in that case.
+      return;
+    }
+    queryInfo.sourceStateUpdated(sourceName);
   }
 
 

http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 18daa75..bfc4d89 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -29,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
@@ -62,29 +66,32 @@ import 
com.google.common.util.concurrent.ThreadFactoryBuilder;
  * new tasks. Shutting down of the task executor service can be done 
gracefully or immediately.
  */
 public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
+
+
   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorService.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
   private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = 
"Task-Executor-%d";
   private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = 
"Wait-Queue-Scheduler-%d";
 
-  // some object to lock upon. Used by task scheduler to notify wait queue scheduler of new items
-  // to wait queue
-  private final Object waitLock;
   // Thread pool for actual execution of work.
   private final ListeningExecutorService executorService;
-  private final EvictingPriorityBlockingQueue<TaskRunnerCallable> waitQueue;
+  // Tasks accepted by schedule() that have not yet been submitted to executorService.
+  // offer() may hand back an evicted entry, which schedule() kills.
+  private final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
   // Thread pool for taking entities off the wait queue.
   private final ListeningExecutorService waitQueueExecutorService;
   // Thread pool for callbacks on completion of execution of a work unit.
   private final ListeningExecutorService executionCompletionExecutorService;
-  private final BlockingQueue<TaskRunnerCallable> preemptionQueue;
+  // Preemption candidates. When the executor rejects a finishable task, trySchedule() takes one
+  // entry from here and kills it if it is still not finishable.
+  private final BlockingQueue<TaskWrapper> preemptionQueue;
   private final boolean enablePreemption;
   private final ThreadPoolExecutor threadPoolExecutor;
   private final AtomicInteger numSlotsAvailable;
 
+  // Tracks known tasks: added in schedule(); removed if the wait queue rejects the offer, or by
+  // InternalCompletionListener once the task completes.
+  private final Set<TaskWrapper> knownTasks = Collections.newSetFromMap(new ConcurrentHashMap<TaskWrapper, Boolean>());
+
+  // Guards queue-membership bookkeeping (wait queue, preemption queue, TaskWrapper flags). It is
+  // also the monitor WaitQueueWorker waits on; notify() wakes it when scheduling may progress.
+  private final Object lock = new Object();
+
   public TaskExecutorService(int numExecutors, int waitQueueSize, boolean 
enablePreemption) {
-    this.waitLock = new Object();
     this.waitQueue = new EvictingPriorityBlockingQueue<>(new 
WaitQueueComparator(), waitQueueSize);
     this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core 
pool size
         numExecutors, // max pool size
@@ -115,32 +122,36 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
             + ", enablePreemption=" + enablePreemption);
   }
 
+
   /**
    * Worker that takes tasks from wait queue and schedule it for execution.
    */
   private final class WaitQueueWorker implements Runnable {
-    TaskRunnerCallable task;
+    TaskWrapper task;
 
     @Override
     public void run() {
       try {
 
-        synchronized (waitLock) {
+        synchronized (lock) {
           while (waitQueue.isEmpty()) {
-            waitLock.wait();
+            lock.wait();
           }
         }
 
         // Since schedule() can be called from multiple threads, we peek the 
wait queue,
         // try scheduling the task and then remove the task if scheduling is 
successful.
         // This will make sure the task's place in the wait queue is held 
until it gets scheduled.
-        while ((task = waitQueue.peek()) != null) {
+        while (true) {
+          synchronized (lock) {
+            task = waitQueue.peek();
+            if (task == null) {
+              break;
+            }
 
           // if the task cannot finish and if no slots are available then 
don't schedule it.
-          // TODO: Event notifications that change canFinish state should 
notify waitLock
-          synchronized (waitLock) {
             boolean shouldWait = false;
-            if (task.canFinish()) {
+            if (task.getTaskRunnerCallable().canFinish()) {
               if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) {
                 shouldWait = true;
               }
@@ -150,9 +161,9 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
               }
             }
             if (shouldWait) {
+              lock.wait();
               // Another task at a higher priority may have come in during the 
wait. Lookup the
               // queue again to pick up the task at the highest priority.
-              waitLock.wait();
               continue;
             }
           }
@@ -161,12 +172,14 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
           if (scheduled) {
             // wait queue could have been re-ordered in the mean time because 
of concurrent task
             // submission. So remove the specific task instead of the head 
task.
-            waitQueue.remove(task);
+            synchronized (lock) {
+              waitQueue.remove(task);
+            }
           }
 
-          synchronized (waitLock) {
+          synchronized (lock) {
             while (waitQueue.isEmpty()) {
-              waitLock.wait();
+              lock.wait();
             }
           }
         }
@@ -194,71 +207,92 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
 
   @Override
   public void schedule(TaskRunnerCallable task) throws RejectedExecutionException {
-    TaskRunnerCallable evictedTask = waitQueue.offer(task);
+    // Every accepted task is tracked in knownTasks until it completes or is evicted.
+    TaskWrapper taskWrapper = new TaskWrapper(task);
+    knownTasks.add(taskWrapper);
+    TaskWrapper evictedTask;
+    try {
+      // Don't need a lock. Not subscribed for notifications yet, and marked as inWaitQueue
+      evictedTask = waitQueue.offer(taskWrapper);
+    } catch (RejectedExecutionException e) {
+      knownTasks.remove(taskWrapper);
+      throw e;
+    }
     if (evictedTask == null) {
       if (isInfoEnabled) {
         LOG.info(task.getRequestId() + " added to wait queue.");
       }
-
       if (isDebugEnabled) {
         LOG.debug("Wait Queue: {}", waitQueue);
       }
-      synchronized (waitLock) {
-        waitLock.notify();
-      }
     } else {
-      evictedTask.killTask();
+      // The evicted task (possibly the one just offered) will never be scheduled. Drop all
+      // bookkeeping for it; otherwise it would stay in knownTasks forever.
+      knownTasks.remove(evictedTask);
+      evictedTask.setIsInWaitQueue(false);
+      evictedTask.maybeUnregisterForFinishedStateNotifications();
+      evictedTask.getTaskRunnerCallable().killTask();
       if (isInfoEnabled) {
-        LOG.info(task.getRequestId() + " evicted from wait queue because of low priority");
+        LOG.info("{} evicted from wait queue in favor of {} because of lower priority",
+            evictedTask.getRequestId(), task.getRequestId());
       }
     }
+    // Wake the wait queue worker: the head of the wait queue may have changed.
+    synchronized (lock) {
+      lock.notify();
+    }
   }
 
-  private boolean trySchedule(final TaskRunnerCallable task) {
+  /**
+   * Attempts to submit {@code taskWrapper} to the executor thread pool.
+   *
+   * <p>On success the task is registered for finishable-state notifications and marked as out of
+   * the wait queue. If preemption is enabled and the task is still not finishable, it is also
+   * added to the preemption queue. If the pool rejects a finishable task, the task at the head
+   * of the preemption queue is killed, provided it is still not finishable.
+   *
+   * @return true if the task was submitted to the executor
+   */
+  private boolean trySchedule(final TaskWrapper taskWrapper) {
 
     boolean scheduled = false;
     try {
-      ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
-      FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task);
-      // Callback on a separate thread so that when a task completes, the thread in the main queue
-      // is actually available for execution and will not potentially result in a RejectedExecution
-      Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
+      synchronized (lock) {
+        boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
+        // maybeRegisterForFinishedStateNotifications() returns true when the finishable state is
+        // still the one observed above, so it is negated here: stateChanged is true only if the
+        // state moved between the canFinish() check and the registration.
+        boolean stateChanged =
+            !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+        ListenableFuture<TaskRunner2Result> future =
+            executorService.submit(taskWrapper.getTaskRunnerCallable());
+        taskWrapper.setIsInWaitQueue(false);
+        FutureCallback<TaskRunner2Result> wrappedCallback =
+            new InternalCompletionListener(taskWrapper);
+        // Callback on a separate thread so that when a task completes, the thread in the main queue
+        // is actually available for execution and will not potentially result in a RejectedExecution
+        Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
 
-      if (isInfoEnabled) {
-        LOG.info(task.getRequestId() + " scheduled for execution.");
-      }
-
-      // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs
-      // to the tasks are not ready yet, the task is eligible for pre-emptable.
-      if (enablePreemption && !task.canFinish()) {
         if (isInfoEnabled) {
-          LOG.info(task.getRequestId() + " is not finishable. Adding it to pre-emption queue.");
+          LOG.info("{} scheduled for execution. canFinish={}", taskWrapper.getRequestId(), canFinish);
         }
-        preemptionQueue.add(task);
-      }
 
+        // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs
+        // to the tasks are not ready yet, the task is eligible for pre-emptable.
+        // If canFinish was false and the state changed, the task is now finishable: skip it.
+        if (enablePreemption) {
+          if (!canFinish && !stateChanged) {
+            if (isInfoEnabled) {
+              LOG.info("{} is not finishable. Adding it to pre-emption queue",
+                  taskWrapper.getRequestId());
+            }
+            addToPreemptionQueue(taskWrapper);
+          }
+        }
+      }
       numSlotsAvailable.decrementAndGet();
       scheduled = true;
     } catch (RejectedExecutionException e) {
-
-      if (enablePreemption && task.canFinish() && !preemptionQueue.isEmpty()) {
+      if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish()
+          && !preemptionQueue.isEmpty()) {
 
         if (isDebugEnabled) {
           LOG.debug("Preemption Queue: " + preemptionQueue);
         }
 
-        TaskRunnerCallable pRequest = preemptionQueue.remove();
-        if (pRequest != null) {
+        TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
 
-          if (isInfoEnabled) {
-            LOG.info("Invoking kill task for {} due to pre-emption to run {}",
-                pRequest.getRequestId(), task.getRequestId());
+        // Avoid preempting tasks which are finishable - callback still to be processed.
+        if (pRequest != null) {
+          if (pRequest.getTaskRunnerCallable().canFinish()) {
+            LOG.info(
+                "Removed {} from preemption queue, but not preempting since it's now finishable",
+                pRequest.getRequestId());
+          } else {
+            if (isInfoEnabled) {
+              LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+                  pRequest.getRequestId(), taskWrapper.getRequestId());
+            }
+            // The task will either be killed or is already in the process of completing, which will
+            // trigger the next scheduling run, or result in available slots being higher than 0,
+            // which will cause the scheduler loop to continue.
+            pRequest.getTaskRunnerCallable().killTask();
           }
-
-          // The task will either be killed or is already in the process of completing, which will
-          // trigger the next scheduling run, or result in available slots being higher than 0,
-          // which will cause the scheduler loop to continue.
-          pRequest.killTask();
         }
       }
     }
@@ -266,23 +300,76 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
     return scheduled;
   }
 
+  /**
+   * Applies a finishable-state change reported for {@code taskWrapper}.
+   *
+   * <p>A task still in the wait queue is removed and re-offered so the queue re-sorts it (the
+   * wait queue comparator orders on {@code canFinish()}). For a task that has left the wait
+   * queue, the preemption queue is updated instead:
+   * <ul>
+   *   <li>the task is removed from it once it becomes finishable;</li>
+   *   <li>when preemption is enabled, it is added to it if it becomes non-finishable.</li>
+   * </ul>
+   * Finally the wait queue worker is woken, since the next schedulable task may have changed.
+   *
+   * @param taskWrapper the task whose finishable state changed
+   * @param newFinishableState the new finishable state
+   */
+  private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
+    synchronized (lock) {
+      if (!knownTasks.contains(taskWrapper)) {
+        // The task already completed (or was evicted) and this notification raced with that.
+        // Putting it back into a queue would leave a stale entry behind.
+        return;
+      }
+      if (taskWrapper.isInWaitQueue()) {
+        // Re-order the wait queue
+        LOG.info("DEBUG: Re-ordering the wait queue since {} finishable state moved to {}",
+            taskWrapper.getRequestId(), newFinishableState);
+        if (waitQueue.remove(taskWrapper)) {
+          // Put the element back only if it existed. schedule() offers without holding 'lock',
+          // so the slot freed by remove() may already be taken and this offer can evict a task.
+          TaskWrapper evictedTask;
+          try {
+            evictedTask = waitQueue.offer(taskWrapper);
+          } catch (RejectedExecutionException e) {
+            // schedule() treats a rejected offer as the end of the task; do the same here.
+            evictedTask = taskWrapper;
+          }
+          if (evictedTask != null) {
+            killTaskEvictedOnReorder(evictedTask);
+          }
+        } else {
+          LOG.warn("Failed to remove {} from waitQueue",
+              taskWrapper.getTaskRunnerCallable().getRequestId());
+        }
+      } else if (newFinishableState) {
+        if (taskWrapper.isInPreemptionQueue()) {
+          LOG.info("DEBUG: Removing {} from preemption queue because it's state changed to {}",
+              taskWrapper.getRequestId(), newFinishableState);
+          // The queue holds TaskWrappers; removing the TaskRunnerCallable would never match.
+          preemptionQueue.remove(taskWrapper);
+          taskWrapper.setIsInPreemptableQueue(false);
+        }
+      } else if (enablePreemption && !taskWrapper.isInPreemptionQueue()) {
+        // Only when preemption is enabled: otherwise nothing ever removes these entries.
+        LOG.info("DEBUG: Adding {} to preemption queue since finishable state changed to {}",
+            taskWrapper.getRequestId(), newFinishableState);
+        // addToPreemptionQueue() also sets the wrapper's inPreemptionQueue flag.
+        addToPreemptionQueue(taskWrapper);
+      }
+      lock.notify();
+    }
+  }
+
+  /**
+   * Handles a task pushed out of the wait queue while another task was being re-ordered. The
+   * task will never be scheduled, so it is forgotten and killed.
+   */
+  private void killTaskEvictedOnReorder(TaskWrapper evictedTask) {
+    knownTasks.remove(evictedTask);
+    evictedTask.setIsInWaitQueue(false);
+    // Deliberately not unregistering. This runs from inside a finishable-state notification, and
+    // unregistering mutates the tracker that may be delivering it. Later notifications for this
+    // task are ignored because it is no longer in knownTasks.
+    evictedTask.getTaskRunnerCallable().killTask();
+    LOG.info("{} evicted from wait queue while re-ordering the queue", evictedTask.getRequestId());
+  }
+
+  /**
+   * Adds the task to the preemption queue and records that membership on the wrapper. Both
+   * happen under {@code lock}.
+   */
+  private void addToPreemptionQueue(TaskWrapper wrapper) {
+    synchronized (lock) {
+      preemptionQueue.add(wrapper);
+      wrapper.setIsInPreemptableQueue(true);
+    }
+  }
+
+  /**
+   * Takes the head of the preemption queue and clears its inPreemptionQueue flag.
+   *
+   * @return the removed task, or null if the queue is empty
+   */
+  private TaskWrapper removeAndGetFromPreemptionQueue() {
+    TaskWrapper taskWrapper;
+    synchronized (lock) {
+      // poll() rather than remove(): callers check isEmpty() without holding 'lock', so the queue
+      // may have been drained meanwhile, and remove() would throw NoSuchElementException.
+      taskWrapper = preemptionQueue.poll();
+      if (taskWrapper != null) {
+        taskWrapper.setIsInPreemptableQueue(false);
+      }
+    }
+    return taskWrapper;
+  }
+
   private final class InternalCompletionListener implements
       FutureCallback<TaskRunner2Result> {
-    private TaskRunnerCallable task;
+    // The completed (or failed) task this listener was created for.
+    private final TaskWrapper taskWrapper;
 
-    public InternalCompletionListener(TaskRunnerCallable task) {
-      this.task = task;
+    /** @param taskWrapper the task whose future this listener is attached to */
+    public InternalCompletionListener(TaskWrapper taskWrapper) {
+      this.taskWrapper = taskWrapper;
     }
 
     @Override
     public void onSuccess(TaskRunner2Result result) {
-      task.getCallback().onSuccess(result);
+      forgetTask();
+      taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
       updatePreemptionListAndNotify(result.getEndReason());
     }
 
     @Override
     public void onFailure(Throwable t) {
-      task.getCallback().onFailure(t);
+      forgetTask();
+      taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
       updatePreemptionListAndNotify(null);
       LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t));
     }
+
+    /**
+     * Drops the executor's bookkeeping for the finished task before its callback runs. It removes
+     * the task from knownTasks, clears its preemption-queue flag, and unregisters it from
+     * finishable-state notifications.
+     */
+    private void forgetTask() {
+      knownTasks.remove(taskWrapper);
+      taskWrapper.setIsInPreemptableQueue(false);
+      taskWrapper.maybeUnregisterForFinishedStateNotifications();
+    }
@@ -291,17 +378,18 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
       // if this task was added to pre-emption list, remove it
       if (enablePreemption) {
         String state = reason == null ? "FAILED" : reason.name();
-        preemptionQueue.remove(task.getRequest());
-        if (isInfoEnabled) {
-          
LOG.info(TaskRunnerCallable.getTaskIdentifierString(task.getRequest())
+        boolean removed = preemptionQueue.remove(taskWrapper);
+        if (removed && isInfoEnabled) {
+          LOG.info(TaskRunnerCallable
+              
.getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest())
               + " request " + state + "! Removed from preemption list.");
         }
       }
 
       numSlotsAvailable.incrementAndGet();
-      if (!waitQueue.isEmpty()) {
-        synchronized (waitLock) {
-          waitLock.notify();
+      synchronized (lock) {
+        if (!waitQueue.isEmpty()) {
+          lock.notify();
         }
       }
     }
@@ -340,10 +428,12 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
   }
 
   @VisibleForTesting
-  public static class WaitQueueComparator implements 
Comparator<TaskRunnerCallable> {
+  public static class WaitQueueComparator implements Comparator<TaskWrapper> {
 
     @Override
-    public int compare(TaskRunnerCallable o1, TaskRunnerCallable o2) {
+    public int compare(TaskWrapper t1, TaskWrapper t2) {
+      TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+      TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
       boolean newCanFinish = o1.canFinish();
       boolean oldCanFinish = o2.canFinish();
       if (newCanFinish == true && oldCanFinish == false) {
@@ -368,10 +458,12 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
   }
 
   @VisibleForTesting
-  public static class PreemptionQueueComparator implements 
Comparator<TaskRunnerCallable> {
+  public static class PreemptionQueueComparator implements 
Comparator<TaskWrapper> {
 
     @Override
-    public int compare(TaskRunnerCallable o1, TaskRunnerCallable o2) {
+    public int compare(TaskWrapper t1, TaskWrapper t2) {
+      TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+      TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
       if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
         return 1;
       } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
@@ -380,4 +472,83 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
       return 0;
     }
   }
+
+
+  /**
+   * Scheduler-side state for one {@link TaskRunnerCallable}. It records whether the task is in
+   * the wait queue or the preemption queue, and whether it is registered for finishable-state
+   * notifications. Methods touching these flags are synchronized, primarily for visibility.
+   */
+  private class TaskWrapper implements FinishableStateUpdateHandler {
+    private final TaskRunnerCallable taskRunnerCallable;
+    private boolean inWaitQueue = true;
+    private boolean inPreemptionQueue = false;
+    private boolean registeredForNotifications = false;
+
+    public TaskWrapper(TaskRunnerCallable taskRunnerCallable) {
+      this.taskRunnerCallable = taskRunnerCallable;
+    }
+
+    public TaskRunnerCallable getTaskRunnerCallable() {
+      return taskRunnerCallable;
+    }
+
+    public String getRequestId() {
+      return taskRunnerCallable.getRequestId();
+    }
+
+    /**
+     * Registers for finishable-state notifications on the first call; later calls do nothing.
+     *
+     * @param currentFinishableState the finishable state the caller just observed
+     * @return true if the state still equals {@code currentFinishableState}, or if already
+     *         registered; false if the state changed before the registration took effect
+     */
+    public synchronized boolean maybeRegisterForFinishedStateNotifications(
+        boolean currentFinishableState) {
+      if (registeredForNotifications) {
+        return true;
+      }
+      registeredForNotifications = true;
+      return taskRunnerCallable.getFragmentInfo()
+          .registerForFinishableStateUpdates(this, currentFinishableState);
+    }
+
+    /** Undoes a previous registration, if there is one; safe to call repeatedly. */
+    public synchronized void maybeUnregisterForFinishedStateNotifications() {
+      if (!registeredForNotifications) {
+        return;
+      }
+      registeredForNotifications = false;
+      taskRunnerCallable.getFragmentInfo().unregisterForFinishableStateUpdates(this);
+    }
+
+    public synchronized boolean isInWaitQueue() {
+      return inWaitQueue;
+    }
+
+    public synchronized void setIsInWaitQueue(boolean value) {
+      this.inWaitQueue = value;
+    }
+
+    public synchronized boolean isInPreemptionQueue() {
+      return inPreemptionQueue;
+    }
+
+    public synchronized void setIsInPreemptableQueue(boolean value) {
+      this.inPreemptionQueue = value;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder("TaskWrapper{");
+      sb.append("task=").append(taskRunnerCallable.getRequestId())
+          .append(", inWaitQueue=").append(inWaitQueue)
+          .append(", inPreemptionQueue=").append(inPreemptionQueue)
+          .append(", registeredForNotifications=").append(registeredForNotifications)
+          .append('}');
+      return sb.toString();
+    }
+
+    /**
+     * Forwards the notification to the enclosing executor service.
+     *
+     * <p>Deliberately not synchronized: the executor takes its lock and then calls this wrapper's
+     * synchronized accessors. Holding this wrapper's monitor here could therefore deadlock.
+     */
+    @Override
+    public void finishableStateUpdated(boolean finishableState) {
+      LOG.info("DEBUG: Received finishable state update for {}, state={}",
+          taskRunnerCallable.getRequestId(), finishableState);
+      TaskExecutorService.this.finishableStateUpdated(this, finishableState);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 2ea39b7..007c83d 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -324,6 +324,10 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     return requestId;
   }
 
+  /**
+   * Returns the fragment info held by this callable. The task scheduler uses it to register
+   * for finishable-state updates.
+   */
+  public QueryFragmentInfo getFragmentInfo() {
+    return this.fragmentInfo;
+  }
+
   public TaskRunnerCallback getCallback() {
     return new TaskRunnerCallback(request, this);
   }

Reply via email to