Repository: hive Updated Branches: refs/heads/llap 1d0881e04 -> baddfa5cc
HIVE-10758. LLAP: Modify running / wait queues on on fragment finishable state changes. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/baddfa5c Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/baddfa5c Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/baddfa5c Branch: refs/heads/llap Commit: baddfa5cca4bf8ccdd9133a63b4e9409ce520459 Parents: 1d0881e Author: Siddharth Seth <ss...@apache.org> Authored: Fri May 22 10:58:58 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Fri May 22 10:58:58 2015 -0700 ---------------------------------------------------------------------- .../daemon/FinishableStateUpdateHandler.java | 21 ++ .../impl/EvictingPriorityBlockingQueue.java | 4 +- .../llap/daemon/impl/QueryFragmentInfo.java | 28 ++ .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 114 +++++++ .../hive/llap/daemon/impl/QueryTracker.java | 8 +- .../llap/daemon/impl/TaskExecutorService.java | 305 +++++++++++++++---- .../llap/daemon/impl/TaskRunnerCallable.java | 4 + 7 files changed, 414 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java new file mode 100644 index 0000000..8d40ce9 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/FinishableStateUpdateHandler.java @@ -0,0 +1,21 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+/** Callback notified when a registered fragment's finishable state changes (see QueryFragmentInfo). */
+public interface FinishableStateUpdateHandler {
+  /** Receives the new state; QueryInfo's tracker invokes this only when the state has changed. */
+  void finishableStateUpdated(boolean finishableState);
+
+}
http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
index ab3a130..4ea3b0b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/EvictingPriorityBlockingQueue.java
@@ -61,8 +61,8 @@ public class EvictingPriorityBlockingQueue<E> {
     return deque.take();
   }
 
-  public synchronized void remove(E e) {
-    deque.remove(e);
+  public synchronized boolean remove(E e) {
+    return deque.remove(e);
   }
 
   public synchronized int size() {
http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index f6cd8ab..554864e 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -15,9 +15,11 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.IOException;
+import java.util.LinkedList;
 import java.util.List;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
@@ -111,6 +113,32 @@ public class QueryFragmentInfo {
     return queryInfo.getLocalDirs();
   }
 
+  /**
+   * Registers handler for callbacks when this fragment's finishable state changes (inputs passing isSourceOfInterest).
+   * @param handler the callback to invoke on a change; QueryInfo allows one registration per handler
+   * @param lastFinishableState the finishable state the caller last observed
+   * @return true if the current state is the same as the lastFinishableState. false if the state has already changed.
+   */
+  public boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler,
+                                                   boolean lastFinishableState) {
+    List<String> sourcesOfInterest = new LinkedList<>();
+    List<IOSpecProto> inputSpecList = fragmentSpec.getInputSpecsList();
+    if (inputSpecList != null && !inputSpecList.isEmpty()) {
+      for (IOSpecProto inputSpec : inputSpecList) {
+        if (isSourceOfInterest(inputSpec)) {
+          sourcesOfInterest.add(inputSpec.getConnectedVertexName());
+        }
+      }
+    }
+    return queryInfo.registerForFinishableStateUpdates(handler, sourcesOfInterest, this,
+        lastFinishableState);
+  }
+
+  /** Removes the registration made for handler by registerForFinishableStateUpdates. */
+  public void unregisterForFinishableStateUpdates(FinishableStateUpdateHandler handler) {
+    queryInfo.unregisterFinishableStateUpdate(handler);
+  }
+
   private boolean isSourceOfInterest(IOSpecProto inputSpec) {
     String inputClassName = inputSpec.getIoDescriptor().getClassName();
     // MRInput is not of interest since it'll always be ready.
http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index efa18cd..3487e19 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -16,13 +16,22 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
+import com.google.common.base.Preconditions;
+import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 
@@ -42,6 +51,7 @@ public class QueryInfo {
 
   private final ConcurrentMap<String, SourceStateProto> sourceStateMap;
 
+  private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker();
 
   public QueryInfo(String queryId, String appIdString, String dagName, int dagIdentifier,
                    String user, ConcurrentMap<String, SourceStateProto> sourceStateMap,
@@ -125,4 +135,122 @@ public class QueryInfo {
     return baseDir + File.separator + "usercache" + File.separator + user + File.separator +
         "appcache" + File.separator + applicationIdString + File.separator + dagIdentifier;
   }
+
+  /**
+   * Registers handler for callbacks driven by state updates of the given sources.
+   * @param handler the callback to invoke; only a single registration per handler is allowed
+   * @param sources names of the sources whose updates re-evaluate fragmentInfo.canFinish()
+   * @param fragmentInfo the fragment whose finishable state is tracked
+   * @param lastFinishableState the finishable state the caller last observed
+   * @return true if the current state is the same as the lastFinishableState. false if the state has already changed.
+   */
+  boolean registerForFinishableStateUpdates(FinishableStateUpdateHandler handler,
+                                            List<String> sources, QueryFragmentInfo fragmentInfo,
+                                            boolean lastFinishableState) {
+    return finishableStateTracker
+        .registerForUpdates(handler, sources, fragmentInfo, lastFinishableState);
+  }
+
+  void unregisterFinishableStateUpdate(FinishableStateUpdateHandler handler) {
+    finishableStateTracker.unregisterForUpdates(handler);
+  }
+
+  void sourceStateUpdated(String sourceName) {
+    finishableStateTracker.sourceStateUpdated(sourceName);
+  }
+
+
+  private static class FinishableStateTracker {
+
+    private final Map<FinishableStateUpdateHandler, EntityInfo> trackedEntities = new HashMap<>();
+    private final Multimap<String, EntityInfo> sourceToEntity = HashMultimap.create();
+
+    synchronized boolean registerForUpdates(FinishableStateUpdateHandler handler,
+                                            List<String> sources, QueryFragmentInfo fragmentInfo,
+                                            boolean lastFinishableState) {
+      // Check before inserting so that a duplicate cannot clobber the existing registration.
+      if (trackedEntities.containsKey(handler)) {
+        throw new IllegalStateException(
+            "Only a single registration allowed per entity. Duplicate for " + handler.toString());
+      }
+      EntityInfo entityInfo =
+          new EntityInfo(handler, sources, fragmentInfo, lastFinishableState);
+      trackedEntities.put(handler, entityInfo);
+      for (String source : sources) {
+        sourceToEntity.put(source, entityInfo);
+      }
+
+      return lastFinishableState == fragmentInfo.canFinish();
+    }
+
+    synchronized void unregisterForUpdates(FinishableStateUpdateHandler handler) {
+      EntityInfo info = trackedEntities.remove(handler);
+      Preconditions.checkState(info != null, "Cannot invoke unregister on an entity which has not been registered");
+      for (String source : info.getSources()) {
+        sourceToEntity.remove(source, info);
+      }
+    }
+
+    /**
+     * Re-evaluates the entities interested in sourceName and notifies those whose finishable
+     * state changed. State is recorded under this tracker's monitor, but handlers are called
+     * only after it is released: a handler may take its own locks (the task scheduler does),
+     * and the scheduler calls registerForUpdates while holding that lock, so calling out under
+     * this monitor is a lock-order inversion that can deadlock. As a consequence a callback may
+     * race with unregisterForUpdates, so handlers must tolerate a late update.
+     */
+    void sourceStateUpdated(String sourceName) {
+      Map<FinishableStateUpdateHandler, Boolean> stateChanges = new HashMap<>();
+      synchronized (this) {
+        Collection<EntityInfo> interestedEntityInfos = sourceToEntity.get(sourceName);
+        for (EntityInfo entityInfo : interestedEntityInfos) {
+          boolean newFinishState = entityInfo.getFragmentInfo().canFinish();
+          if (newFinishState != entityInfo.getLastFinishableState()) {
+            // State changed. Record it now; the callback happens once the monitor is released.
+            entityInfo.setLastFinishableState(newFinishState);
+            stateChanges.put(entityInfo.getHandler(), newFinishState);
+          }
+        }
+      }
+      for (Map.Entry<FinishableStateUpdateHandler, Boolean> change : stateChanges.entrySet()) {
+        change.getKey().finishableStateUpdated(change.getValue());
+      }
+    }
+
+
+  }
+
+  private static class EntityInfo {
+    final FinishableStateUpdateHandler handler;
+    final List<String> sources;
+    final QueryFragmentInfo fragmentInfo;
+    boolean lastFinishableState;
+
+    public EntityInfo(FinishableStateUpdateHandler handler, List<String> sources, QueryFragmentInfo fragmentInfo, boolean lastFinishableState) {
+      this.handler = handler;
+      this.sources = sources;
+      this.fragmentInfo = fragmentInfo;
+      this.lastFinishableState = lastFinishableState;
+    }
+
+    public FinishableStateUpdateHandler getHandler() {
+      return handler;
+    }
+
+    public QueryFragmentInfo getFragmentInfo() {
+      return fragmentInfo;
+    }
+
+    public boolean getLastFinishableState() {
+      return lastFinishableState;
+    }
+
+    public List<String> getSources() {
+      return sources;
+    }
+
+    public void setLastFinishableState(boolean lastFinishableState) {
+      this.lastFinishableState = lastFinishableState;
+    }
+  }
 }
http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 90ad923..d796b24 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -178,7 +178,13 @@ public class QueryTracker extends CompositeService {
    */
   void registerSourceStateChange(String dagName, String sourceName, SourceStateProto sourceState) {
     getSourceCompletionMap(dagName).put(sourceName, sourceState);
-    // TODO HIVE-10758 source completion notifications
+    QueryInfo queryInfo = queryInfoMap.get(dagName);
+    if (queryInfo != null) {
+      queryInfo.sourceStateUpdated(sourceName);
+    } else {
+      // May be null if a dag-finish request was processed earlier by another request-handling
+      // thread. The source state is still recorded above, but no fragment is notified.
+    }
+  }
http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 18daa75..bfc4d89 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -17,8 +17,11 @@
  */
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import java.util.Collections;
 import java.util.Comparator;
+import java.util.Set;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -29,6 +32,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
+import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler;
 import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 import org.slf4j.Logger;
@@ -62,29 +66,32 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * new tasks. Shutting down of the task executor service can be done gracefully or immediately.
  */
 public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
+
+
   private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
   private static final String TASK_EXECUTOR_THREAD_NAME_FORMAT = "Task-Executor-%d";
   private static final String WAIT_QUEUE_SCHEDULER_THREAD_NAME_FORMAT = "Wait-Queue-Scheduler-%d";
 
-  // some object to lock upon. Used by task scheduler to notify wait queue scheduler of new items
-  // to wait queue
-  private final Object waitLock;
   // Thread pool for actual execution of work.
   private final ListeningExecutorService executorService;
-  private final EvictingPriorityBlockingQueue<TaskRunnerCallable> waitQueue;
+  private final EvictingPriorityBlockingQueue<TaskWrapper> waitQueue;
   // Thread pool for taking entities off the wait queue.
   private final ListeningExecutorService waitQueueExecutorService;
   // Thread pool for callbacks on completion of execution of a work unit.
   private final ListeningExecutorService executionCompletionExecutorService;
-  private final BlockingQueue<TaskRunnerCallable> preemptionQueue;
+  private final BlockingQueue<TaskWrapper> preemptionQueue;
   private final boolean enablePreemption;
   private final ThreadPoolExecutor threadPoolExecutor;
   private final AtomicInteger numSlotsAvailable;
 
+  // Tasks accepted by schedule() that have not yet completed, failed or been evicted.
+  private final Set<TaskWrapper> knownTasks = Collections.newSetFromMap(new ConcurrentHashMap<TaskWrapper, Boolean>());
+  // Guards wait/preemption queue updates; the wait queue worker waits on it for new work.
+  private final Object lock = new Object();
+
   public TaskExecutorService(int numExecutors, int waitQueueSize, boolean enablePreemption) {
-    this.waitLock = new Object();
     this.waitQueue = new EvictingPriorityBlockingQueue<>(new WaitQueueComparator(), waitQueueSize);
     this.threadPoolExecutor = new ThreadPoolExecutor(numExecutors, // core pool size
         numExecutors, // max pool size
@@ -115,32 +122,36 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
         + ", enablePreemption=" + enablePreemption);
   }
 
+
   /**
    * Worker that takes tasks from wait queue and schedule it for execution.
    */
   private final class WaitQueueWorker implements Runnable {
-    TaskRunnerCallable task;
+    TaskWrapper task;
 
     @Override
     public void run() {
       try {
-        synchronized (waitLock) {
+        synchronized (lock) {
           while (waitQueue.isEmpty()) {
-            waitLock.wait();
+            lock.wait();
           }
         }
 
         // Since schedule() can be called from multiple threads, we peek the wait queue,
         // try scheduling the task and then remove the task if scheduling is successful.
         // This will make sure the task's place in the wait queue is held until it gets scheduled.
-        while ((task = waitQueue.peek()) != null) {
+        while (true) {
+          synchronized (lock) {
+            task = waitQueue.peek();
+            if (task == null) {
+              break;
+            }
             // if the task cannot finish and if no slots are available then don't schedule it.
-          // TODO: Event notifications that change canFinish state should notify waitLock
-          synchronized (waitLock) {
             boolean shouldWait = false;
-            if (task.canFinish()) {
+            if (task.getTaskRunnerCallable().canFinish()) {
               if (numSlotsAvailable.get() == 0 && preemptionQueue.isEmpty()) {
                 shouldWait = true;
               }
@@ -150,9 +161,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
               }
             }
             if (shouldWait) {
+              lock.wait();
               // Another task at a higher priority may have come in during the wait. Lookup the
               // queue again to pick up the task at the highest priority.
-              waitLock.wait();
               continue;
             }
           }
@@ -161,12 +172,14 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
           if (scheduled) {
             // wait queue could have been re-ordered in the mean time because of concurrent task
             // submission. So remove the specific task instead of the head task.
-            waitQueue.remove(task);
+            synchronized (lock) {
+              waitQueue.remove(task);
+            }
           }
 
-          synchronized (waitLock) {
+          synchronized (lock) {
             while (waitQueue.isEmpty()) {
-              waitLock.wait();
+              lock.wait();
             }
           }
         }
@@ -194,71 +207,95 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
 
   @Override
   public void schedule(TaskRunnerCallable task) throws RejectedExecutionException {
-    TaskRunnerCallable evictedTask = waitQueue.offer(task);
+    TaskWrapper taskWrapper = new TaskWrapper(task);
+    knownTasks.add(taskWrapper);
+    TaskWrapper evictedTask;
+    try {
+      // Don't need a lock. Not subscribed for notifications yet, and marked as inWaitQueue
+      evictedTask = waitQueue.offer(taskWrapper);
+    } catch (RejectedExecutionException e) {
+      knownTasks.remove(taskWrapper);
+      throw e;
+    }
     if (evictedTask == null) {
       if (isInfoEnabled) {
         LOG.info(task.getRequestId() + " added to wait queue.");
       }
-
       if (isDebugEnabled) {
         LOG.debug("Wait Queue: {}", waitQueue);
       }
-      synchronized (waitLock) {
-        waitLock.notify();
-      }
     } else {
-      evictedTask.killTask();
+      // Killed rather than scheduled: no completion callback will remove it from knownTasks.
+      knownTasks.remove(evictedTask);
+      evictedTask.maybeUnregisterForFinishedStateNotifications();
+      evictedTask.getTaskRunnerCallable().killTask();
       if (isInfoEnabled) {
-        LOG.info(task.getRequestId() + " evicted from wait queue because of low priority");
+        LOG.info("{} evicted from wait queue in favor of {} because of lower priority",
+            evictedTask.getRequestId(), task.getRequestId());
       }
     }
+    synchronized (lock) {
+      lock.notify();
+    }
   }
 
-  private boolean trySchedule(final TaskRunnerCallable task) {
+  private boolean trySchedule(final TaskWrapper taskWrapper) {
     boolean scheduled = false;
     try {
-      ListenableFuture<TaskRunner2Result> future = executorService.submit(task);
-      FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task);
-      // Callback on a separate thread so that when a task completes, the thread in the main queue
-      // is actually available for execution and will not potentially result in a RejectedExecution
-      Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
+      synchronized (lock) {
+        boolean canFinish = taskWrapper.getTaskRunnerCallable().canFinish();
+        // Registration returns true when the state still equals canFinish, so negate it.
+        boolean stateChanged = !taskWrapper.maybeRegisterForFinishedStateNotifications(canFinish);
+        ListenableFuture<TaskRunner2Result> future = executorService.submit(taskWrapper.getTaskRunnerCallable());
+        taskWrapper.setIsInWaitQueue(false);
+        FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(taskWrapper);
+        // Callback on a separate thread so that when a task completes, the thread in the main queue
+        // is actually available for execution and will not potentially result in a RejectedExecution
+        Futures.addCallback(future, wrappedCallback, executionCompletionExecutorService);
 
-      if (isInfoEnabled) {
-        LOG.info(task.getRequestId() + " scheduled for execution.");
-      }
-
-      // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs
-      // to the tasks are not ready yet, the task is eligible for pre-emptable.
-      if (enablePreemption && !task.canFinish()) {
         if (isInfoEnabled) {
-          LOG.info(task.getRequestId() + " is not finishable. Adding it to pre-emption queue.");
+          LOG.info("{} scheduled for execution. canFinish={}", taskWrapper.getRequestId(), canFinish);
         }
-        preemptionQueue.add(task);
-      }
 
+        // only tasks that cannot finish immediately are pre-emptable. In other words, if all inputs
+        // to the task are not ready yet, the task is eligible for pre-emption.
+        if (enablePreemption) {
+          if (!canFinish && !stateChanged) {
+            if (isInfoEnabled) {
+              LOG.info("{} is not finishable. Adding it to pre-emption queue", taskWrapper.getRequestId());
+            }
+            addToPreemptionQueue(taskWrapper);
+          }
+        }
+      }
       numSlotsAvailable.decrementAndGet();
       scheduled = true;
     } catch (RejectedExecutionException e) {
-
-      if (enablePreemption && task.canFinish() && !preemptionQueue.isEmpty()) {
+      if (enablePreemption && taskWrapper.getTaskRunnerCallable().canFinish() && !preemptionQueue.isEmpty()) {
 
         if (isDebugEnabled) {
           LOG.debug("Preemption Queue: " + preemptionQueue);
         }
 
-        TaskRunnerCallable pRequest = preemptionQueue.remove();
-        if (pRequest != null) {
+        TaskWrapper pRequest = removeAndGetFromPreemptionQueue();
 
-          if (isInfoEnabled) {
-            LOG.info("Invoking kill task for {} due to pre-emption to run {}",
-                pRequest.getRequestId(), task.getRequestId());
+        // Avoid preempting tasks which are finishable - callback still to be processed.
+        if (pRequest != null) {
+          if (pRequest.getTaskRunnerCallable().canFinish()) {
+            LOG.info(
+                "Removed {} from preemption queue, but not preempting since it's now finishable",
+                pRequest.getRequestId());
+          } else {
+            if (isInfoEnabled) {
+              LOG.info("Invoking kill task for {} due to pre-emption to run {}",
+                  pRequest.getRequestId(), taskWrapper.getRequestId());
+            }
+            // The task will either be killed or is already in the process of completing, which will
+            // trigger the next scheduling run, or result in available slots being higher than 0,
+            // which will cause the scheduler loop to continue.
+            pRequest.getTaskRunnerCallable().killTask();
           }
-
-          // The task will either be killed or is already in the process of completing, which will
-          // trigger the next scheduling run, or result in available slots being higher than 0,
-          // which will cause the scheduler loop to continue.
-          pRequest.killTask();
         }
       }
     }
@@ -266,23 +303,84 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
     return scheduled;
   }
 
+  private void finishableStateUpdated(TaskWrapper taskWrapper, boolean newFinishableState) {
+    synchronized (lock) {
+      if (!knownTasks.contains(taskWrapper)) {
+        // Late update for a task that has already completed or been evicted: ignore it.
+        return;
+      }
+      if (taskWrapper.isInWaitQueue()) {
+        // Re-order the wait queue
+        LOG.info("DEBUG: Re-ordering the wait queue since {} finishable state moved to {}",
+            taskWrapper.getRequestId(), newFinishableState);
+        if (waitQueue.remove(taskWrapper)) {
+          // Put element back only if it existed.
+          waitQueue.offer(taskWrapper);
+        } else {
+          LOG.warn("Failed to remove {} from waitQueue",
+              taskWrapper.getTaskRunnerCallable().getRequestId());
+        }
+      }
+
+      if (newFinishableState && taskWrapper.isInPreemptionQueue()) {
+        LOG.info("DEBUG: Removing {} from preemption queue because it's state changed to {}",
+            taskWrapper.getRequestId(), newFinishableState);
+        // The queue holds TaskWrappers, so remove the wrapper itself and clear its flag.
+        preemptionQueue.remove(taskWrapper);
+        taskWrapper.setIsInPreemptableQueue(false);
+      } else if (enablePreemption && !newFinishableState && !taskWrapper.isInPreemptionQueue() &&
+          !taskWrapper.isInWaitQueue()) {
+        LOG.info("DEBUG: Adding {} to preemption queue since finishable state changed to {}",
+            taskWrapper.getRequestId(), newFinishableState);
+        // addToPreemptionQueue also marks the wrapper as being in the preemption queue.
+        addToPreemptionQueue(taskWrapper);
+      }
+      lock.notify();
+    }
+  }
+
+  private void addToPreemptionQueue(TaskWrapper taskWrapper) {
+    synchronized (lock) {
+      preemptionQueue.add(taskWrapper);
+      taskWrapper.setIsInPreemptableQueue(true);
+    }
+  }
+
+  private TaskWrapper removeAndGetFromPreemptionQueue() {
+    TaskWrapper taskWrapper;
+    synchronized (lock) {
+      // poll(), not remove(): the queue may have drained since the caller checked isEmpty().
+      taskWrapper = preemptionQueue.poll();
+      if (taskWrapper != null) {
+        taskWrapper.setIsInPreemptableQueue(false);
+      }
+    }
+    return taskWrapper;
+  }
+
   private final class InternalCompletionListener implements FutureCallback<TaskRunner2Result> {
-    private TaskRunnerCallable task;
+    private final TaskWrapper taskWrapper;
 
-    public InternalCompletionListener(TaskRunnerCallable task) {
-      this.task = task;
+    public InternalCompletionListener(TaskWrapper taskWrapper) {
+      this.taskWrapper = taskWrapper;
     }
 
     @Override
     public void onSuccess(TaskRunner2Result result) {
-      task.getCallback().onSuccess(result);
+      knownTasks.remove(taskWrapper);
+      taskWrapper.setIsInPreemptableQueue(false);
+      taskWrapper.maybeUnregisterForFinishedStateNotifications();
+      taskWrapper.getTaskRunnerCallable().getCallback().onSuccess(result);
       updatePreemptionListAndNotify(result.getEndReason());
     }
 
     @Override
     public void onFailure(Throwable t) {
-      task.getCallback().onFailure(t);
+      knownTasks.remove(taskWrapper);
+      taskWrapper.setIsInPreemptableQueue(false);
+      taskWrapper.maybeUnregisterForFinishedStateNotifications();
+      taskWrapper.getTaskRunnerCallable().getCallback().onFailure(t);
       updatePreemptionListAndNotify(null);
       LOG.error("Failed notification received: Stacktrace: " + ExceptionUtils.getStackTrace(t));
     }
@@ -291,17 +389,18 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
       // if this task was added to pre-emption list, remove it
       if (enablePreemption) {
         String state = reason == null ? "FAILED" : reason.name();
-        preemptionQueue.remove(task.getRequest());
-        if (isInfoEnabled) {
-          LOG.info(TaskRunnerCallable.getTaskIdentifierString(task.getRequest())
+        boolean removed = preemptionQueue.remove(taskWrapper);
+        if (removed && isInfoEnabled) {
+          LOG.info(TaskRunnerCallable
+              .getTaskIdentifierString(taskWrapper.getTaskRunnerCallable().getRequest())
               + " request " + state + "! Removed from preemption list.");
         }
       }
 
       numSlotsAvailable.incrementAndGet();
-      if (!waitQueue.isEmpty()) {
-        synchronized (waitLock) {
-          waitLock.notify();
+      synchronized (lock) {
+        if (!waitQueue.isEmpty()) {
+          lock.notify();
         }
       }
     }
@@ -340,10 +439,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
   }
 
   @VisibleForTesting
-  public static class WaitQueueComparator implements Comparator<TaskRunnerCallable> {
+  public static class WaitQueueComparator implements Comparator<TaskWrapper> {
 
     @Override
-    public int compare(TaskRunnerCallable o1, TaskRunnerCallable o2) {
+    public int compare(TaskWrapper t1, TaskWrapper t2) {
+      TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+      TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
       boolean newCanFinish = o1.canFinish();
       boolean oldCanFinish = o2.canFinish();
       if (newCanFinish == true && oldCanFinish == false) {
@@ -368,10 +469,12 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
   }
 
   @VisibleForTesting
-  public static class PreemptionQueueComparator implements Comparator<TaskRunnerCallable> {
+  public static class PreemptionQueueComparator implements Comparator<TaskWrapper> {
 
     @Override
-    public int compare(TaskRunnerCallable o1, TaskRunnerCallable o2) {
+    public int compare(TaskWrapper t1, TaskWrapper t2) {
+      TaskRunnerCallable o1 = t1.getTaskRunnerCallable();
+      TaskRunnerCallable o2 = t2.getTaskRunnerCallable();
       if (o1.getVertexParallelism() > o2.getVertexParallelism()) {
         return 1;
       } else if (o1.getVertexParallelism() < o2.getVertexParallelism()) {
@@ -380,4 +483,83 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
       return 0;
     }
   }
+
+
+  private class TaskWrapper implements FinishableStateUpdateHandler {
+    private final TaskRunnerCallable taskRunnerCallable;
+    private boolean inWaitQueue = true;
+    private boolean inPreemptionQueue = false;
+    private boolean registeredForNotifications = false;
+
+    public TaskWrapper(TaskRunnerCallable taskRunnerCallable) {
+      this.taskRunnerCallable = taskRunnerCallable;
+    }
+
+    // Methods are synchronized primarily for visibility.
+    /**
+     * Registers this wrapper for finishable-state callbacks unless it is already registered.
+     * @param currentFinishableState the finishable state the caller just observed
+     * @return true if the state still equals currentFinishableState or already registered; false if it changed.
+     */
+    public synchronized boolean maybeRegisterForFinishedStateNotifications(
+        boolean currentFinishableState) {
+      if (!registeredForNotifications) {
+        registeredForNotifications = true;
+        return taskRunnerCallable.getFragmentInfo()
+            .registerForFinishableStateUpdates(this, currentFinishableState);
+      } else {
+        return true;
+      }
+    }
+
+    public synchronized void maybeUnregisterForFinishedStateNotifications() {
+      if (registeredForNotifications) {
+        registeredForNotifications = false;
+        taskRunnerCallable.getFragmentInfo().unregisterForFinishableStateUpdates(this);
+      }
+    }
+
+    public TaskRunnerCallable getTaskRunnerCallable() {
+      return taskRunnerCallable;
+    }
+
+    public synchronized boolean isInWaitQueue() {
+      return inWaitQueue;
+    }
+
+    public synchronized boolean isInPreemptionQueue() {
+      return inPreemptionQueue;
+    }
+
+    public synchronized void setIsInWaitQueue(boolean value) {
+      this.inWaitQueue = value;
+    }
+
+    public synchronized void setIsInPreemptableQueue(boolean value) {
+      this.inPreemptionQueue = value;
+    }
+
+    public String getRequestId() {
+      return taskRunnerCallable.getRequestId();
+    }
+
+    @Override
+    public String toString() {
+      return "TaskWrapper{" +
+          "task=" + taskRunnerCallable.getRequestId() +
+          ", inWaitQueue=" + inWaitQueue +
+          ", inPreemptionQueue=" + inPreemptionQueue +
+          ", registeredForNotifications=" + registeredForNotifications +
+          '}';
+    }
+
+    @Override
+    public void finishableStateUpdated(boolean finishableState) {
+      // This method should not be synchronized. Can lead to deadlocks since it calls a sync method.
+      // Meanwhile the scheduler could try updating states via a synchronized method.
+      LOG.info("DEBUG: Received finishable state update for {}, state={}",
+          taskRunnerCallable.getRequestId(), finishableState);
+      TaskExecutorService.this.finishableStateUpdated(this, finishableState);
+    }
+  }
 }
http://git-wip-us.apache.org/repos/asf/hive/blob/baddfa5c/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 2ea39b7..007c83d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -324,6 +324,10 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> {
     return requestId;
   }
 
+  public QueryFragmentInfo getFragmentInfo() {
+    return fragmentInfo;
+  }
+
   public TaskRunnerCallback getCallback() {
     return new TaskRunnerCallback(request, this);
   }