Repository: hive
Updated Branches:
  refs/heads/llap b8cb9a1b9 -> c5dc87a8e


HIVE-10762. LLAP: Kill any fragments running in a daemon when a query 
completes. (Siddharth Seth)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c5dc87a8
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c5dc87a8
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c5dc87a8

Branch: refs/heads/llap
Commit: c5dc87a8e8efb025925c236898f425223f23712a
Parents: b8cb9a1
Author: Siddharth Seth <ss...@apache.org>
Authored: Tue Jun 16 23:48:13 2015 -0700
Committer: Siddharth Seth <ss...@apache.org>
Committed: Tue Jun 16 23:48:13 2015 -0700

----------------------------------------------------------------------
 .../llap/configuration/LlapConfiguration.java   |  24 +++-
 .../hive/llap/daemon/KilledTaskHandler.java     |   3 +-
 .../hive/llap/daemon/QueryFailedHandler.java    |  20 +++
 .../hive/llap/daemon/impl/AMReporter.java       | 122 ++++++++++++++++---
 .../llap/daemon/impl/ContainerRunnerImpl.java   |  34 +++++-
 .../hive/llap/daemon/impl/LlapDaemon.java       |  11 +-
 .../llap/daemon/impl/QueryFragmentInfo.java     |   4 +
 .../hadoop/hive/llap/daemon/impl/QueryInfo.java |   5 +
 .../hive/llap/daemon/impl/QueryTracker.java     |  17 ++-
 .../llap/daemon/impl/TaskExecutorService.java   |  10 +-
 .../llap/daemon/impl/TaskRunnerCallable.java    |   6 +-
 .../protocol/LlapTaskUmbilicalProtocol.java     |   4 +-
 .../llap/tezplugins/LlapTaskCommunicator.java   |   5 +-
 13 files changed, 229 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
----------------------------------------------------------------------
diff --git 
a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
 
b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
index dd24661..f5aa2a6 100644
--- 
a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
+++ 
b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java
@@ -51,8 +51,26 @@ public class LlapConfiguration extends Configuration {
   public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT 
= false;
 
   // This needs to be kept below the task timeout interval, but otherwise as 
high as possible to avoid unnecessary traffic.
-  public static final String LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS = 
LLAP_DAEMON_PREFIX + "liveness.heartbeat.interval-ms";
-  public static final long LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT 
= 10000l;
+  public static final String LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS = 
LLAP_DAEMON_PREFIX + "am.liveness.heartbeat.interval-ms";
+  public static final long 
LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l;
+
+  /**
+   * Amount of time to wait on connection failures to the AM from an LLAP 
daemon before considering
+   * the AM to be dead
+   */
+  public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS 
=
+      LLAP_PREFIX + "am.liveness.connection.timeout-millis";
+  public static final long 
LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10000l;
+
+  // Not used yet - since the Writable RPC engine does not support this policy.
+  /**
+   * Sleep duration while waiting to retry connection failures to the AM from 
the daemon for the
+   * general keep-alive thread
+   */
+  public static final String 
LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS =
+      LLAP_PREFIX + "am.liveness.connection.sleep-between-retries-millis";
+  public static final long 
LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT =
+      2000l;
 
 
   // Section for configs used in AM and executors
@@ -137,6 +155,8 @@ public class LlapConfiguration extends Configuration {
       LLAP_PREFIX + 
"task.communicator.connection.sleep-between-retries-millis";
   public static final long 
LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = 2000l;
 
+
+
   public static final String LLAP_DAEMON_SERVICE_PORT = LLAP_DAEMON_PREFIX + 
"service.port";
   public static final int LLAP_DAEMON_SERVICE_PORT_DEFAULT = 15002;
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
index 8b481c8..7cb433b 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java
@@ -24,5 +24,6 @@ public interface KilledTaskHandler {
   // inferred from this.
   // Passing in parameters until there's some dag information stored and 
tracked in the daemon.
   void taskKilled(String amLocation, int port, String user,
-                  Token<JobTokenIdentifier> jobToken, TezTaskAttemptID 
taskAttemptId);
+                  Token<JobTokenIdentifier> jobToken, String queryId, String 
dagName,
+                  TezTaskAttemptID taskAttemptId);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
new file mode 100644
index 0000000..4e62a68
--- /dev/null
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java
@@ -0,0 +1,20 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.llap.daemon;
+
+public interface QueryFailedHandler {
+
+  public void queryFailed(String queryId, String dagName);
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
index 1ba18fc..8ec9f22 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java
@@ -14,6 +14,7 @@
 
 package org.apache.hadoop.hive.llap.daemon.impl;
 
+import javax.net.SocketFactory;
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedExceptionAction;
@@ -39,8 +40,11 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.common.CallableWithNdc;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
 import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
@@ -79,9 +83,13 @@ public class AMReporter extends AbstractService {
   private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class);
 
   private volatile LlapNodeId nodeId;
+  private final QueryFailedHandler queryFailedHandler;
   private final Configuration conf;
   private final ListeningExecutorService queueLookupExecutor;
   private final ListeningExecutorService executor;
+  private final RetryPolicy retryPolicy;
+  private final long retryTimeout;
+  private final SocketFactory socketFactory;
   private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new 
DelayQueue();
   private final AtomicReference<InetSocketAddress> localAddress;
   private final long heartbeatInterval;
@@ -91,9 +99,11 @@ public class AMReporter extends AbstractService {
   private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>();
   volatile ListenableFuture<Void> queueLookupFuture;
 
-  public AMReporter(AtomicReference<InetSocketAddress> localAddress, 
Configuration conf) {
+  public AMReporter(AtomicReference<InetSocketAddress> localAddress,
+                    QueryFailedHandler queryFailedHandler, Configuration conf) 
{
     super(AMReporter.class.getName());
     this.localAddress = localAddress;
+    this.queryFailedHandler = queryFailedHandler;
     this.conf = conf;
     ExecutorService rawExecutor = Executors.newCachedThreadPool(
         new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter 
%d").build());
@@ -102,9 +112,25 @@ public class AMReporter extends AbstractService {
         new 
ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build());
     this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2);
     this.heartbeatInterval =
-        
conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS,
-            
LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
-
+        
conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS,
+            
LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT);
+
+    this.retryTimeout =
+        
conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS,
+            
LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
+    long retrySleep = conf.getLong(
+        
LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS,
+        
LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT);
+    this.retryPolicy = RetryPolicies
+        .retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep,
+            TimeUnit.MILLISECONDS);
+
+    this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
+
+    LOG.info("Setting up AMReporter with " +
+        "heartbeatInterval(ms)=" + heartbeatInterval +
+        ", retryTime(ms)=" + retryTimeout +
+        ", retrySleep(ms)=" + retrySleep);
   }
 
   @Override
@@ -143,23 +169,26 @@ public class AMReporter extends AbstractService {
     }
   }
 
-
-  public void registerTask(String amLocation, int port, String user, 
Token<JobTokenIdentifier> jobToken) {
+  public void registerTask(String amLocation, int port, String user,
+                           Token<JobTokenIdentifier> jobToken, String queryId, 
String dagName) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port);
+      LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " 
for dagName=" + dagName);
     }
     AMNodeInfo amNodeInfo;
     synchronized (knownAppMasters) {
       LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
       amNodeInfo = knownAppMasters.get(amNodeId);
       if (amNodeInfo == null) {
-        amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+        amNodeInfo =
+            new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, 
retryTimeout, socketFactory,
+                conf);
         knownAppMasters.put(amNodeId, amNodeInfo);
         // Add to the queue only the first time this is registered, and on
         // subsequent instances when it's taken off the queue.
         amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + 
heartbeatInterval);
         pendingHeartbeatQueeu.add(amNodeInfo);
       }
+      amNodeInfo.setCurrentDagName(dagName);
       amNodeInfo.incrementAndGetTaskCount();
     }
   }
@@ -182,11 +211,13 @@ public class AMReporter extends AbstractService {
   }
 
   public void taskKilled(String amLocation, int port, String user, 
Token<JobTokenIdentifier> jobToken,
-                         final TezTaskAttemptID taskAttemptId) {
+                         final String queryId, final String dagName, final 
TezTaskAttemptID taskAttemptId) {
     // Not re-using the connection for the AM heartbeat - which may or may not 
be open by this point.
     // knownAppMasters is used for sending heartbeats for queued tasks. Killed 
messages use a new connection.
     LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port);
-    AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf);
+    AMNodeInfo amNodeInfo =
+        new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, 
retryTimeout, socketFactory,
+            conf);
 
     // Even if the service hasn't started up. It's OK to make this invocation 
since this will
     // only happen after the AtomicReference address has been populated. Not 
adding an additional check.
@@ -212,9 +243,15 @@ public class AMReporter extends AbstractService {
     protected Void callInternal() {
       while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) {
         try {
-          AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
-          if (amNodeInfo.getTaskCount() == 0) {
+          final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take();
+          if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) {
             synchronized (knownAppMasters) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug(
+                    "Removing am {} with last associated dag {} from heartbeat 
with taskCount={}, amFailed={}",
+                    amNodeInfo.amNodeId, amNodeInfo.getCurrentDagName(), 
amNodeInfo.getTaskCount(),
+                    amNodeInfo.hasAmFailed(), amNodeInfo);
+              }
               knownAppMasters.remove(amNodeInfo.amNodeId);
             }
             amNodeInfo.stopUmbilical();
@@ -223,7 +260,22 @@ public class AMReporter extends AbstractService {
             long next = System.currentTimeMillis() + heartbeatInterval;
             amNodeInfo.setNextHeartbeatTime(next);
             pendingHeartbeatQueeu.add(amNodeInfo);
-            executor.submit(new AMHeartbeatCallable(amNodeInfo));
+            ListenableFuture<Void> future = executor.submit(new 
AMHeartbeatCallable(amNodeInfo));
+            Futures.addCallback(future, new FutureCallback<Void>() {
+              @Override
+              public void onSuccess(Void result) {
+                // Nothing to do.
+              }
+
+              @Override
+              public void onFailure(Throwable t) {
+                String currentDagName = amNodeInfo.getCurrentDagName();
+                amNodeInfo.setAmFailed(true);
+                LOG.warn("Heartbeat failed to AM {}. Killing all other tasks 
for the query={}",
+                    amNodeInfo.amNodeId, currentDagName, t);
+                queryFailedHandler.queryFailed(null, currentDagName);
+              }
+            });
           }
         } catch (InterruptedException e) {
           if (isShutdown.get()) {
@@ -284,11 +336,14 @@ public class AMReporter extends AbstractService {
           amNodeInfo.getUmbilical().nodeHeartbeat(new 
Text(nodeId.getHostname()),
               nodeId.getPort());
         } catch (IOException e) {
-          // TODO Ideally, this could be used to avoid running a task - AM 
down / unreachable, so there's no point running it.
-          LOG.warn("Failed to communicate with AM. May retry later: " + 
amNodeInfo.amNodeId, e);
+          String currentDagName = amNodeInfo.getCurrentDagName();
+          amNodeInfo.setAmFailed(true);
+          LOG.warn("Failed to communicate with AM at {}. Killing remaining 
fragments for query {}",
+              amNodeInfo.amNodeId, currentDagName, e);
+          queryFailedHandler.queryFailed(null, currentDagName);
         } catch (InterruptedException e) {
           if (!isShutdown.get()) {
-            LOG.warn("Interrupted while trying to send heartbeat to AM: " + 
amNodeInfo.amNodeId, e);
+            LOG.warn("Interrupted while trying to send heartbeat to AM {}", 
amNodeInfo.amNodeId, e);
           }
         }
       } else {
@@ -308,15 +363,28 @@ public class AMReporter extends AbstractService {
     private final Token<JobTokenIdentifier> jobToken;
     private final Configuration conf;
     private final LlapNodeId amNodeId;
+    private final RetryPolicy retryPolicy;
+    private final long timeout;
+    private final SocketFactory socketFactory;
+    private final AtomicBoolean amFailed = new AtomicBoolean(false);
+    private String currentDagName;
     private LlapTaskUmbilicalProtocol umbilical;
     private long nextHeartbeatTime;
 
 
     public AMNodeInfo(LlapNodeId amNodeId, String user,
                       Token<JobTokenIdentifier> jobToken,
+                      String currentDagName,
+                      RetryPolicy retryPolicy,
+                      long timeout,
+                      SocketFactory socketFactory,
                       Configuration conf) {
       this.user = user;
       this.jobToken = jobToken;
+      this.currentDagName = currentDagName;
+      this.retryPolicy = retryPolicy;
+      this.timeout = timeout;
+      this.socketFactory = socketFactory;
       this.conf = conf;
       this.amNodeId = amNodeId;
     }
@@ -331,8 +399,10 @@ public class AMReporter extends AbstractService {
         umbilical = ugi.doAs(new 
PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() {
           @Override
           public LlapTaskUmbilicalProtocol run() throws Exception {
-            return RPC.getProxy(LlapTaskUmbilicalProtocol.class,
-                LlapTaskUmbilicalProtocol.versionID, address, conf);
+            return RPC
+                .getProxy(LlapTaskUmbilicalProtocol.class, 
LlapTaskUmbilicalProtocol.versionID,
+                    address, UserGroupInformation.getCurrentUser(), conf, 
socketFactory,
+                    (int) timeout);
           }
         });
       }
@@ -354,10 +424,26 @@ public class AMReporter extends AbstractService {
       return taskCount.decrementAndGet();
     }
 
+    void setAmFailed(boolean val) {
+      amFailed.set(val);
+    }
+
+    boolean hasAmFailed() {
+      return amFailed.get();
+    }
+
     int getTaskCount() {
       return taskCount.get();
     }
 
+    public synchronized String getCurrentDagName() {
+      return currentDagName;
+    }
+
+    public synchronized void setCurrentDagName(String currentDagName) {
+      this.currentDagName = currentDagName;
+    }
+
     synchronized void setNextHeartbeatTime(long nextTime) {
       nextHeartbeatTime = nextTime;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
index 10e192e..e26852a 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java
@@ -19,6 +19,7 @@ import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.RejectedExecutionException;
@@ -29,6 +30,7 @@ import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
 import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler;
 import org.apache.hadoop.hive.llap.daemon.HistoryLogger;
 import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto;
@@ -58,7 +60,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 // TODO Convert this to a CompositeService
-public class ContainerRunnerImpl extends CompositeService implements 
ContainerRunner, FragmentCompletionHandler {
+public class ContainerRunnerImpl extends CompositeService implements 
ContainerRunner, FragmentCompletionHandler, QueryFailedHandler {
 
   // TODO Setup a set of threads to process incoming requests.
   // Make sure requests for a single dag/query are handled by the same thread
@@ -212,7 +214,16 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
 
   @Override
   public void queryComplete(QueryCompleteRequestProto request) {
-    queryTracker.queryComplete(null, request.getDagName(), 
request.getDeleteDelay());
+    LOG.info("Processing queryComplete notification for {}", 
request.getDagName());
+    List<QueryFragmentInfo> knownFragments =
+        queryTracker.queryComplete(null, request.getDagName(), 
request.getDeleteDelay());
+    LOG.info("DBG: Pending fragment count for completed query {} = {}", 
request.getDagName(),
+        knownFragments.size());
+    for (QueryFragmentInfo fragmentInfo : knownFragments) {
+      LOG.info("DBG: Issuing killFragment for completed query {} {}", 
request.getDagName(),
+          fragmentInfo.getFragmentIdentifierString());
+      executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+    }
   }
 
   @Override
@@ -288,12 +299,27 @@ public class ContainerRunnerImpl extends CompositeService 
implements ContainerRu
     queryTracker.fragmentComplete(fragmentInfo);
   }
 
+  @Override
+  public void queryFailed(String queryId, String dagName) {
+    LOG.info("Processing query failed notification for {}", dagName);
+    List<QueryFragmentInfo> knownFragments =
+        queryTracker.queryComplete(null, dagName, -1);
+    LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName,
+        knownFragments.size());
+    for (QueryFragmentInfo fragmentInfo : knownFragments) {
+      LOG.info("DBG: Issuing killFragment for failed query {} {}", dagName,
+          fragmentInfo.getFragmentIdentifierString());
+      executorService.killFragment(fragmentInfo.getFragmentIdentifierString());
+    }
+  }
+
   private class KilledTaskHandlerImpl implements KilledTaskHandler {
 
     @Override
     public void taskKilled(String amLocation, int port, String user,
-                           Token<JobTokenIdentifier> jobToken, 
TezTaskAttemptID taskAttemptId) {
-      amReporter.taskKilled(amLocation, port, user, jobToken, taskAttemptId);
+                           Token<JobTokenIdentifier> jobToken, String queryId, 
String dagName,
+                           TezTaskAttemptID taskAttemptId) {
+      amReporter.taskKilled(amLocation, port, user, jobToken, queryId, 
dagName, taskAttemptId);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
index 7959945..75d1995 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import org.apache.hadoop.hive.llap.daemon.ContainerRunner;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
 import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto;
@@ -157,7 +158,7 @@ public class LlapDaemon extends CompositeService implements 
ContainerRunner, Lla
         " sessionId: " + sessionId);
 
 
-    this.amReporter = new AMReporter(address, daemonConf);
+    this.amReporter = new AMReporter(address, new QueryFailedHandlerProxy(), 
daemonConf);
 
 
     this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, 
rpcPort);
@@ -418,4 +419,12 @@ public class LlapDaemon extends CompositeService 
implements ContainerRunner, Lla
     }
   }
 
+
+  private class QueryFailedHandlerProxy implements QueryFailedHandler {
+
+    @Override
+    public void queryFailed(String queryId, String dagName) {
+      containerRunner.queryFailed(queryId, dagName);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
index 554864e..aa065a9 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java
@@ -71,6 +71,10 @@ public class QueryFragmentInfo {
     return attemptNumber;
   }
 
+  public String getFragmentIdentifierString() {
+    return fragmentSpec.getFragmentIdentifierString();
+  }
+
   /**
    * Check whether a task can run to completion or may end up blocking on it's 
sources.
    * This currently happens via looking up source state.

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
index 6aed60f..27f2d4c 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java
@@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.base.Preconditions;
 import com.google.common.collect.HashMultimap;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -101,6 +102,10 @@ public class QueryInfo {
     knownFragments.remove(fragmentInfo);
   }
 
+  public List<QueryFragmentInfo> getRegisteredFragments() {
+    return Lists.newArrayList(knownFragments);
+  }
+
   private synchronized void createLocalDirs() throws IOException {
     if (localDirs == null) {
       localDirs = new String[localDirsBase.length];

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
index d796b24..19147e3 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java
@@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto;
 import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler;
@@ -25,6 +26,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.Collections;
+import java.util.List;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -47,6 +49,7 @@ public class QueryTracker extends CompositeService {
 
   private final String[] localDirsBase;
   private final FileSystem localFs;
+  private final long defaultDeleteDelaySeconds;
 
   // TODO At the moment there's no way of knowing whether a query is running 
or not.
   // A race is possible between dagComplete and registerFragment - where the 
registerFragment
@@ -75,6 +78,9 @@ public class QueryTracker extends CompositeService {
       throw new RuntimeException("Failed to setup local filesystem instance", 
e);
     }
 
+    this.defaultDeleteDelaySeconds = 
conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS,
+        LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT);
+
     queryFileCleaner = new QueryFileCleaner(conf, localFs);
     addService(queryFileCleaner);
   }
@@ -142,7 +148,10 @@ public class QueryTracker extends CompositeService {
    * @param dagName
    * @param deleteDelay
    */
-  void queryComplete(String queryId, String dagName, long deleteDelay) {
+  List<QueryFragmentInfo> queryComplete(String queryId, String dagName, long 
deleteDelay) {
+    if (deleteDelay == -1) {
+      deleteDelay = defaultDeleteDelaySeconds;
+    }
     ReadWriteLock dagLock = getDagLock(dagName);
     dagLock.writeLock().lock();
     try {
@@ -153,6 +162,7 @@ public class QueryTracker extends CompositeService {
       QueryInfo queryInfo = queryInfoMap.remove(dagName);
       if (queryInfo == null) {
         LOG.warn("Ignoring query complete for unknown dag: {}", dagName);
+        return Collections.emptyList();
       }
       String[] localDirs = queryInfo.getLocalDirsNoCreate();
       if (localDirs != null) {
@@ -161,8 +171,13 @@ public class QueryTracker extends CompositeService {
           ShuffleHandler.get().unregisterDag(localDir, dagName, 
queryInfo.getDagIdentifier());
         }
       }
+      // Clearing this before sending a kill is OK, since canFinish will 
change to false.
+      // Ideally this should be a state machine where kills are issued to the 
executor,
+      // and the structures are cleaned up once all tasks complete. New 
requests, however, should not
+      // be allowed after a query complete is received.
       sourceCompletionMap.remove(dagName);
       dagSpecificLocks.remove(dagName);
+      return queryInfo.getRegisteredFragments();
       // TODO HIVE-10762 Issue a kill message to all running fragments for 
this container.
       // TODO HIVE-10535 Cleanup map join cache
     } finally {

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
index 4c0fb8e..0fd89de 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java
@@ -263,11 +263,17 @@ public class TaskExecutorService extends AbstractService 
implements Scheduler<Ta
     }
   }
 
-  private static class WaitQueueWorkerCallback implements FutureCallback {
+  private class WaitQueueWorkerCallback implements FutureCallback {
 
     @Override
     public void onSuccess(Object result) {
-      LOG.error("Wait queue scheduler worker exited with success!");
+      if (isShutdown.get()) {
+        LOG.info("Wait queue scheduler worker exited with success!");
+      } else {
+        LOG.error("Wait queue scheduler worker exited with success!");
+        
Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(),
+            new IllegalStateException("WaitQueue worker exited before 
shutdown"));
+      }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
index cd6a0da..9b14fa3 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java
@@ -22,7 +22,6 @@ import java.nio.ByteBuffer;
 import java.security.PrivilegedExceptionAction;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -126,7 +125,7 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
     // Register with the AMReporter when the callable is setup. Unregister 
once it starts running.
     if (jobToken != null) {
     this.amReporter.registerTask(request.getAmHost(), request.getAmPort(),
-        request.getUser(), jobToken);
+        request.getUser(), jobToken, null, 
request.getFragmentSpec().getDagName());
     }
     this.metrics = metrics;
     this.requestId = getRequestId(request);
@@ -287,7 +286,8 @@ public class TaskRunnerCallable extends 
CallableWithNdc<TaskRunner2Result> {
    */
   public void reportTaskKilled() {
     killedTaskHandler
-        .taskKilled(request.getAmHost(), request.getAmPort(), 
request.getUser(), jobToken,
+        .taskKilled(request.getAmHost(), request.getAmPort(), 
request.getUser(), jobToken, null,
+            taskSpec.getDAGName(),
             taskSpec.getTaskAttemptID());
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
index 2f5e11d..fae7654 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java
@@ -32,8 +32,8 @@ public interface LlapTaskUmbilicalProtocol extends 
VersionedProtocol {
   public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request)
       throws IOException, TezException;
 
-  public void nodeHeartbeat(Text hostname, int port);
+  public void nodeHeartbeat(Text hostname, int port) throws IOException;
 
-  public void taskKilled(TezTaskAttemptID taskAttemptId);
+  public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException;
 
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
----------------------------------------------------------------------
diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
 
b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
index 6a38d85..2305b8c 100644
--- 
a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
+++ 
b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java
@@ -31,6 +31,7 @@ import com.google.protobuf.ServiceException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.llap.LlapNodeId;
 import org.apache.hadoop.hive.llap.configuration.LlapConfiguration;
+import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler;
 import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo;
 import 
org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto;
@@ -449,7 +450,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     }
 
     @Override
-    public void nodeHeartbeat(Text hostname, int port) {
+    public void nodeHeartbeat(Text hostname, int port) throws IOException {
       entityTracker.nodePinged(hostname.toString(), port);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]");
@@ -457,7 +458,7 @@ public class LlapTaskCommunicator extends 
TezTaskCommunicatorImpl {
     }
 
     @Override
-    public void taskKilled(TezTaskAttemptID taskAttemptId) {
+    public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException {
       // TODO Unregister the task for state updates, which could in turn 
unregister the node.
       getTaskCommunicatorContext().taskKilled(taskAttemptId,
           TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");

Reply via email to