Repository: hive Updated Branches: refs/heads/llap e6b1556e3 -> 8b442b266
HIVE-10682. LLAP: Make use of the task runner which allows killing tasks. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/8b442b26 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/8b442b26 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/8b442b26 Branch: refs/heads/llap Commit: 8b442b266f1d5e95b2746109ac188b6ec6545cef Parents: e6b1556 Author: Siddharth Seth <ss...@apache.org> Authored: Mon May 11 23:38:49 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Mon May 11 23:38:49 2015 -0700 ---------------------------------------------------------------------- .../llap/daemon/impl/TaskExecutorService.java | 16 ++-- .../llap/daemon/impl/TaskRunnerCallable.java | 84 +++++++++----------- .../daemon/impl/TestTaskExecutorService.java | 6 +- 3 files changed, 49 insertions(+), 57 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/8b442b26/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index ed8df95..11ba793 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -31,7 +31,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import org.apache.log4j.Logger; -import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; +import org.apache.tez.runtime.task.TaskRunner2Result; import com.google.common.annotations.VisibleForTesting; import com.google.common.util.concurrent.FutureCallback; @@ -187,8 +187,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { boolean scheduled = false; try { - ListenableFuture<ContainerExecutionResult> future = executorService.submit(task); - FutureCallback<ContainerExecutionResult> wrappedCallback = + ListenableFuture<TaskRunner2Result> future = executorService.submit(task); + FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task.getCallback()); Futures.addCallback(future, wrappedCallback); @@ -252,8 +252,8 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { // try to submit the task from wait queue to executor service. If it gets rejected the // task from wait queue will hold on to its position for next try. try { - ListenableFuture<ContainerExecutionResult> future = executorService.submit(task); - FutureCallback<ContainerExecutionResult> wrappedCallback = + ListenableFuture<TaskRunner2Result> future = executorService.submit(task); + FutureCallback<TaskRunner2Result> wrappedCallback = new InternalCompletionListener(task.getCallback()); Futures.addCallback(future, wrappedCallback); numSlotsAvailable.decrementAndGet(); @@ -285,14 +285,14 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } private synchronized void addTaskToPreemptionList(TaskRunnerCallable task, - ListenableFuture<ContainerExecutionResult> future) { + ListenableFuture<TaskRunner2Result> future) { idToTaskMap.put(task.getRequestId(), task); preemptionMap.put(task, future); preemptionQueue.add(task); } private final class InternalCompletionListener implements - FutureCallback<ContainerExecutionResult> { + FutureCallback<TaskRunner2Result> { private TaskRunnerCallable.TaskRunnerCallback wrappedCallback; public InternalCompletionListener(TaskRunnerCallable.TaskRunnerCallback wrappedCallback) { @@ -300,7 +300,7 @@ public class TaskExecutorService implements Scheduler<TaskRunnerCallable> { } @Override - public void onSuccess(ContainerExecutionResult result) { + public void onSuccess(TaskRunner2Result result) { wrappedCallback.onSuccess(result); updatePreemptionListAndNotify(true); } http://git-wip-us.apache.org/repos/asf/hive/blob/8b442b26/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index 97a8b78..7e7c133 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hive.llap.daemon.impl; -import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; @@ -27,6 +26,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.conf.Configuration; @@ -45,12 +45,10 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; -import org.apache.log4j.Logger; import org.apache.tez.common.TezCommonUtils; import org.apache.tez.common.security.JobTokenIdentifier; import org.apache.tez.common.security.TokenCache; import org.apache.tez.dag.api.TezConstants; -import org.apache.tez.dag.api.TezException; import org.apache.tez.mapreduce.input.MRInputLegacy; import org.apache.tez.runtime.api.ExecutionContext; import org.apache.tez.runtime.api.impl.InputSpec; @@ -58,8 +56,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec; import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl; import org.apache.tez.runtime.internals.api.TaskReporterInterface; import org.apache.tez.runtime.library.input.UnorderedKVInput; -import org.apache.tez.runtime.task.TezChild; -import org.apache.tez.runtime.task.TezTaskRunner; +import org.apache.tez.runtime.task.TaskRunner2Result; import com.google.common.base.Stopwatch; import com.google.common.collect.HashMultimap; @@ -68,12 +65,15 @@ import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.ListeningExecutorService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.tez.runtime.task.TezTaskRunner2; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * */ -public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecutionResult> { - private static final Logger LOG = Logger.getLogger(TaskRunnerCallable.class); +public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { + private static final Logger LOG = LoggerFactory.getLogger(TaskRunnerCallable.class); private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; private final Configuration conf; private final String[] localDirs; @@ -88,7 +88,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut private final AMReporter amReporter; private final ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap; private final TaskSpec taskSpec; - private volatile TezTaskRunner taskRunner; + private volatile TezTaskRunner2 taskRunner; private volatile TaskReporterInterface taskReporter; private volatile ListeningExecutorService executor; private LlapTaskUmbilicalProtocol umbilical; @@ -126,7 +126,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut } @Override - protected TezChild.ContainerExecutionResult callInternal() throws Exception { + protected TaskRunner2Result callInternal() throws Exception { this.startTime = System.currentTimeMillis(); this.threadName = Thread.currentThread().getName(); if (LOG.isDebugEnabled()) { @@ -177,43 +177,29 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut new AtomicLong(0), request.getContainerIdString()); - taskRunner = new TezTaskRunner(conf, taskUgi, localDirs, + taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs, taskSpec, request.getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, objectRegistry, pid, executionContext, memoryAvailable); - boolean shouldDie; try { - shouldDie = !taskRunner.run(); - if (shouldDie) { - LOG.info("Got a shouldDie notification via heartbeats. Shutting down"); - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.ASKED_TO_DIE, null, - "Asked to die by the AM"); + TaskRunner2Result result = taskRunner.run(); + if (result.isContainerShutdownRequested()) { + LOG.warn("Unexpected container shutdown requested while running task. Ignoring"); } - } catch (IOException e) { - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); - } catch (TezException e) { - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.EXECUTION_FAILURE, - e, "TaskExecutionFailure: " + e.getMessage()); + return result; + } finally { // TODO Fix UGI and FS Handling. Closing UGI here causes some errors right now. // FileSystem.closeAllForUGI(taskUgi); + LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + + sw.stop().elapsed(TimeUnit.MILLISECONDS)); + if (LOG.isDebugEnabled()) { + LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); + } } - LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + - sw.stop().elapsedMillis()); - if (LOG.isDebugEnabled()) { - LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() + ": " + canFinish()); - } - - return new TezChild.ContainerExecutionResult( - TezChild.ContainerExecutionResult.ExitStatus.SUCCESS, null, - null); } /** @@ -311,7 +297,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut return new TaskRunnerCallback(request, this); } - final class TaskRunnerCallback implements FutureCallback<TezChild.ContainerExecutionResult> { + final class TaskRunnerCallback implements FutureCallback<TaskRunner2Result> { private final LlapDaemonProtocolProtos.SubmitWorkRequestProto request; private final TaskRunnerCallable taskRunnerCallable; @@ -330,25 +316,29 @@ public class TaskRunnerCallable extends CallableWithNdc<TezChild.ContainerExecut // TODO Slightly more useful error handling @Override - public void onSuccess(TezChild.ContainerExecutionResult result) { - switch (result.getExitStatus()) { + public void onSuccess(TaskRunner2Result result) { + switch(result.getEndReason()) { + // Only the KILLED case requires a message to be sent out to the AM. case SUCCESS: - LOG.info("Successfully finished: " + requestId); + LOG.info("Successfully finished {}", requestId); metrics.incrExecutorTotalSuccess(); break; - case EXECUTION_FAILURE: - LOG.info("Failed to run: " + requestId); - metrics.incrExecutorTotalExecutionFailed(); + case CONTAINER_STOP_REQUESTED: + LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId); break; - case INTERRUPTED: - LOG.info("Interrupted while running: " + requestId); - metrics.incrExecutorTotalInterrupted(); + case KILL_REQUESTED: + // TODO Send a kill out to the AM. break; - case ASKED_TO_DIE: - LOG.info("Asked to die while running: " + requestId); - metrics.incrExecutorTotalAskedToDie(); + case COMMUNICATION_FAILURE: + LOG.info("Failed to run {} due to communication failure", requestId); + metrics.incrExecutorTotalExecutionFailed(); + break; + case TASK_ERROR: + LOG.info("Failed to run {} due to task error", requestId); + metrics.incrExecutorTotalExecutionFailed(); break; } + taskRunnerCallable.shutdown(); HistoryLogger .logFragmentEnd(request.getApplicationIdString(), request.getContainerIdString(), http://git-wip-us.apache.org/repos/asf/hive/blob/8b442b26/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java index 44a4633..95750c4 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java @@ -35,6 +35,8 @@ import org.apache.tez.dag.records.TezTaskAttemptID; import org.apache.tez.dag.records.TezTaskID; import org.apache.tez.dag.records.TezVertexID; import org.apache.tez.runtime.api.impl.ExecutionContextImpl; +import org.apache.tez.runtime.task.EndReason; +import org.apache.tez.runtime.task.TaskRunner2Result; import org.apache.tez.runtime.task.TezChild; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult; import org.apache.tez.runtime.task.TezChild.ContainerExecutionResult.ExitStatus; @@ -58,10 +60,10 @@ public class TestTaskExecutorService { } @Override - protected TezChild.ContainerExecutionResult callInternal() throws Exception { + protected TaskRunner2Result callInternal() throws Exception { System.out.println(requestId + " is executing.."); Thread.sleep(workTime); - return new ContainerExecutionResult(ExitStatus.SUCCESS, null, null); + return new TaskRunner2Result(EndReason.SUCCESS, null, false); } @Override