Repository: hive
Updated Branches:
  refs/heads/llap 8b442b266 -> e7994d858


HIVE-10683. LLAP: Add a mechanism for daemons to inform the AM about killed 
tasks. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e7994d85
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e7994d85
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e7994d85

Branch: refs/heads/llap
Commit: e7994d85812cdf05121d1b57811d172d83aebb1f
Parents: 8b442b2
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue May 12 15:15:16 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue May 12 15:15:16 2015 -0700

----------------------------------------------------------------------
 .../hive/llap/daemon/KilledTaskHandler.java     | 28 ++++++++
 .../hive/llap/daemon/impl/AMReporter.java       | 54 +++++++++++++-
 .../llap/daemon/impl/ContainerRunnerImpl.java   | 20 ++++--
 .../hive/llap/daemon/impl/QueryTracker.java     |  2 +
 .../llap/daemon/impl/TaskExecutorService.java   | 18 ++---
 .../llap/daemon/impl/TaskRunnerCallable.java    | 76 +++++++++++++++++---
 .../llap/metrics/LlapDaemonExecutorInfo.java    |  3 +-
 .../llap/metrics/LlapDaemonExecutorMetrics.java | 19 ++---
 .../protocol/LlapTaskUmbilicalProtocol.java     |  2 +
 .../llap/tezplugins/LlapTaskCommunicator.java   | 14 +++-
 .../daemon/impl/TestTaskExecutorService.java    |  4 +-
 11 files changed, 196 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
new file mode 100644
index 0000000..8b481c8
--- /dev/null
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+import org.apache.hadoop.security.token.Token;
+import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.records.TezTaskAttemptID;
+
+/**
+ * Callback through which the daemon reports that a task attempt has been killed,
+ * so that the AM which submitted the attempt can be told about it.
+ */
+public interface KilledTaskHandler {
+
+  // TODO Ideally, this should only need to send in the TaskAttemptId. Everything else should be
+  // inferred from this.
+  // Passing in parameters until there's some dag information stored and tracked in the daemon.
+  /**
+   * Reports that the given task attempt was killed.
+   *
+   * @param amLocation host of the AM to notify
+   * @param port port of the AM to notify
+   * @param user user associated with the request the attempt belongs to
+   * @param jobToken job token associated with the attempt
+   * @param taskAttemptId the attempt which was killed
+   */
+  void taskKilled(String amLocation, int port, String user,
+                  Token<JobTokenIdentifier> jobToken, TezTaskAttemptID 
taskAttemptId);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 3dc8fb8..39b3634 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -46,11 +46,12 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.service.AbstractService;
 import org.apache.tez.common.security.JobTokenIdentifier;
+import org.apache.tez.dag.records.TezTaskAttemptID;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
- * Sends status updates to various AMs.
+ * Responsible for communicating with various AMs.
  */
 public class AMReporter extends AbstractService {
 
@@ -80,6 +81,8 @@ public class AMReporter extends AbstractService {
   private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new 
DelayQueue();
   private final long heartbeatInterval;
   private final AtomicBoolean isShutdown = new AtomicBoolean(false);
+  // Tracks appMasters to which heartbeats are being sent. This should not be 
used for any other
+  // messages like taskKilled, etc.
   private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
   volatile ListenableFuture<Void> queueLookupFuture;
 
@@ -167,6 +170,28 @@ public class AMReporter extends AbstractService {
     }
   }
 
+  /**
+   * Asynchronously informs an AM that a task attempt was killed by this daemon.
+   * The message is submitted to the executor; the outcome is only logged.
+   *
+   * @param amLocation host of the AM to notify
+   * @param port port of the AM to notify
+   * @param user user handed to the AM connection info
+   * @param jobToken job token handed to the AM connection info
+   * @param taskAttemptId the attempt which was killed
+   */
+  public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken,
+                         final TezTaskAttemptID taskAttemptId) {
+    // Not re-using the connection for the AM heartbeat - which may or may not be open by this point.
+    // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection.
+    LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
+    AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+    ListenableFuture<Void> future =
+        executor.submit(new KillTaskCallable(taskAttemptId, amNodeInfo));
+    Futures.addCallback(future, new FutureCallback<Void>() {
+      @Override
+      public void onSuccess(Void result) {
+        // NOTE: KillTaskCallable catches IOException/InterruptedException itself, so reaching
+        // this point means the callable finished, not necessarily that the AM got the message.
+        LOG.info("Sent taskKilled for {}", taskAttemptId);
+      }
+
+      @Override
+      public void onFailure(Throwable t) {
+        // Pass the throwable as the last argument so the cause and stack trace are logged.
+        LOG.warn("Failed to send taskKilled for {}. The attempt will likely time out.",
+            taskAttemptId, t);
+      }
+    });
+  }
+
   private class QueueLookupCallable extends CallableWithNdc<Void> {
 
     @Override
@@ -199,6 +224,31 @@ public class AMReporter extends AbstractService {
     }
   }
 
+  /**
+   * Sends a single taskKilled message to one AM. IO failures and interrupts are logged
+   * here rather than propagated to the caller.
+   */
+  private class KillTaskCallable extends CallableWithNdc<Void> {
+    final AMNodeInfo amNodeInfo;
+    final TezTaskAttemptID taskAttemptId;
+
+    public KillTaskCallable(TezTaskAttemptID taskAttemptId,
+                            AMNodeInfo amNodeInfo) {
+      this.taskAttemptId = taskAttemptId;
+      this.amNodeInfo = amNodeInfo;
+    }
+
+    @Override
+    protected Void callInternal() {
+      try {
+        amNodeInfo.getUmbilical().taskKilled(taskAttemptId);
+      } catch (IOException e) {
+        // Log the exception as well, so the reason for the failed send is not lost.
+        LOG.warn("Failed to send taskKilled message for task {}. Will re-run after it times out", taskAttemptId, e);
+      } catch (InterruptedException e) {
+        // Restore the interrupt status instead of silently swallowing it.
+        Thread.currentThread().interrupt();
+        if (!isShutdown.get()) {
+          LOG.info("Interrupted while trying to send taskKilled message for task {}", taskAttemptId);
+        }
+      }
+      return null;
+    }
+  }
+
   private class AMHeartbeatCallable extends CallableWithNdc<Void> {
 
     final AMNodeInfo amNodeInfo;
@@ -224,7 +274,7 @@ public class AMReporter extends AbstractService {
           LOG.warn("Failed to communicate with AM. May retry later: " + 
amNodeInfo.amNodeId, e);
         } catch (InterruptedException e) {
           if (!isShutdown.get()) {
-            LOG.warn("Failed to communicate with AM: " + amNodeInfo.amNodeId, 
e);
+            LOG.warn("Interrupted while trying to send heartbeat to AM: " + 
amNodeInfo.amNodeId, e);
           }
         }
       } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index c9e5829..e544789 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -28,6 +28,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
@@ -72,6 +73,7 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
   private final LlapDaemonExecutorMetrics metrics;
   private final Configuration conf;
   private final TaskRunnerCallable.ConfParams confParams;
+  private final KilledTaskHandler killedTaskHandler = new 
KilledTaskHandlerImpl();
 
   // Map of dagId to vertices and associated state.
   private final ConcurrentMap<String, ConcurrentMap<String, SourceStateProto>> 
sourceCompletionMap = new ConcurrentHashMap<>();
@@ -110,7 +112,7 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
     );
 
     LOG.info("ContainerRunnerImpl config: " +
-        "memoryPerExecutorDerviced=" + memoryPerExecutor
+            "memoryPerExecutorDerviced=" + memoryPerExecutor
     );
   }
 
@@ -193,7 +195,7 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
       ConcurrentMap<String, SourceStateProto> sourceCompletionMap = 
getSourceCompletionMap(request.getFragmentSpec().getDagName());
       TaskRunnerCallable callable = new TaskRunnerCallable(request, new 
Configuration(getConfig()),
           new ExecutionContextImpl(localAddress.get().getHostName()), env, 
localDirs,
-          credentials, memoryPerExecutor, amReporter, sourceCompletionMap, 
confParams, metrics);
+          credentials, memoryPerExecutor, amReporter, sourceCompletionMap, 
confParams, metrics, killedTaskHandler);
       executorService.schedule(callable);
       metrics.incrExecutorTotalRequestsHandled();
       metrics.incrExecutorNumQueuedRequests();
@@ -219,11 +221,6 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
     // TODO Implement when this gets used.
   }
 
-
-  private void notifyAMOfRejection(TaskRunnerCallable callable) {
-    LOG.error("Notifying AM of request rejection is not implemented yet!");
-  }
-
   private String 
stringifySourceStateUpdateRequest(SourceStateUpdatedRequestProto request) {
     StringBuilder sb = new StringBuilder();
     sb.append("dagName=").append(request.getDagName())
@@ -294,4 +291,13 @@ public class ContainerRunnerImpl extends AbstractService 
implements ContainerRun
     }
     return dagMap;
   }
+
+  /**
+   * Forwards kill notifications from task runners to the enclosing runner's AMReporter.
+   */
+  private class KilledTaskHandlerImpl implements KilledTaskHandler {
+
+    @Override
+    public void taskKilled(String amLocation, int port, String user,
+                           Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId) {
+      // Pure delegation: all arguments are passed through unchanged.
+      ContainerRunnerImpl.this.amReporter
+          .taskKilled(amLocation, port, user, jobToken, taskAttemptId);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index 16d745b..5c8116e 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -61,6 +61,8 @@ public class QueryTracker {
       queryInfo = new QueryInfo(queryId, appIdString, dagName, dagIdentifier, 
user);
       queryInfoMap.putIfAbsent(dagName, queryInfo);
     }
+    // TODO Start tracking individual fragments, so that taskKilled etc 
messages
+    // can be routed through this layer to simplify the interfaces.
   }
 
   String[] getLocalDirs(String queryId, String dagName, String user) throws 
IOException {

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 11ba793..42a3528 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -30,7 +30,6 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import org.apache.log4j.Logger;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -40,6 +39,8 @@ import com.google.common.util.concurrent.ListenableFuture;
 import com.google.common.util.concurrent.ListeningExecutorService;
 import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Task executor service provides method for scheduling tasks. Tasks submitted 
to executor service
@@ -61,7 +62,7 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
  * new tasks. Shutting down of the task executor service can be done 
gracefully or immediately.
  */
 public class TaskExecutorService implements Scheduler<TaskRunnerCallable> {
-  private static final Logger LOG = 
Logger.getLogger(TaskExecutorService.class);
+  private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorService.class);
   private static final boolean isInfoEnabled = LOG.isInfoEnabled();
   private static final boolean isDebugEnabled = LOG.isDebugEnabled();
   private static final boolean isTraceEnabled = LOG.isTraceEnabled();
@@ -183,7 +184,7 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
     }
   }
 
-  private boolean trySchedule(TaskRunnerCallable task) {
+  private boolean trySchedule(final TaskRunnerCallable task) {
 
     boolean scheduled = false;
     try {
@@ -236,7 +237,9 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
               LOG.debug("Pre-emption invoked for " + pRequest.getRequestId()
                   + " by interrupting the thread.");
             }
-            pFuture.cancel(true);
+            pRequest.killTask();
+            // TODO. Ideally, should wait for the thread to complete and fall 
off before assuming the
+            // slot is available for the next task.
             removeTaskFromPreemptionList(pRequest, pRequest.getRequestId());
 
             // future is cancelled or completed normally, in which case 
schedule the new request
@@ -244,8 +247,6 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
               if (isDebugEnabled) {
                 LOG.debug(pRequest.getRequestId() + " request preempted by " + 
task.getRequestId());
               }
-
-              notifyAM(pRequest);
             }
           }
 
@@ -333,11 +334,6 @@ public class TaskExecutorService implements 
Scheduler<TaskRunnerCallable> {
 
   }
 
-  private void notifyAM(TaskRunnerCallable request) {
-    // TODO: Report to AM of pre-emption and rejection
-    LOG.info("Notifying to AM of preemption is not implemented yet!");
-  }
-
   // TODO: llap daemon should call this to gracefully shutdown the task 
executor service
   public void shutDown(boolean awaitTermination) {
     if (awaitTermination) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index 7e7c133..b16a5c4 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.IOSpecProto;
@@ -56,6 +57,7 @@ import org.apache.tez.runtime.api.impl.TaskSpec;
 import org.apache.tez.runtime.common.objectregistry.ObjectRegistryImpl;
 import org.apache.tez.runtime.internals.api.TaskReporterInterface;
 import org.apache.tez.runtime.library.input.UnorderedKVInput;
+import org.apache.tez.runtime.task.EndReason;
 import org.apache.tez.runtime.task.TaskRunner2Result;
 
 import com.google.common.base.Stopwatch;
@@ -88,6 +90,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   private final AMReporter amReporter;
   private final ConcurrentMap<String, 
LlapDaemonProtocolProtos.SourceStateProto> sourceCompletionMap;
   private final TaskSpec taskSpec;
+  private final KilledTaskHandler killedTaskHandler;
   private volatile TezTaskRunner2 taskRunner;
   private volatile TaskReporterInterface taskReporter;
   private volatile ListeningExecutorService executor;
@@ -96,13 +99,17 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   private volatile String threadName;
   private LlapDaemonExecutorMetrics metrics;
   protected String requestId;
+  private boolean shouldRunTask = true;
+  final Stopwatch runtimeWatch = new Stopwatch();
+  final Stopwatch killtimerWatch = new Stopwatch();
 
   TaskRunnerCallable(LlapDaemonProtocolProtos.SubmitWorkRequestProto request, 
Configuration conf,
       ExecutionContext executionContext, Map<String, String> envMap,
       String[] localDirs, Credentials credentials,
       long memoryAvailable, AMReporter amReporter,
       ConcurrentMap<String, LlapDaemonProtocolProtos.SourceStateProto> 
sourceCompletionMap,
-      ConfParams confParams, LlapDaemonExecutorMetrics metrics) {
+      ConfParams confParams, LlapDaemonExecutorMetrics metrics,
+      KilledTaskHandler killedTaskHandler) {
     this.request = request;
     this.conf = conf;
     this.executionContext = executionContext;
@@ -123,6 +130,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     }
     this.metrics = metrics;
     this.requestId = getTaskAttemptId(request);
+    this.killedTaskHandler = killedTaskHandler;
   }
 
   @Override
@@ -146,7 +154,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     executor = MoreExecutors.listeningDecorator(executorReal);
 
     // TODO Consolidate this code with TezChild.
-    Stopwatch sw = new Stopwatch().start();
+    runtimeWatch.start();
     UserGroupInformation taskUgi = 
UserGroupInformation.createRemoteUser(request.getUser());
     taskUgi.addCredentials(credentials);
 
@@ -177,12 +185,21 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
         new AtomicLong(0),
         request.getContainerIdString());
 
-    taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs,
-        taskSpec,
-        request.getAppAttemptNumber(),
-        serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, 
executor, objectRegistry,
-        pid,
-        executionContext, memoryAvailable);
+    synchronized (this) {
+      if (shouldRunTask) {
+        taskRunner = new TezTaskRunner2(conf, taskUgi, localDirs,
+            taskSpec,
+            request.getAppAttemptNumber(),
+            serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, 
executor,
+            objectRegistry,
+            pid,
+            executionContext, memoryAvailable);
+      }
+    }
+    if (taskRunner == null) {
+      LOG.info("Not starting task {} since it was killed earlier", 
taskSpec.getTaskAttemptID());
+      return new TaskRunner2Result(EndReason.KILL_REQUESTED, null, false);
+    }
 
     try {
       TaskRunner2Result result = taskRunner.run();
@@ -195,7 +212,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
       // TODO Fix UGI and FS Handling. Closing UGI here causes some errors 
right now.
       //        FileSystem.closeAllForUGI(taskUgi);
       LOG.info("ExecutionTime for Container: " + 
request.getContainerIdString() + "=" +
-          sw.stop().elapsed(TimeUnit.MILLISECONDS));
+          runtimeWatch.stop().elapsed(TimeUnit.MILLISECONDS));
       if (LOG.isDebugEnabled()) {
         LOG.debug("canFinish post completion: " + taskSpec.getTaskAttemptID() 
+ ": " + canFinish());
       }
@@ -203,6 +220,35 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
   }
 
   /**
+   * Attempt to kill a running task. If the task has not started running, it 
will not start.
+   * If it's already running, a kill request will be sent to it.
+   *
+   * The AM will be informed about the task kill.
+   */
+  public void killTask() {
+    synchronized (this) {
+      LOG.info("Killing task with id {}, taskRunnerSetup={}", taskSpec.getTaskAttemptID(), (taskRunner != null));
+      // Clear the flag unconditionally: callInternal() checks it under this same lock before
+      // creating the runner, so a task killed before it starts must never be started.
+      shouldRunTask = false;
+      if (taskRunner != null) {
+        // The runner already exists: time how long the kill takes and forward the request.
+        killtimerWatch.start();
+        LOG.info("Issuing kill to task {}", taskSpec.getTaskAttemptID());
+        taskRunner.killTask();
+      }
+    }
+    // Sending a kill message to the AM right here. Don't need to wait for the task to complete.
+    reportTaskKilled();
+  }
+
+  /**
+   * Inform the AM that this task has been killed.
+   */
+  public void reportTaskKilled() {
+    // Read the AM coordinates and user from the original work request, in call order.
+    final String amHost = request.getAmHost();
+    final int amPort = request.getAmPort();
+    final String requestUser = request.getUser();
+    killedTaskHandler.taskKilled(amHost, amPort, requestUser, jobToken, taskSpec.getTaskAttemptID());
+  }
+
+  /**
    * Check whether a task can run to completion or may end up blocking on it's 
sources.
    * This currently happens via looking up source state.
    * TODO: Eventually, this should lookup the Hive Processor to figure out 
whether
@@ -314,7 +360,8 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
       return requestId;
     }
 
-    // TODO Slightly more useful error handling
+    // Errors are handled on the way over. FAIL/SUCCESS is informed via 
regular heartbeats. Killed
+    // via a kill message when a task kill is requested by the daemon.
     @Override
     public void onSuccess(TaskRunner2Result result) {
       switch(result.getEndReason()) {
@@ -327,7 +374,14 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
           LOG.warn("Unexpected CONTAINER_STOP_REQUEST for {}", requestId);
           break;
         case KILL_REQUESTED:
-          // TODO Send a kill out to the AM.
+          LOG.info("Killed task {}", requestId);
+          if (killtimerWatch.isRunning()) {
+            killtimerWatch.stop();
+            long elapsed = killtimerWatch.elapsed(TimeUnit.MILLISECONDS);
+            LOG.info("Time to die for task {}", elapsed);
+          }
+          
metrics.incrPreemptionTimeLost(runtimeWatch.elapsed(TimeUnit.MILLISECONDS));
+          metrics.incrExecutorTotalKilled();
           break;
         case COMMUNICATION_FAILURE:
           LOG.info("Failed to run {} due to communication failure", requestId);

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
index d7bed53..e4739dc 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorInfo.java
@@ -33,7 +33,8 @@ public enum LlapDaemonExecutorInfo implements MetricsInfo {
   ExecutorTotalSuccess("Total number of requests handled by the container that 
succeeded"),
   ExecutorTotalExecutionFailure("Total number of requests handled by the 
container that failed execution"),
   ExecutorTotalInterrupted("Total number of requests handled by the container 
that got interrupted"),
-  ExecutorTotalAskedToDie("Total number of requests handled by the container 
that were asked to die");
+  ExecutorTotalAskedToDie("Total number of requests handled by the container 
that were asked to die"),
+  PreemptionTimeLost("Total time lost due to task preemptions");
 
   private final String desc;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
index 22c9fe0..33b8f9d 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/metrics/LlapDaemonExecutorMetrics.java
@@ -26,6 +26,7 @@ import static 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.Executo
 import static 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalRequestsHandled;
 import static 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorTotalSuccess;
 import static 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.ExecutorMetrics;
+import static 
org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorInfo.PreemptionTimeLost;
 import static org.apache.hadoop.metrics2.impl.MsInfo.ProcessName;
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
 
@@ -72,11 +73,12 @@ public class LlapDaemonExecutorMetrics implements 
MetricsSource {
   @Metric
   MutableCounterLong executorTotalSuccess;
   @Metric
-  MutableCounterLong executorTotalInterrupted;
+  MutableCounterLong executorTotalIKilled;
   @Metric
   MutableCounterLong executorTotalExecutionFailed;
   @Metric
-  MutableCounterLong executorTotalAskedToDie;
+  MutableCounterLong preemptionTimeLost;
+
 
   private LlapDaemonExecutorMetrics(String displayName, JvmMetrics jm, String 
sessionId,
       int numExecutors) {
@@ -141,14 +143,15 @@ public class LlapDaemonExecutorMetrics implements 
MetricsSource {
     executorTotalExecutionFailed.incr();
   }
 
-  public void incrExecutorTotalInterrupted() {
-    executorTotalInterrupted.incr();
+  public void incrPreemptionTimeLost(long value) {
+    preemptionTimeLost.incr(value);
   }
 
-  public void incrExecutorTotalAskedToDie() {
-    executorTotalAskedToDie.incr();
+  public void incrExecutorTotalKilled() {
+    executorTotalIKilled.incr();
   }
 
+
   private void getExecutorStats(MetricsRecordBuilder rb) {
     updateThreadMetrics(rb);
 
@@ -156,8 +159,8 @@ public class LlapDaemonExecutorMetrics implements 
MetricsSource {
         .addCounter(ExecutorNumQueuedRequests, 
executorNumQueuedRequests.value())
         .addCounter(ExecutorTotalSuccess, executorTotalSuccess.value())
         .addCounter(ExecutorTotalExecutionFailure, 
executorTotalExecutionFailed.value())
-        .addCounter(ExecutorTotalInterrupted, executorTotalInterrupted.value())
-        .addCounter(ExecutorTotalAskedToDie, executorTotalAskedToDie.value());
+        .addCounter(ExecutorTotalInterrupted, executorTotalIKilled.value())
+        .addCounter(PreemptionTimeLost, preemptionTimeLost.value());
   }
 
   private void updateThreadMetrics(MetricsRecordBuilder rb) {

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index 886194a..2f5e11d 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -34,4 +34,6 @@ public interface LlapTaskUmbilicalProtocol extends 
VersionedProtocol {
 
   public void nodeHeartbeat(Text hostname, int port);
 
+  public void taskKilled(TezTaskAttemptID taskAttemptId);
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 99459e4..d614548 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -278,9 +278,9 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
   public void unregisterRunningTaskAttempt(TezTaskAttemptID taskAttemptID) {
     super.unregisterRunningTaskAttempt(taskAttemptID);
     entityTracker.unregisterTaskAttempt(taskAttemptID);
-    // TODO Inform the daemon that this task is no longer running.
-    // Currently, a task will end up moving into the RUNNING queue and will
-    // be told that it needs to die since it isn't recognized.
+    // This will also be invoked for tasks which have been KILLED / rejected 
by the daemon.
+    // Informing the daemon becomes necessary once the LlapScheduler supports 
preemption
+    // and/or starts attempting to kill tasks which may be running on a node.
   }
 
   @Override
@@ -407,6 +407,14 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     }
 
     @Override
+    public void taskKilled(TezTaskAttemptID taskAttemptId) {
+      // TODO Unregister the task for state updates, which could in turn unregister the node.
+      // Report the kill to the task communicator context as INTERRUPTED_BY_SYSTEM, then stop
+      // tracking the attempt in entityTracker.
+      getTaskCommunicatorContext().taskKilled(taskAttemptId,
+          TaskAttemptEndReason.INTERRUPTED_BY_SYSTEM, "Attempt preempted");
+      entityTracker.unregisterTaskAttempt(taskAttemptId);
+    }
+
+    @Override
     public long getProtocolVersion(String protocol, long clientVersion) throws 
IOException {
       return versionID;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/e7994d85/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
index 95750c4..3b89b48 100644
--- 
a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
+++ 
b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TestTaskExecutorService.java
@@ -18,12 +18,14 @@
 package org.apache.hadoop.hive.llap.daemon.impl;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.EntityDescriptorProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
@@ -54,7 +56,7 @@ public class TestTaskExecutorService {
     public MockRequest(LlapDaemonProtocolProtos.SubmitWorkRequestProto 
requestProto,
         boolean canFinish, int workTime) {
       super(requestProto, conf, new ExecutionContextImpl("localhost"), null, 
null, cred, 0, null,
-          null, null, null);
+          null, null, null, mock(KilledTaskHandler.class));
       this.workTime = workTime;
       this.canFinish = canFinish;
     }

Reply via email to