Repository: hive
Updated Branches:
  refs/heads/branch-2.0 9bf8d00cf -> 80ce0449e


HIVE-12904. Fix a deadlock in LLAP task scheduling. (Sergey Shelukhin and 
Siddharth Seth)
(cherry picked from commit 19b508ecc862fae8997ec938edae4e094658544f)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/80ce0449
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/80ce0449
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/80ce0449

Branch: refs/heads/branch-2.0
Commit: 80ce0449ef2bb4f849ce7c37df0c5f1f264e1df2
Parents: 9bf8d00
Author: Siddharth Seth <ss...@apache.org>
Authored: Mon Jan 25 19:17:15 2016 -0800
Committer: Siddharth Seth <ss...@apache.org>
Committed: Mon Jan 25 19:18:31 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 69 +++++++++++++-------
 .../llap/daemon/impl/TaskExecutorService.java   | 64 +++++++++---------
 2 files changed, 80 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/80ce0449/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 8bec95f..64c2b58 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -20,11 +20,13 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReentrantLock;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
@@ -166,37 +168,60 @@ public class QueryInfo {
     private final Map<FinishableStateUpdateHandler, EntityInfo> 
trackedEntities = new HashMap<>();
     private final Multimap<String, EntityInfo> sourceToEntity = 
HashMultimap.create();
 
-    synchronized boolean registerForUpdates(FinishableStateUpdateHandler 
handler,
+    private final ReentrantLock lock = new ReentrantLock();
+
+    boolean registerForUpdates(FinishableStateUpdateHandler handler,
                                          List<String> sources, 
QueryFragmentInfo fragmentInfo,
                                          boolean lastFinishableState) {
-      EntityInfo entityInfo =
-          new EntityInfo(handler, sources, fragmentInfo, lastFinishableState);
-      if (trackedEntities.put(handler, entityInfo) != null) {
-        throw new IllegalStateException(
-            "Only a single registration allowed per entity. Duplicate for " + 
handler.toString());
-      }
-      for (String source : sources) {
-        sourceToEntity.put(source, entityInfo);
-      }
+      lock.lock();
+      try {
+        EntityInfo entityInfo =
+            new EntityInfo(handler, sources, fragmentInfo, 
lastFinishableState);
+        if (trackedEntities.put(handler, entityInfo) != null) {
+          throw new IllegalStateException(
+              "Only a single registration allowed per entity. Duplicate for " 
+ handler.toString());
+        }
+        for (String source : sources) {
+          sourceToEntity.put(source, entityInfo);
+        }
 
-      if (lastFinishableState != fragmentInfo.canFinish()) {
-        entityInfo.setLastFinishableState(fragmentInfo.canFinish());
-        return false;
-      } else {
-        return true;
+        if (lastFinishableState != fragmentInfo.canFinish()) {
+          entityInfo.setLastFinishableState(fragmentInfo.canFinish());
+          return false;
+        } else {
+          return true;
+        }
+      } finally {
+        lock.unlock();
       }
     }
 
-    synchronized void unregisterForUpdates(FinishableStateUpdateHandler 
handler) {
-      EntityInfo info = trackedEntities.remove(handler);
-      Preconditions.checkState(info != null, "Cannot invoke unregister on an 
entity which has not been registered");
-      for (String source : info.getSources()) {
-        sourceToEntity.remove(source, info);
+    void unregisterForUpdates(FinishableStateUpdateHandler handler) {
+      lock.lock();
+      try {
+        EntityInfo info = trackedEntities.remove(handler);
+        Preconditions.checkState(info != null,
+            "Cannot invoke unregister on an entity which has not been 
registered");
+        for (String source : info.getSources()) {
+          sourceToEntity.remove(source, info);
+        }
+      } finally {
+        lock.unlock();
       }
     }
 
-    synchronized void sourceStateUpdated(String sourceName) {
-      Collection<EntityInfo> interestedEntityInfos = 
sourceToEntity.get(sourceName);
+    void sourceStateUpdated(String sourceName) {
+      List<EntityInfo> interestedEntityInfos = null;
+      lock.lock();
+      try {
+        Collection<EntityInfo> entities = sourceToEntity.get(sourceName);
+        if (entities != null) {
+          // Create a copy since the underlying list can be changed elsewhere.
+          interestedEntityInfos = new LinkedList<>(entities);
+        }
+      } finally {
+        lock.unlock();
+      }
       if (interestedEntityInfos != null) {
         for (EntityInfo entityInfo : interestedEntityInfos) {
           boolean newFinishState = entityInfo.getFragmentInfo().canFinish();

http://git-wip-us.apache.org/repos/asf/hive/blob/80ce0449/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 34aa5c9..7f60cf7 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -320,6 +320,7 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
     TaskWrapper taskWrapper = new TaskWrapper(task, this);
     SubmissionState result;
     TaskWrapper evictedTask;
+    boolean canFinish;
     synchronized (lock) {
       // If the queue does not have capacity, it does not throw a Rejection. 
Instead it will
       // return the task with the lowest priority, which could be the task 
which is currently being processed.
@@ -327,6 +328,7 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       // TODO HIVE-11687 It's possible for a bunch of tasks to come in around 
the same time, without the
       // actual executor threads picking up any work. This will lead to 
unnecessary rejection of tasks.
       // The wait queue should be able to fit at least (waitQueue + 
currentFreeExecutor slots)
+      canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
       evictedTask = waitQueue.offer(taskWrapper);
 
       // null evicted task means offer accepted
@@ -366,10 +368,14 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
     // This registration has to be done after knownTasks has been populated.
     // Register for state change notifications so that the waitQueue can be 
re-ordered correctly
     // if the fragment moves in or out of the finishable state.
-    boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
-    // It's safe to register outside of the lock since the stateChangeTracker 
ensures that updates
-    // and registrations are mutually exclusive.
-    taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+    boolean stateChanged = 
taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+    if (stateChanged) {
+      if (isDebugEnabled) {
+        LOG.debug("Finishable state of {} updated to {} during registration 
for state updates",
+            taskWrapper.getRequestId(), !canFinish);
+      }
+      finishableStateUpdated(taskWrapper, !canFinish);
+    }
 
     if (isDebugEnabled) {
       LOG.debug("Wait Queue: {}", waitQueue);
@@ -397,14 +403,14 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       TaskWrapper taskWrapper = knownTasks.remove(fragmentId);
       // Can be null since the task may have completed meanwhile.
       if (taskWrapper != null) {
-        if (taskWrapper.inWaitQueue) {
+        if (taskWrapper.isInWaitQueue()) {
           if (isDebugEnabled) {
             LOG.debug("Removing {} from waitQueue", fragmentId);
           }
           taskWrapper.setIsInWaitQueue(false);
           waitQueue.remove(taskWrapper);
         }
-        if (taskWrapper.inPreemptionQueue) {
+        if (taskWrapper.isInPreemptionQueue()) {
           if (isDebugEnabled) {
             LOG.debug("Removing {} from preemptionQueue", fragmentId);
           }
@@ -644,9 +650,9 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
 
   public static class TaskWrapper implements FinishableStateUpdateHandler {
     private final TaskRunnerCallable taskRunnerCallable;
-    private boolean inWaitQueue = false;
-    private boolean inPreemptionQueue = false;
-    private boolean registeredForNotifications = false;
+    private final AtomicBoolean inWaitQueue = new AtomicBoolean(false);
+    private final AtomicBoolean inPreemptionQueue = new AtomicBoolean(false);
+    private final AtomicBoolean registeredForNotifications = new 
AtomicBoolean(false);
     private final TaskExecutorService taskExecutorService;
 
     public TaskWrapper(TaskRunnerCallable taskRunnerCallable, 
TaskExecutorService taskExecutorService) {
@@ -654,18 +660,16 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       this.taskExecutorService = taskExecutorService;
     }
 
-    // Methods are synchronized primarily for visibility.
+    // Don't invoke from within a scheduler lock
+
     /**
      *
      * @param currentFinishableState
-     * @return true if the current state is the same as the 
currentFinishableState. false if the state has already changed.
+     * @return true if the state has not changed from currentFinishableState, 
false otherwise
      */
-    // Synchronized to avoid register / unregister clobbering each other.
-    // Don't invoke from within a scheduler lock
-    public synchronized boolean maybeRegisterForFinishedStateNotifications(
+    public boolean maybeRegisterForFinishedStateNotifications(
         boolean currentFinishableState) {
-      if (!registeredForNotifications) {
-        registeredForNotifications = true;
+      if (!registeredForNotifications.getAndSet(true)) {
         return taskRunnerCallable.getFragmentInfo()
             .registerForFinishableStateUpdates(this, currentFinishableState);
       } else {
@@ -673,11 +677,9 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       }
     }
 
-    // Synchronized to avoid register / unregister clobbering each other.
     // Don't invoke from within a scheduler lock
-    public synchronized void maybeUnregisterForFinishedStateNotifications() {
-      if (registeredForNotifications) {
-        registeredForNotifications = false;
+    public void maybeUnregisterForFinishedStateNotifications() {
+      if (registeredForNotifications.getAndSet(false)) {
         
taskRunnerCallable.getFragmentInfo().unregisterForFinishableStateUpdates(this);
       }
     }
@@ -686,20 +688,20 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
       return taskRunnerCallable;
     }
 
-    public synchronized boolean isInWaitQueue() {
-      return inWaitQueue;
+    public boolean isInWaitQueue() {
+      return inWaitQueue.get();
     }
 
-    public synchronized boolean isInPreemptionQueue() {
-      return inPreemptionQueue;
+    public boolean isInPreemptionQueue() {
+      return inPreemptionQueue.get();
     }
 
-    public synchronized void setIsInWaitQueue(boolean value) {
-      this.inWaitQueue = value;
+    public void setIsInWaitQueue(boolean value) {
+      this.inWaitQueue.set(value);
     }
 
-    public synchronized void setIsInPreemptableQueue(boolean value) {
-      this.inPreemptionQueue = value;
+    public void setIsInPreemptableQueue(boolean value) {
+      this.inPreemptionQueue.set(value);
     }
 
     public String getRequestId() {
@@ -710,9 +712,9 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
     public String toString() {
       return "TaskWrapper{" +
           "task=" + taskRunnerCallable.getRequestId() +
-          ", inWaitQueue=" + inWaitQueue +
-          ", inPreemptionQueue=" + inPreemptionQueue +
-          ", registeredForNotifications=" + registeredForNotifications +
+          ", inWaitQueue=" + inWaitQueue.get() +
+          ", inPreemptionQueue=" + inPreemptionQueue.get() +
+          ", registeredForNotifications=" + registeredForNotifications.get() +
           ", canFinish=" + taskRunnerCallable.canFinish() +
           ", firstAttemptStartTime=" + 
taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() +
           ", dagStartTime=" + 
taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() +

Reply via email to