Repository: hive
Updated Branches:
  refs/heads/llap b8b94f297 -> f2a055631


HIVE-10842. Fix a deadlock in handling of finishable state change notifications. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f2a05563
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f2a05563
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f2a05563

Branch: refs/heads/llap
Commit: f2a0556310e670286c7d657ad2ca470d8b883f18
Parents: b8b94f2
Author: Siddharth Seth <ss...@apache.org>
Authored: Thu May 28 02:39:26 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Thu May 28 02:39:26 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java       |  7 ++++++-
 .../hive/llap/daemon/impl/TaskExecutorService.java    | 14 +++++++++++---
 2 files changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/f2a05563/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 3487e19..6aed60f 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -178,7 +178,12 @@ public class QueryInfo {
         sourceToEntity.put(source, entityInfo);
       }
 
-      return lastFinishableState == fragmentInfo.canFinish();
+      if (lastFinishableState != fragmentInfo.canFinish()) {
+        entityInfo.setLastFinishableState(fragmentInfo.canFinish());
+        return false;
+      } else {
+        return true;
+      }
     }
 
     synchronized void unregisterForUpdates(FinishableStateUpdateHandler 
handler) {

http://git-wip-us.apache.org/repos/asf/hive/blob/f2a05563/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 453a71e..b3e5f74 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -282,9 +282,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
     boolean scheduled = false;
     try {
+
+      boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
+      // It's safe to register outside of the lock since the 
stateChangeTracker ensures that updates
+      // and registrations are mutually exclusive.
+      boolean stateChanged = 
!taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
       synchronized (lock) {
-        boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
-        boolean stateChanged  = 
!taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
         ListenableFuture<TaskRunner2Result> future = 
executorService.submit(taskWrapper.getTaskRunnerCallable());
         taskWrapper.setIsInWaitQueue(false);
         FutureCallback<TaskRunner2Result> wrappedCallback = new 
InternalCompletionListener(taskWrapper);
@@ -299,7 +302,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
         // only tasks that cannot finish immediately are pre-emptable. In 
other words, if all inputs
         // to the tasks are not ready yet, the task is eligible for 
pre-emptable.
         if (enablePreemption) {
-          if (!canFinish && !stateChanged) {
+          if ((!canFinish && !stateChanged) || (canFinish && stateChanged)) {
             if (isInfoEnabled) {
               LOG.info("{} is not finishable. Adding it to pre-emption queue", 
taskWrapper.getRequestId());
             }
@@ -538,6 +541,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
      * @param currentFinishableState
      * @return true if the current state is the same as the 
currentFinishableState. false if the state has already changed.
      */
+    // Synchronized to avoid register / unregister clobbering each other.
+    // Don't invoke from within a scheduler lock
     public synchronized boolean maybeRegisterForFinishedStateNotifications(
         boolean currentFinishableState) {
       if (!registeredForNotifications) {
@@ -549,6 +554,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
       }
     }
 
+    // Synchronized to avoid register / unregister clobbering each other.
+    // Don't invoke from within a scheduler lock
     public synchronized void maybeUnregisterForFinishedStateNotifications() {
       if (registeredForNotifications) {
         registeredForNotifications = false;
@@ -590,6 +597,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
           '}';
     }
 
+    // No task lock. But acquires lock on the scheduler
     @Override
     public void finishableStateUpdated(boolean finishableState) {
       // This method should not by synchronized. Can lead to deadlocks since 
it calls a sync method.

Reply via email to