Repository: hive Updated Branches: refs/heads/branch-2.0 9bf8d00cf -> 80ce0449e
HIVE-12904. Fix a deadlock in LLAP task scheduling. (Sergey Shelukhin and Siddharth Seth) (cherry picked from commit 19b508ecc862fae8997ec938edae4e094658544f) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/80ce0449 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/80ce0449 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/80ce0449 Branch: refs/heads/branch-2.0 Commit: 80ce0449ef2bb4f849ce7c37df0c5f1f264e1df2 Parents: 9bf8d00 Author: Siddharth Seth <ss...@apache.org> Authored: Mon Jan 25 19:17:15 2016 -0800 Committer: Siddharth Seth <ss...@apache.org> Committed: Mon Jan 25 19:18:31 2016 -0800 ---------------------------------------------------------------------- .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 69 +++++++++++++------- .../llap/daemon/impl/TaskExecutorService.java | 64 +++++++++--------- 2 files changed, 80 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/80ce0449/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 8bec95f..64c2b58 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -20,11 +20,13 @@ import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; +import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; 
import com.google.common.collect.HashMultimap; @@ -166,37 +168,60 @@ public class QueryInfo { private final Map<FinishableStateUpdateHandler, EntityInfo> trackedEntities = new HashMap<>(); private final Multimap<String, EntityInfo> sourceToEntity = HashMultimap.create(); - synchronized boolean registerForUpdates(FinishableStateUpdateHandler handler, + private final ReentrantLock lock = new ReentrantLock(); + + boolean registerForUpdates(FinishableStateUpdateHandler handler, List<String> sources, QueryFragmentInfo fragmentInfo, boolean lastFinishableState) { - EntityInfo entityInfo = - new EntityInfo(handler, sources, fragmentInfo, lastFinishableState); - if (trackedEntities.put(handler, entityInfo) != null) { - throw new IllegalStateException( - "Only a single registration allowed per entity. Duplicate for " + handler.toString()); - } - for (String source : sources) { - sourceToEntity.put(source, entityInfo); - } + lock.lock(); + try { + EntityInfo entityInfo = + new EntityInfo(handler, sources, fragmentInfo, lastFinishableState); + if (trackedEntities.put(handler, entityInfo) != null) { + throw new IllegalStateException( + "Only a single registration allowed per entity. 
Duplicate for " + handler.toString()); + } + for (String source : sources) { + sourceToEntity.put(source, entityInfo); + } - if (lastFinishableState != fragmentInfo.canFinish()) { - entityInfo.setLastFinishableState(fragmentInfo.canFinish()); - return false; - } else { - return true; + if (lastFinishableState != fragmentInfo.canFinish()) { + entityInfo.setLastFinishableState(fragmentInfo.canFinish()); + return false; + } else { + return true; + } + } finally { + lock.unlock(); } } - synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) { - EntityInfo info = trackedEntities.remove(handler); - Preconditions.checkState(info != null, "Cannot invoke unregister on an entity which has not been registered"); - for (String source : info.getSources()) { - sourceToEntity.remove(source, info); + void unregisterForUpdates(FinishableStateUpdateHandler handler) { + lock.lock(); + try { + EntityInfo info = trackedEntities.remove(handler); + Preconditions.checkState(info != null, + "Cannot invoke unregister on an entity which has not been registered"); + for (String source : info.getSources()) { + sourceToEntity.remove(source, info); + } + } finally { + lock.unlock(); } } - synchronized void sourceStateUpdated(String sourceName) { - Collection<EntityInfo> interestedEntityInfos = sourceToEntity.get(sourceName); + void sourceStateUpdated(String sourceName) { + List<EntityInfo> interestedEntityInfos = null; + lock.lock(); + try { + Collection<EntityInfo> entities = sourceToEntity.get(sourceName); + if (entities != null) { + // Create a copy since the underlying list can be changed elsewhere. 
+ interestedEntityInfos = new LinkedList<>(entities); + } + } finally { + lock.unlock(); + } if (interestedEntityInfos != null) { for (EntityInfo entityInfo : interestedEntityInfos) { boolean newFinishState = entityInfo.getFragmentInfo().canFinish(); http://git-wip-us.apache.org/repos/asf/hive/blob/80ce0449/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 34aa5c9..7f60cf7 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -320,6 +320,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta TaskWrapper taskWrapper = new TaskWrapper(task, this); SubmissionState result; TaskWrapper evictedTask; + boolean canFinish; synchronized (lock) { // If the queue does not have capacity, it does not throw a Rejection. Instead it will // return the task with the lowest priority, which could be the task which is currently being processed. @@ -327,6 +328,7 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta // TODO HIVE-11687 It's possible for a bunch of tasks to come in around the same time, without the // actual executor threads picking up any work. This will lead to unnecessary rejection of tasks. 
// The wait queue should be able to fit at least (waitQueue + currentFreeExecutor slots) + canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); evictedTask = waitQueue.offer(taskWrapper); // null evicted task means offer accepted @@ -366,10 +368,14 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta // This registration has to be done after knownTasks has been populated. // Register for state change notifications so that the waitQueue can be re-ordered correctly // if the fragment moves in or out of the finishable state. - boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish(); - // It's safe to register outside of the lock since the stateChangeTracker ensures that updates - // and registrations are mutually exclusive. - taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); + boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish); // registration returns false when the state changed + if (stateChanged) { + if (isDebugEnabled) { + LOG.debug("Finishable state of {} updated to {} during registration for state updates", + taskWrapper.getRequestId(), !canFinish); + } + finishableStateUpdated(taskWrapper, !canFinish); + } if (isDebugEnabled) { LOG.debug("Wait Queue: {}", waitQueue); @@ -397,14 +403,14 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta TaskWrapper taskWrapper = knownTasks.remove(fragmentId); // Can be null since the task may have completed meanwhile.
if (taskWrapper != null) { - if (taskWrapper.inWaitQueue) { + if (taskWrapper.isInWaitQueue()) { if (isDebugEnabled) { LOG.debug("Removing {} from waitQueue", fragmentId); } taskWrapper.setIsInWaitQueue(false); waitQueue.remove(taskWrapper); } - if (taskWrapper.inPreemptionQueue) { + if (taskWrapper.isInPreemptionQueue()) { if (isDebugEnabled) { LOG.debug("Removing {} from preemptionQueue", fragmentId); } @@ -644,9 +650,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta public static class TaskWrapper implements FinishableStateUpdateHandler { private final TaskRunnerCallable taskRunnerCallable; - private boolean inWaitQueue = false; - private boolean inPreemptionQueue = false; - private boolean registeredForNotifications = false; + private final AtomicBoolean inWaitQueue = new AtomicBoolean(false); + private final AtomicBoolean inPreemptionQueue = new AtomicBoolean(false); + private final AtomicBoolean registeredForNotifications = new AtomicBoolean(false); private final TaskExecutorService taskExecutorService; public TaskWrapper(TaskRunnerCallable taskRunnerCallable, TaskExecutorService taskExecutorService) { @@ -654,18 +660,16 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta this.taskExecutorService = taskExecutorService; } - // Methods are synchronized primarily for visibility. + // Don't invoke from within a scheduler lock + /** * * @param currentFinishableState - * @return true if the current state is the same as the currentFinishableState. false if the state has already changed. + * @return true if the state has not changed from currentFinishableState, false otherwise */ - // Synchronized to avoid register / unregister clobbering each other. 
- // Don't invoke from within a scheduler lock - public synchronized boolean maybeRegisterForFinishedStateNotifications( + public boolean maybeRegisterForFinishedStateNotifications( boolean currentFinishableState) { - if (!registeredForNotifications) { - registeredForNotifications = true; + if (!registeredForNotifications.getAndSet(true)) { return taskRunnerCallable.getFragmentInfo() .registerForFinishableStateUpdates(this, currentFinishableState); } else { @@ -673,11 +677,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } } - // Synchronized to avoid register / unregister clobbering each other. // Don't invoke from within a scheduler lock - public synchronized void maybeUnregisterForFinishedStateNotifications() { - if (registeredForNotifications) { - registeredForNotifications = false; + public void maybeUnregisterForFinishedStateNotifications() { + if (registeredForNotifications.getAndSet(false)) { taskRunnerCallable.getFragmentInfo().unregisterForFinishableStateUpdates(this); } } @@ -686,20 +688,20 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta return taskRunnerCallable; } - public synchronized boolean isInWaitQueue() { - return inWaitQueue; + public boolean isInWaitQueue() { + return inWaitQueue.get(); } - public synchronized boolean isInPreemptionQueue() { - return inPreemptionQueue; + public boolean isInPreemptionQueue() { + return inPreemptionQueue.get(); } - public synchronized void setIsInWaitQueue(boolean value) { - this.inWaitQueue = value; + public void setIsInWaitQueue(boolean value) { + this.inWaitQueue.set(value); } - public synchronized void setIsInPreemptableQueue(boolean value) { - this.inPreemptionQueue = value; + public void setIsInPreemptableQueue(boolean value) { + this.inPreemptionQueue.set(value); } public String getRequestId() { @@ -710,9 +712,9 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta public String toString() { return 
"TaskWrapper{" + "task=" + taskRunnerCallable.getRequestId() + - ", inWaitQueue=" + inWaitQueue + - ", inPreemptionQueue=" + inPreemptionQueue + - ", registeredForNotifications=" + registeredForNotifications + + ", inWaitQueue=" + inWaitQueue.get() + + ", inPreemptionQueue=" + inPreemptionQueue.get() + + ", registeredForNotifications=" + registeredForNotifications.get() + ", canFinish=" + taskRunnerCallable.canFinish() + ", firstAttemptStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getFirstAttemptStartTime() + ", dagStartTime=" + taskRunnerCallable.getFragmentRuntimeInfo().getDagStartTime() +