Repository: hive Updated Branches: refs/heads/llap b8cb9a1b9 -> c5dc87a8e
HIVE-10762. LLAP: Kill any fragments running in a daemon when a query completes. (Siddharth Seth) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c5dc87a8 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c5dc87a8 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c5dc87a8 Branch: refs/heads/llap Commit: c5dc87a8e8efb025925c236898f425223f23712a Parents: b8cb9a1 Author: Siddharth Seth <ss...@apache.org> Authored: Tue Jun 16 23:48:13 2015 -0700 Committer: Siddharth Seth <ss...@apache.org> Committed: Tue Jun 16 23:48:13 2015 -0700 ---------------------------------------------------------------------- .../llap/configuration/LlapConfiguration.java | 24 +++- .../hive/llap/daemon/KilledTaskHandler.java | 3 +- .../hive/llap/daemon/QueryFailedHandler.java | 20 +++ .../hive/llap/daemon/impl/AMReporter.java | 122 ++++++++++++++++--- .../llap/daemon/impl/ContainerRunnerImpl.java | 34 +++++- .../hive/llap/daemon/impl/LlapDaemon.java | 11 +- .../llap/daemon/impl/QueryFragmentInfo.java | 4 + .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 5 + .../hive/llap/daemon/impl/QueryTracker.java | 17 ++- .../llap/daemon/impl/TaskExecutorService.java | 10 +- .../llap/daemon/impl/TaskRunnerCallable.java | 6 +- .../protocol/LlapTaskUmbilicalProtocol.java | 4 +- .../llap/tezplugins/LlapTaskCommunicator.java | 5 +- 13 files changed, 229 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java index dd24661..f5aa2a6 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/configuration/LlapConfiguration.java @@ -51,8 +51,26 @@ public class LlapConfiguration extends Configuration { public static final boolean LLAP_DAEMON_SHUFFLE_DIR_WATCHER_ENABLED_DEFAULT = false; // This needs to be kept below the task timeout interval, but otherwise as high as possible to avoid unnecessary traffic. - public static final String LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "liveness.heartbeat.interval-ms"; - public static final long LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l; + public static final String LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS = LLAP_DAEMON_PREFIX + "am.liveness.heartbeat.interval-ms"; + public static final long LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT = 10000l; + + /** + * Amount of time to wait on connection failures to the AM from an LLAP daemon before considering + * the AM to be dead + */ + public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS = + LLAP_PREFIX + "am.liveness.connection.timeout-millis"; + public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10000l; + + // Not used yet - since the Writable RPC engine does not support this policy. + /** + * Sleep duration while waiting to retry connection failures to the AM from the daemon for the + * general keep-alive thread + */ + public static final String LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS = + LLAP_PREFIX + "am.liveness.connection.sleep-between-retries-millis"; + public static final long LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = + 2000l; // Section for configs used in AM and executors @@ -137,6 +155,8 @@ public class LlapConfiguration extends Configuration { LLAP_PREFIX + "task.communicator.connection.sleep-between-retries-millis"; public static final long LLAP_TASK_COMMUNICATOR_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT = 2000l; + + public static final String LLAP_DAEMON_SERVICE_PORT = LLAP_DAEMON_PREFIX + "service.port"; public static final int LLAP_DAEMON_SERVICE_PORT_DEFAULT = 15002; http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java index 8b481c8..7cb433b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/KilledTaskHandler.java @@ -24,5 +24,6 @@ public interface KilledTaskHandler { // inferred from this. // Passing in parameters until there's some dag information stored and tracked in the daemon. void taskKilled(String amLocation, int port, String user, - Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId); + Token<JobTokenIdentifier> jobToken, String queryId, String dagName, + TezTaskAttemptID taskAttemptId); } http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java new file mode 100644 index 0000000..4e62a68 --- /dev/null +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/QueryFailedHandler.java @@ -0,0 +1,20 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.llap.daemon; + +public interface QueryFailedHandler { + + public void queryFailed(String queryId, String dagName); +} http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 1ba18fc..8ec9f22 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -14,6 +14,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; +import javax.net.SocketFactory; import java.io.IOException; import java.net.InetSocketAddress; import java.security.PrivilegedExceptionAction; @@ -39,8 +40,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.CallableWithNdc; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; import org.apache.hadoop.hive.llap.protocol.LlapTaskUmbilicalProtocol; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; @@ -79,9 +83,13 @@ public class AMReporter extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(AMReporter.class); private volatile LlapNodeId nodeId; + private final QueryFailedHandler queryFailedHandler; private final Configuration conf; private final ListeningExecutorService queueLookupExecutor; private final ListeningExecutorService executor; + private final RetryPolicy retryPolicy; + private final long retryTimeout; + private final SocketFactory socketFactory; private final DelayQueue<AMNodeInfo> pendingHeartbeatQueeu = new DelayQueue(); private final AtomicReference<InetSocketAddress> localAddress; private final long heartbeatInterval; @@ -91,9 +99,11 @@ public class AMReporter extends AbstractService { private final Map<LlapNodeId, AMNodeInfo> knownAppMasters = new HashMap<>(); volatile ListenableFuture<Void> queueLookupFuture; - public AMReporter(AtomicReference<InetSocketAddress> localAddress, Configuration conf) { + public AMReporter(AtomicReference<InetSocketAddress> localAddress, + QueryFailedHandler queryFailedHandler, Configuration conf) { super(AMReporter.class.getName()); this.localAddress = localAddress; + this.queryFailedHandler = queryFailedHandler; this.conf = conf; ExecutorService rawExecutor = Executors.newCachedThreadPool( new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporter %d").build()); @@ -102,9 +112,25 @@ public class AMReporter extends AbstractService { new ThreadFactoryBuilder().setDaemon(true).setNameFormat("AMReporterQueueDrainer").build()); this.queueLookupExecutor = MoreExecutors.listeningDecorator(rawExecutor2); this.heartbeatInterval = - conf.getLong(LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS, - LlapConfiguration.LLAP_DAEMON_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT); - + conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS, + LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_HEARTBEAT_INTERVAL_MS_DEFAULT); + + this.retryTimeout = + conf.getLong(LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS, + LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_TIMEOUT_MILLIS_DEFAULT); + long retrySleep = conf.getLong( + LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS, + LlapConfiguration.LLAP_DAEMON_AM_LIVENESS_CONNECTION_SLEEP_BETWEEN_RETRIES_MILLIS_DEFAULT); + this.retryPolicy = RetryPolicies + .retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep, + TimeUnit.MILLISECONDS); + + this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + + LOG.info("Setting up AMReporter with " + + "heartbeatInterval(ms)=" + heartbeatInterval + + ", retryTime(ms)=" + retryTimeout + + ", retrySleep(ms)=" + retrySleep); } @Override @@ -143,23 +169,26 @@ public class AMReporter extends AbstractService { } } - - public void registerTask(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken) { + public void registerTask(String amLocation, int port, String user, + Token<JobTokenIdentifier> jobToken, String queryId, String dagName) { if (LOG.isTraceEnabled()) { - LOG.trace("Registering for heartbeat: " + amLocation + ":" + port); + LOG.trace("Registering for heartbeat: " + amLocation + ":" + port + " for dagName=" + dagName); } AMNodeInfo amNodeInfo; synchronized (knownAppMasters) { LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); amNodeInfo = knownAppMasters.get(amNodeId); if (amNodeInfo == null) { - amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf); + amNodeInfo = + new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory, + conf); knownAppMasters.put(amNodeId, amNodeInfo); // Add to the queue only the first time this is registered, and on // subsequent instances when it's taken off the queue. amNodeInfo.setNextHeartbeatTime(System.currentTimeMillis() + heartbeatInterval); pendingHeartbeatQueeu.add(amNodeInfo); } + amNodeInfo.setCurrentDagName(dagName); amNodeInfo.incrementAndGetTaskCount(); } } @@ -182,11 +211,13 @@ public class AMReporter extends AbstractService { } public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken, - final TezTaskAttemptID taskAttemptId) { + final String queryId, final String dagName, final TezTaskAttemptID taskAttemptId) { // Not re-using the connection for the AM heartbeat - which may or may not be open by this point. // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection. LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); - AMNodeInfo amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, conf); + AMNodeInfo amNodeInfo = + new AMNodeInfo(amNodeId, user, jobToken, dagName, retryPolicy, retryTimeout, socketFactory, + conf); // Even if the service hasn't started up. It's OK to make this invocation since this will // only happen after the AtomicReference address has been populated. Not adding an additional check. @@ -212,9 +243,15 @@ public class AMReporter extends AbstractService { protected Void callInternal() { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { - AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take(); - if (amNodeInfo.getTaskCount() == 0) { + final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take(); + if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) { synchronized (knownAppMasters) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Removing am {} with last associated dag{} from heartbeat with taskCount={}, amFailed={}", + amNodeInfo.amNodeId, amNodeInfo.getCurrentDagName(), amNodeInfo.getTaskCount(), + amNodeInfo.hasAmFailed(), amNodeInfo); + } knownAppMasters.remove(amNodeInfo.amNodeId); } amNodeInfo.stopUmbilical(); @@ -223,7 +260,22 @@ public class AMReporter extends AbstractService { long next = System.currentTimeMillis() + heartbeatInterval; amNodeInfo.setNextHeartbeatTime(next); pendingHeartbeatQueeu.add(amNodeInfo); - executor.submit(new AMHeartbeatCallable(amNodeInfo)); + ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo)); + Futures.addCallback(future, new FutureCallback<Void>() { + @Override + public void onSuccess(Void result) { + // Nothing to do. + } + + @Override + public void onFailure(Throwable t) { + String currentDagName = amNodeInfo.getCurrentDagName(); + amNodeInfo.setAmFailed(true); + LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}", + amNodeInfo.amNodeId, currentDagName, t); + queryFailedHandler.queryFailed(null, currentDagName); + } + }); } } catch (InterruptedException e) { if (isShutdown.get()) { @@ -284,11 +336,14 @@ public class AMReporter extends AbstractService { amNodeInfo.getUmbilical().nodeHeartbeat(new Text(nodeId.getHostname()), nodeId.getPort()); } catch (IOException e) { - // TODO Ideally, this could be used to avoid running a task - AM down / unreachable, so there's no point running it. - LOG.warn("Failed to communicate with AM. May retry later: " + amNodeInfo.amNodeId, e); + String currentDagName = amNodeInfo.getCurrentDagName(); + amNodeInfo.setAmFailed(true); + LOG.warn("Failed to communicated with AM at {}. Killing remaining fragments for query {}", + amNodeInfo.amNodeId, currentDagName, e); + queryFailedHandler.queryFailed(null, currentDagName); } catch (InterruptedException e) { if (!isShutdown.get()) { - LOG.warn("Interrupted while trying to send heartbeat to AM: " + amNodeInfo.amNodeId, e); + LOG.warn("Interrupted while trying to send heartbeat to AM {}", amNodeInfo.amNodeId, e); } } } else { @@ -308,15 +363,28 @@ public class AMReporter extends AbstractService { private final Token<JobTokenIdentifier> jobToken; private final Configuration conf; private final LlapNodeId amNodeId; + private final RetryPolicy retryPolicy; + private final long timeout; + private final SocketFactory socketFactory; + private final AtomicBoolean amFailed = new AtomicBoolean(false); + private String currentDagName; private LlapTaskUmbilicalProtocol umbilical; private long nextHeartbeatTime; public AMNodeInfo(LlapNodeId amNodeId, String user, Token<JobTokenIdentifier> jobToken, + String currentDagName, + RetryPolicy retryPolicy, + long timeout, + SocketFactory socketFactory, Configuration conf) { this.user = user; this.jobToken = jobToken; + this.currentDagName = currentDagName; + this.retryPolicy = retryPolicy; + this.timeout = timeout; + this.socketFactory = socketFactory; this.conf = conf; this.amNodeId = amNodeId; } @@ -331,8 +399,10 @@ public class AMReporter extends AbstractService { umbilical = ugi.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() { @Override public LlapTaskUmbilicalProtocol run() throws Exception { - return RPC.getProxy(LlapTaskUmbilicalProtocol.class, - LlapTaskUmbilicalProtocol.versionID, address, conf); + return RPC + .getProxy(LlapTaskUmbilicalProtocol.class, LlapTaskUmbilicalProtocol.versionID, + address, UserGroupInformation.getCurrentUser(), conf, socketFactory, + (int) timeout); } }); } @@ -354,10 +424,26 @@ public class AMReporter extends AbstractService { return taskCount.decrementAndGet(); } + void setAmFailed(boolean val) { + amFailed.set(val); + } + + boolean hasAmFailed() { + return amFailed.get(); + } + int getTaskCount() { return taskCount.get(); } + public synchronized String getCurrentDagName() { + return currentDagName; + } + + public synchronized void setCurrentDagName(String currentDagName) { + this.currentDagName = currentDagName; + } + synchronized void setNextHeartbeatTime(long nextTime) { nextHeartbeatTime = nextTime; } http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 10e192e..e26852a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -19,6 +19,7 @@ import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashMap; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.RejectedExecutionException; @@ -29,6 +30,7 @@ import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; import org.apache.hadoop.hive.llap.daemon.HistoryLogger; import org.apache.hadoop.hive.llap.daemon.KilledTaskHandler; +import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.GroupInputSpecProto; @@ -58,7 +60,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; // TODO Convert this to a CompositeService -public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler { +public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler { // TODO Setup a set of threads to process incoming requests. // Make sure requests for a single dag/query are handled by the same thread @@ -212,7 +214,16 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu @Override public void queryComplete(QueryCompleteRequestProto request) { - queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay()); + LOG.info("Processing queryComplete notification for {}", request.getDagName()); + List<QueryFragmentInfo> knownFragments = + queryTracker.queryComplete(null, request.getDagName(), request.getDeleteDelay()); + LOG.info("DBG: Pending fragment count for completed query {} = {}", request.getDagName(), + knownFragments.size()); + for (QueryFragmentInfo fragmentInfo : knownFragments) { + LOG.info("DBG: Issuing killFragment for completed query {} {}", request.getDagName(), + fragmentInfo.getFragmentIdentifierString()); + executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); + } } @Override @@ -288,12 +299,27 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu queryTracker.fragmentComplete(fragmentInfo); } + @Override + public void queryFailed(String queryId, String dagName) { + LOG.info("Processing query failed notification for {}", dagName); + List<QueryFragmentInfo> knownFragments = + queryTracker.queryComplete(null, dagName, -1); + LOG.info("DBG: Pending fragment count for failed query {} = {}", dagName, + knownFragments.size()); + for (QueryFragmentInfo fragmentInfo : knownFragments) { + LOG.info("DBG: Issuing killFragment for failed query {} {}", dagName, + fragmentInfo.getFragmentIdentifierString()); + executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); + } + } + private class KilledTaskHandlerImpl implements KilledTaskHandler { @Override public void taskKilled(String amLocation, int port, String user, - Token<JobTokenIdentifier> jobToken, TezTaskAttemptID taskAttemptId) { - amReporter.taskKilled(amLocation, port, user, jobToken, taskAttemptId); + Token<JobTokenIdentifier> jobToken, String queryId, String dagName, + TezTaskAttemptID taskAttemptId) { + amReporter.taskKilled(amLocation, port, user, jobToken, queryId, dagName, taskAttemptId); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 7959945..75d1995 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -31,6 +31,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; +import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; import org.apache.hadoop.hive.llap.daemon.registry.impl.LlapRegistryService; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateUpdatedRequestProto; @@ -157,7 +158,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla " sessionId: " + sessionId); - this.amReporter = new AMReporter(address, daemonConf); + this.amReporter = new AMReporter(address, new QueryFailedHandlerProxy(), daemonConf); this.server = new LlapDaemonProtocolServerImpl(numHandlers, this, address, rpcPort); @@ -418,4 +419,12 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } } + + private class QueryFailedHandlerProxy implements QueryFailedHandler { + + @Override + public void queryFailed(String queryId, String dagName) { + containerRunner.queryFailed(queryId, dagName); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java index 554864e..aa065a9 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryFragmentInfo.java @@ -71,6 +71,10 @@ public class QueryFragmentInfo { return attemptNumber; } + public String getFragmentIdentifierString() { + return fragmentSpec.getFragmentIdentifierString(); + } + /** * Check whether a task can run to completion or may end up blocking on it's sources. * This currently happens via looking up source state. http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 6aed60f..27f2d4c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -28,6 +28,7 @@ import java.util.concurrent.ConcurrentMap; import com.google.common.base.Preconditions; import com.google.common.collect.HashMultimap; +import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -101,6 +102,10 @@ public class QueryInfo { knownFragments.remove(fragmentInfo); } + public List<QueryFragmentInfo> getRegisteredFragments() { + return Lists.newArrayList(knownFragments); + } + private synchronized void createLocalDirs() throws IOException { if (localDirs == null) { localDirs = new String[localDirsBase.length]; http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index d796b24..19147e3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentSpecProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; @@ -25,6 +26,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Collections; +import java.util.List; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -47,6 +49,7 @@ public class QueryTracker extends CompositeService { private final String[] localDirsBase; private final FileSystem localFs; + private final long defaultDeleteDelaySeconds; // TODO At the moment there's no way of knowing whether a query is running or not. // A race is possible between dagComplete and registerFragment - where the registerFragment @@ -75,6 +78,9 @@ public class QueryTracker extends CompositeService { throw new RuntimeException("Failed to setup local filesystem instance", e); } + this.defaultDeleteDelaySeconds = conf.getLong(LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS, + LlapConfiguration.LLAP_FILE_CLEANUP_DELAY_SECONDS_DEFAULT); + queryFileCleaner = new QueryFileCleaner(conf, localFs); addService(queryFileCleaner); } @@ -142,7 +148,10 @@ public class QueryTracker extends CompositeService { * @param dagName * @param deleteDelay */ - void queryComplete(String queryId, String dagName, long deleteDelay) { + List<QueryFragmentInfo> queryComplete(String queryId, String dagName, long deleteDelay) { + if (deleteDelay == -1) { + deleteDelay = defaultDeleteDelaySeconds; + } ReadWriteLock dagLock = getDagLock(dagName); dagLock.writeLock().lock(); try { @@ -153,6 +162,7 @@ public class QueryTracker extends CompositeService { QueryInfo queryInfo = queryInfoMap.remove(dagName); if (queryInfo == null) { LOG.warn("Ignoring query complete for unknown dag: {}", dagName); + return Collections.emptyList(); } String[] localDirs = queryInfo.getLocalDirsNoCreate(); if (localDirs != null) { @@ -161,8 +171,13 @@ public class QueryTracker extends CompositeService { ShuffleHandler.get().unregisterDag(localDir, dagName, queryInfo.getDagIdentifier()); } } + // Clearing this before sending a kill is OK, since canFinish will change to false. + // Ideally this should be a state machine where kills are issued to the executor, + // and the structures are cleaned up once all tasks complete. New requests, however, should not + // be allowed after a query complete is received. sourceCompletionMap.remove(dagName); dagSpecificLocks.remove(dagName); + return queryInfo.getRegisteredFragments(); // TODO HIVE-10762 Issue a kill message to all running fragments for this container. // TODO HIVE-10535 Cleanup map join cache } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java index 4c0fb8e..0fd89de 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorService.java @@ -263,11 +263,17 @@ public class TaskExecutorService extends AbstractService implements Scheduler<Ta } } - private static class WaitQueueWorkerCallback implements FutureCallback { + private class WaitQueueWorkerCallback implements FutureCallback { @Override public void onSuccess(Object result) { - LOG.error("Wait queue scheduler worker exited with success!"); + if (isShutdown.get()) { + LOG.info("Wait queue scheduler worker exited with success!"); + } else { + LOG.error("Wait queue scheduler worker exited with success!"); + Thread.getDefaultUncaughtExceptionHandler().uncaughtException(Thread.currentThread(), + new IllegalStateException("WaitQueue worked exited before shutdown")); + } } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index cd6a0da..9b14fa3 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -22,7 +22,6 @@ import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; import java.util.HashMap; import java.util.Map; -import java.util.Set; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; @@ -126,7 +125,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // Register with the AMReporter when the callable is setup. Unregister once it starts running. if (jobToken != null) { this.amReporter.registerTask(request.getAmHost(), request.getAmPort(), - request.getUser(), jobToken); + request.getUser(), jobToken, null, request.getFragmentSpec().getDagName()); } this.metrics = metrics; this.requestId = getRequestId(request); @@ -287,7 +286,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { */ public void reportTaskKilled() { killedTaskHandler - .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, + .taskKilled(request.getAmHost(), request.getAmPort(), request.getUser(), jobToken, null, + taskSpec.getDAGName(), taskSpec.getTaskAttemptID()); } http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java index 2f5e11d..fae7654 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/protocol/LlapTaskUmbilicalProtocol.java @@ -32,8 +32,8 @@ public interface LlapTaskUmbilicalProtocol extends VersionedProtocol { public TezHeartbeatResponse heartbeat(TezHeartbeatRequest request) throws IOException, TezException; - public void nodeHeartbeat(Text hostname, int port); + public void nodeHeartbeat(Text hostname, int port) throws IOException; - public void taskKilled(TezTaskAttemptID taskAttemptId); + public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException; } http://git-wip-us.apache.org/repos/asf/hive/blob/c5dc87a8/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java index 6a38d85..2305b8c 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskCommunicator.java @@ -31,6 +31,7 @@ import com.google.protobuf.ServiceException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.configuration.LlapConfiguration; +import org.apache.hadoop.hive.llap.daemon.QueryFailedHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentRuntimeInfo; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.QueryCompleteRequestProto; @@ -449,7 +450,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } @Override - public void nodeHeartbeat(Text hostname, int port) { + public void nodeHeartbeat(Text hostname, int port) throws IOException { entityTracker.nodePinged(hostname.toString(), port); if (LOG.isDebugEnabled()) { LOG.debug("Received heartbeat from [" + hostname + ":" + port +"]"); @@ -457,7 +458,7 @@ public class LlapTaskCommunicator extends TezTaskCommunicatorImpl { } @Override - public void taskKilled(TezTaskAttemptID taskAttemptId) { + public void taskKilled(TezTaskAttemptID taskAttemptId) throws IOException { // TODO Unregister the task for state updates, which could in turn unregister the node. getTaskCommunicatorContext().taskKilled(taskAttemptId, TaskAttemptEndReason.EXTERNAL_PREEMPTION, "Attempt preempted");