Repository: hive
Updated Branches:
  refs/heads/llap b8b94f297 -> f2a055631
HIVE-10842. Fix a deadlock in handling of finishable state change notifications. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/f2a05563 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/f2a05563 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/f2a05563 Branch: refs/heads/llap Commit: f2a0556310e670286c7d657ad2ca470d8b883f18 Parents: b8b94f2 Author: Siddharth Seth <ss...@apache.org> Authored: Thu May 28 02:39:26 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Thu May 28 02:39:26 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 7 ++++++- .../hive/llap/daemon/impl/TaskExecutorService.java | 14 +++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/f2a05563/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 3487e19..6aed60f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -178,7 +178,12 @@ public class QueryInfo { sourceToEntity.put(source, entityInfo); } - return lastFinishableState == fragmentInfo.canFinish(); + if (lastFinishableState != fragmentInfo.canFinish()) { + entityInfo.setLastFinishableState(fragmentInfo.canFinish()); + return false; + } else { + return true; + } } synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) { 
http://git-wip-us.apache.org/repos/asf/hive/blob/f2a05563/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 453a71e..b3e5f74 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -282,9 +282,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { boolean scheduled = false; try { + + boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); + // It's safe to register outside of the lock since the stateChangeTracker ensures that updates + // and registrations are mutually exclusive. + boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); synchronized (lock) { - boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable()); taskWrapper.setIsInWaitQueue(false); FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(taskWrapper); @@ -299,7 +302,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs // to the tasks are not ready yet, the task is eligible for pre-emptable. if (enablePreemption) { - if (!canFinish && !stateChanged) { + if ((!canFinish && !stateChanged) || (canFinish && stateChanged)) { if (isInfoEnabled) { LOG.info("{} is not finishable. 
Adding it to pre-emption queue", taskWrapper.getRequestId()); } @@ -538,6 +541,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { * @param currentFinishableState * @return true if the current state is the same as the currentFinishableState. false if the state has already changed. */ + // Synchronized to avoid register / unregister clobbering each other. + // Don't invoke from within a scheduler lock public synchronized boolean maybeRegisterForFinishedStateNotifications( boolean currentFinishableState) { if (!registeredForNotifications) { @@ -549,6 +554,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } } + // Synchronized to avoid register / unregister clobbering each other. + // Don't invoke from within a scheduler lock public synchronized void maybeUnregisterForFinishedStateNotifications() { if (registeredForNotifications) { registeredForNotifications = false; @@ -590,6 +597,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { '}'; } + // No task lock. But acquires lock on the scheduler @Override public void finishableStateUpdated(boolean finishableState) { // This method should not by synchronized. Can lead to deadlocks since it calls a sync method.