Repository: hive Updated Branches: refs/heads/llap 8b442b266 -> e7994d858
HIVE-10683. LLAP: Add a mechanism for daemons to inform the AM about killed tasks. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e7994d85 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e7994d85 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e7994d85 Branch: refs/heads/llap Commit: e7994d85812cdf05121d1b57811d172d83aebb1f Parents: 8b442b2 Author: Siddharth Seth <ss...@apache.org> Authored: Tue May 12 15:15:16 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Tue May 12 15:15:16 2015 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/KilledTaskHandler.java | 28 ++++++++ .../hive/llap/daemon/impl/AMReporter.java | 54 +++++++++++++- .../llap/daemon/impl/ContainerRunnerImpl.java | 20 ++++-- .../hive/llap/daemon/impl/QueryTracker.java | 2 + .../llap/daemon/impl/TaskExecutorService.java | 18 ++--- .../llap/daemon/impl/TaskRunnerCallable.java | 76 +++++++++++++++++--- .../llap/metrics/LlapDaemonExecutorInfo.java | 3 +- .../llap/metrics/LlapDaemonExecutorMetrics.java | 19 ++--- .../protocol/LlapTaskUmbilicalProtocol.java | 2 + .../llap/tezplugins/LlapTaskCommunicator.java | 14 +++- .../daemon/impl/TestTaskExecutorService.java | 4 +- 11 files changed, 196 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java new file mode 100644 index 0000000..8b481c8 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java @@ -0,0 +1,28 @@ +/* + * Licensed under 
the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.dag.records.TezTaskAttemptID; + +public interface KilledTaskHandler { + + // TODO Ideally, this should only need to send in the TaskAttemptId. Everything else should be + // inferred from this. + // Passing in parameters until there's some dag information stored and tracked in the daemon. 
+ void taskKilled(String amLocation, int port, String user, + Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId); +} http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 3dc8fb8..39b3634 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -46,11 +46,12 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.service.AbstractService; import org.apache.tez.common.security.JobTokenIdentifier; +import org.apache.tez.dag.records.TezTaskAttemptID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** - * Sends status updates to various AMs. + * Responsible for communicating with various AMs. */ public class AMReporter extends AbstractService { @@ -80,6 +81,8 @@ public class AMReporter extends AbstractService { private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new DelayQueue(); private final long heartbeatInterval; private final AtomicBoolean isShutdown = new AtomicBoolean(false); + // Tracks appMasters to which heartbeats are being sent. This should not be used for any other + // messages like taskKilled, etc. 
private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>(); volatile ListenableFuture<Void> queueLookupFuture; @@ -167,6 +170,28 @@ public class AMReporter extends AbstractService { } } + public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken, + final TezTaskAttemptID taskAttemptId) { + // Not re-using the connection for the AM heartbeat - which may or may not be open by this point. + // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection. + LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); + AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf); + ListenableFuture<Void> future = + executor.submit(new KillTaskCallable(taskAttemptId, amNodeInfo)); + Futures.addCallback(future, new FutureCallback<Void>() { + @Override + public void onSuccess(Void result) { + LOG.info("Sent taskKilled for {}", taskAttemptId); + } + + @Override + public void onFailure(Throwable t) { + LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.", + taskAttemptId); + } + }); + } + private class QueueLookupCallable extends CallableWithNdc<Void> { @Override @@ -199,6 +224,31 @@ public class AMReporter extends AbstractService { } } + private class KillTaskCallable extends CallableWithNdc<Void> { + final AMNodeInfo amNodeInfo; + final TezTaskAttemptID taskAttemptId; + + public KillTaskCallable(TezTaskAttemptID taskAttemptId, + AMNodeInfo amNodeInfo) { + this.taskAttemptId = taskAttemptId; + this.amNodeInfo = amNodeInfo; + } + + @Override + protected Void callInternal() { + try { + amNodeInfo.getUmbilical().taskKilled(taskAttemptId); + } catch (IOException e) { + LOG.warn("Failed to send taskKilled message for task {}. 
Will re-run after it times out", taskAttemptId); + } catch (InterruptedException e) { + if (!isShutdown.get()) { + LOG.info("Interrupted while trying to send taskKilled message for task {}", taskAttemptId); + } + } + return null; + } + } + private class AMHeartbeatCallable extends CallableWithNdc<Void> { final AMNodeInfo amNodeInfo; @@ -224,7 +274,7 @@ public class AMReporter extends AbstractService { LOG.warn("Failed to communicate with AM. May retry later: " + amNodeInfo.amNodeId, e); } catch (InterruptedException e) { if (!isShutdown.get()) { - LOG.warn("Failed to communicate with AM: " + amNodeInfo.amNodeId, e); + LOG.warn("Interrupted while trying to send heartbeat to AM: " + amNodeInfo.amNodeId, e); } } } else { http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index c9e5829..e544789 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; +import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; @@ -72,6 +73,7 @@ public class ContainerRunnerImpl extends AbstractService implements 
ContainerRun private final LlapDaemonExecutorMetrics metrics; private final Configuration conf; private final TaskRunnerCallable.ConfParams confParams; + private final KilledTaskHandler killedTaskHandler = new KilledTaskHandlerImpl(); // Map of dagId to vertices and associated state. private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> sourceCompletionMap = new ConcurrentHashMap<>(); @@ -110,7 +112,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun ); LOG.info("ContainerRunnerImpl config: " + - "memoryPerExecutorDerviced=" + memoryPerExecutor + "memoryPerExecutorDerviced=" + memoryPerExecutor ); } @@ -193,7 +195,7 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun ConcurrentMap<String, SourceStateProto> sourceCompletionMap = getSourceCompletionMap(request.getFragmentSpec().getDagName()); TaskRunnerCallable callable = new TaskRunnerCallable(request, new Configuration(getConfig()), new ExecutionContextImpl(localAddress.get().getHostName()), env, localDirs, - credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics); + credentials, memoryPerExecutor, amReporter, sourceCompletionMap, confParams, metrics, killedTaskHandler); executorService.schedule(callable); metrics.incrExecutorTotalRequestsHandled(); metrics.incrExecutorNumQueuedRequests(); @@ -219,11 +221,6 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun // TODO Implement when this gets used. 
} - - private void notifyAMOfRejection(TaskRunnerCallable callable) { - LOG.error("Notifying AM of request rejection is not implemented yet!"); - } - private String stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) { StringBuilder sb = new StringBuilder(); sb.append("dagName=").append(request.getDagName()) @@ -294,4 +291,13 @@ public class ContainerRunnerImpl extends AbstractService implements ContainerRun } return dagMap; } + + private class KilledTaskHandlerImpl implements KilledTaskHandler { + + @Override + public void taskKilled(String amLocation, int port, String user, + Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId) { + amReporter.taskKilled(amLocation, port, user, jobToken, taskAttemptId); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 16d745b..5c8116e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -61,6 +61,8 @@ public class QueryTracker { queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, user); queryInfoMap.putIfAbsent(dagName, queryInfo); } + // TODO Start tracking individual fragments, so that taskKilled etc messages + // can be routed through this layer to simplify the interfaces. 
} String[] getLocalDirs(String queryId, String dagName, String user) throws IOException { http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 11ba793..42a3528 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -30,7 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.log4j.Logger; import org.apache.tez.runtime.task.TaskRunner2Result; import com.google.common.annotations.VisibleForTesting; @@ -40,6 +39,8 @@ import com.google.common.util.concurrent.ListenableFuture; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Task executor service provides method for scheduling tasks. Tasks submitted to executor service @@ -61,7 +62,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder; * new tasks. Shutting down of the task executor service can be done gracefully or immediately. 
*/ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { - private static final Logger LOG = Logger.getLogger(TaskExecutorService.class); + private static final Logger LOG = LoggerFactory.getLogger(TaskExecutorService.class); private static final boolean isInfoEnabled = LOG.isInfoEnabled(); private static final boolean isDebugEnabled = LOG.isDebugEnabled(); private static final boolean isTraceEnabled = LOG.isTraceEnabled(); @@ -183,7 +184,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } } - private boolean trySchedule(TaskRunnerCallable task) { + private boolean trySchedule(final TaskRunnerCallable task) { boolean scheduled = false; try { @@ -236,7 +237,9 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { LOG.debug("Pre-emption invoked for " + pRequest.getRequestId() + " by interrupting the thread."); } - pFuture.cancel(true); + pRequest.killTask(); + // TODO. Ideally, should wait for the thread to complete and fall off before assuming the + // slot is available for the next task. 
removeTaskFromPreemptionList(pRequest, pRequest.getRequestId()); // future is cancelled or completed normally, in which case schedule the new request @@ -244,8 +247,6 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { if (isDebugEnabled) { LOG.debug(pRequest.getRequestId() + " request preempted by " + task.getRequestId()); } - - notifyAM(pRequest); } } @@ -333,11 +334,6 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } - private void notifyAM(TaskRunnerCallable request) { - // TODO: Report to AM of pre-emption and rejection - LOG.info("Notifying to AM of preemption is not implemented yet!"); - } - // TODO: llap daemon should call this to gracefully shutdown the task executor service public void shutDown(boolean awaitTermination) { if (awaitTermination) { http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 7e7c133..b16a5c4 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; +import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto; @@ -56,6 +57,7 @@ import 
org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.runtime.library.input.UnorderedKVInput; +import org.apache.tez.runtime.task.EndReason; import org.apache.tez.runtime.task.TaskRunner2Result; import com.google.common.base.Stopwatch; @@ -88,6 +90,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private final AMReporter amReporter; private final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap; private final TaskSpec taskSpec; + private final KilledTaskHandler killedTaskHandler; private volatile TezTaskRunner2 taskRunner; private volatile TaskReporterInterface taskReporter; private volatile ListeningExecutorService executor; @@ -96,13 +99,17 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private volatile String threadName; private LlapDaemonExecutorMetrics metrics; protected String requestId; + private boolean shouldRunTask = true; + final Stopwatch runtimeWatch = new Stopwatch(); + final Stopwatch killtimerWatch = new Stopwatch(); TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, Configuration conf, ExecutionContext executionContext, Map<String, String> envMap, String[] localDirs, Credentials credentials, long memoryAvailable, AMReporter amReporter, ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap, - ConfParams confParams, LlapDaemonExecutorMetrics metrics) { + ConfParams confParams, LlapDaemonExecutorMetrics metrics, + KilledTaskHandler killedTaskHandler) { this.request = request; this.conf = conf; this.executionContext = executionContext; @@ -123,6 +130,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } this.metrics = metrics; this.requestId = getTaskAttemptId(request); + this.killedTaskHandler = killedTaskHandler; } 
@Override @@ -146,7 +154,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { executor = MoreExecutors.listeningDecorator(executorReal); // TODO Consolidate this code with TezChild. - Stopwatch sw = new Stopwatch().start(); + runtimeWatch.start(); UserGroupInformation taskUgi = UserGroupInformation.createRemoteUser(request.getUser()); taskUgi.addCredentials(credentials); @@ -177,12 +185,21 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { new AtomicLong(0), request.getContainerIdString()); - taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs, - taskSpec, - request.getAppAttemptNumber(), - serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, - pid, - executionContext, memoryAvailable); + synchronized (this) { + if (shouldRunTask) { + taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs, + taskSpec, + request.getAppAttemptNumber(), + serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, + objectRegistry, + pid, + executionContext, memoryAvailable); + } + } + if (taskRunner == null) { + LOG.info("Not starting task {} since it was killed earlier", taskSpec.getTaskAttemptID()); + return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false); + } try { TaskRunner2Result result = taskRunner.run(); @@ -195,7 +212,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now. 
// FileSystem.closeAllForUGI(taskUgi); LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + - sw.stop().elapsed(TimeUnit.MILLISECONDS)); + runtimeWatch.stop().elapsed(TimeUnit.MILLISECONDS)); if (LOG.isDebugEnabled()) { LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); } @@ -203,6 +220,35 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { } /** + * Attempt to kill a running task. If the task has not started running, it will not start. + * If it's already running, a kill request will be sent to it. + * + * The AM will be informed about the task kill. + */ + public void killTask() { + synchronized (this) { + LOG.info("Killing task with id {}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), (taskRunner != null)); + shouldRunTask = false; + if (taskRunner != null) { + killtimerWatch.start(); + LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID()); + taskRunner.killTask(); + } + } + // Sending a kill message to the AM right here. Don't need to wait for the task to complete. + reportTaskKilled(); + } + + /** + * Inform the AM that this task has been killed. + */ + public void reportTaskKilled() { + killedTaskHandler + .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, + taskSpec.getTaskAttemptID()); + } + + /** * Check whether a task can run to completion or may end up blocking on it's sources. * This currently happens via looking up source state. * TODO: Eventually, this should lookup the Hive Processor to figure out whether @@ -314,7 +360,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { return requestId; } - // TODO Slightly more useful error handling + // Errors are handled on the way over. FAIL/SUCCESS is informed via regular heartbeats. Killed
@Override public void onSuccess(TaskRunner2Result result) { switch(result.getEndReason()) { @@ -327,7 +374,14 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId); break; case KILL_REQUESTED: - // TODO Send a kill out to the AM. + LOG.info("Killed task {}", requestId); + if (killtimerWatch.isRunning()) { + killtimerWatch.stop(); + long elapsed = killtimerWatch.elapsed(TimeUnit.MILLISECONDS); + LOG.info("Time to die for task {}", elapsed); + } + metrics.incrPreemptionTimeLost(runtimeWatch.elapsed(TimeUnit.MILLISECONDS)); + metrics.incrExecutorTotalKilled(); break; case COMMUNICATION_FAILURE: LOG.info("Failed to run {} due to communication failure", requestId); http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java index d7bed53..e4739dc 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java @@ -33,7 +33,8 @@ public enum LlapDaemonExecutorInfo implements MetricsInfo { ExecutorTotalSuccess("Total number of requests handled by the container that succeeded"), ExecutorTotalExecutionFailure("Total number of requests handled by the container that failed execution"), ExecutorTotalInterrupted("Total number of requests handled by the container that got interrupted"), - ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"); + ExecutorTotalAskedToDie("Total number of requests handled by the container that were asked to die"), + PreemptionTimeLost("Total time lost due to task 
preemptions"); private final String desc; http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java index 22c9fe0..33b8f9d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java @@ -26,6 +26,7 @@ import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.Executo import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess; import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics; +import static org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.PreemptionTimeLost; import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName; import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId; @@ -72,11 +73,12 @@ public class LlapDaemonExecutorMetrics implements MetricsSource { @Metric MutableCounterLong executorTotalSuccess; @Metric - MutableCounterLong executorTotalInterrupted; + MutableCounterLong executorTotalIKilled; @Metric MutableCounterLong executorTotalExecutionFailed; @Metric - MutableCounterLong executorTotalAskedToDie; + MutableCounterLong preemptionTimeLost; + private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String sessionId, int numExecutors) { @@ -141,14 +143,15 @@ public class LlapDaemonExecutorMetrics implements MetricsSource { executorTotalExecutionFailed.incr(); } - public void incrExecutorTotalInterrupted() { - executorTotalInterrupted.incr(); + public void 
incrPreemptionTimeLost(long value) { + preemptionTimeLost.incr(value); } - public void incrExecutorTotalAskedToDie() { - executorTotalAskedToDie.incr(); + public void incrExecutorTotalKilled() { + executorTotalIKilled.incr(); } + private void getExecutorStats(MetricsRecordBuilder rb) { updateThreadMetrics(rb); @@ -156,8 +159,8 @@ public class LlapDaemonExecutorMetrics implements MetricsSource { .addCounter(ExecutorNumQueuedRequests, executorNumQueuedRequests.value()) .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value()) .addCounter(ExecutorTotalExecutionFailure, executorTotalExecutionFailed.value()) - .addCounter(ExecutorTotalInterrupted, executorTotalInterrupted.value()) - .addCounter(ExecutorTotalAskedToDie, executorTotalAskedToDie.value()); + .addCounter(ExecutorTotalInterrupted, executorTotalIKilled.value()) + .addCounter(PreemptionTimeLost, preemptionTimeLost.value()); } private void updateThreadMetrics(MetricsRecordBuilder rb) { http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java index 886194a..2f5e11d 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java @@ -34,4 +34,6 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol { public void nodeHeartbeat(Text hostname, int port); + public void taskKilled(TezTaskAttemptID taskAttemptId); + } http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java 
---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 99459e4..d614548 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -278,9 +278,9 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) { super.unregisterRunningTaskAttempt(taskAttemptID); entityTracker.unregisterTaskAttempt(taskAttemptID); - // TODO Inform the daemon that this task is no longer running. - // Currently, a task will end up moving into the RUNNING queue and will - // be told that it needs to die since it isn't recognized. + // This will also be invoked for tasks which have been KILLED / rejected by the daemon. + // Informing the daemon becomes necessary once the LlapScheduler supports preemption + // and/or starts attempting to kill tasks which may be running on a node. } @Override @@ -407,6 +407,14 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } @Override + public void taskKilled(TezTaskAttemptID taskAttemptId) { + // TODO Unregister the task for state updates, which could in turn unregister the node. 
+ getTaskCommunicatorContext().taskKilled(taskAttemptId, + TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM, "Attempt preempted"); + entityTracker.unregisterTaskAttempt(taskAttemptId); + } + + @Override public long getProtocolVersion(String protocol, long clientVersion) throws IOException { return versionID; } http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 95750c4..3b89b48 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -18,12 +18,14 @@ package org.apache.hadoop.hive.llap.daemon.impl; import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; import java.util.concurrent.BlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RejectedExecutionException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; @@ -54,7 +56,7 @@ public class TestTaskExecutorService { public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto requestProto, boolean canFinish, int workTime) { super(requestProto, conf, new ExecutionContextImpl("localhost"), null, null, cred, 0, null, - null, null, null); + null, null, null, mock(KilledTaskHandler.class)); this.workTime = workTime; this.canFinish = 
canFinish; }