HIVE-15958: LLAP: IPC connections are not being reused for umbilical protocol (Prasanth Jayachandran reviewed by Siddharth Seth)
Change-Id: Ib4e8ba9881cc560142dc3f75a130060b29ea7c57 Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/38c9ac74 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/38c9ac74 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/38c9ac74 Branch: refs/heads/branch-2.2 Commit: 38c9ac74596e3ab18af77a9a76d05bc97e0b1069 Parents: b24f270 Author: Prasanth Jayachandran <pjayachand...@hortonworks.com> Authored: Mon Feb 27 19:45:37 2017 -0800 Committer: Owen O'Malley <omal...@apache.org> Committed: Tue Mar 28 14:02:44 2017 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/AMReporter.java | 74 ++++++++++++-------- .../llap/daemon/impl/ContainerRunnerImpl.java | 21 +++--- .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 17 +++-- .../hive/llap/daemon/impl/QueryTracker.java | 20 +++--- .../daemon/impl/TaskExecutorTestHelpers.java | 2 +- 5 files changed, 82 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/38c9ac74/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index ede7e00..b01a495 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -113,9 +113,9 @@ public class AMReporter extends AbstractService { this.conf = conf; this.daemonId = daemonId; if (maxThreads < numExecutors) { - maxThreads = numExecutors; LOG.warn("maxThreads={} is less than numExecutors={}. 
Setting maxThreads=numExecutors", - maxThreads, numExecutors); + maxThreads, numExecutors); + maxThreads = numExecutors; } ExecutorService rawExecutor = new ThreadPoolExecutor(numExecutors, maxThreads, @@ -227,12 +227,15 @@ public class AMReporter extends AbstractService { public void taskKilled(String amLocation, int port, String user, Token<JobTokenIdentifier> jobToken, final QueryIdentifier queryIdentifier, final TezTaskAttemptID taskAttemptId) { - // Not re-using the connection for the AM heartbeat - which may or may not be open by this point. - // knownAppMasters is used for sending heartbeats for queued tasks. Killed messages use a new connection. LlapNodeId amNodeId = LlapNodeId.getInstance(amLocation, port); - AMNodeInfo amNodeInfo = - new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, - conf); + AMNodeInfo amNodeInfo; + synchronized (knownAppMasters) { + amNodeInfo = knownAppMasters.get(amNodeId); + if (amNodeInfo == null) { + amNodeInfo = new AMNodeInfo(amNodeId, user, jobToken, queryIdentifier, retryPolicy, retryTimeout, socketFactory, + conf); + } + } // Even if the service hasn't started up. It's OK to make this invocation since this will // only happen after the AtomicReference address has been populated. Not adding an additional check. @@ -252,6 +255,20 @@ public class AMReporter extends AbstractService { }); } + public void queryComplete(LlapNodeId llapNodeId) { + if (llapNodeId != null) { + synchronized (knownAppMasters) { + AMNodeInfo amNodeInfo = knownAppMasters.remove(llapNodeId); + // TODO: not stopping umbilical explicitly as some taskKill requests may get scheduled during queryComplete + // which will be using the umbilical. HIVE-16021 should fix this, until then leave umbilical open and wait for + // it to be closed after max idle timeout (10s default) + if (LOG.isDebugEnabled()) { + LOG.debug("Query complete received. 
Removed {}.", amNodeInfo); + } + } + } + } + private class QueueLookupCallable extends CallableWithNdc<Void> { @Override @@ -259,7 +276,7 @@ public class AMReporter extends AbstractService { while (!isShutdown.get() && !Thread.currentThread().isInterrupted()) { try { final AMNodeInfo amNodeInfo = pendingHeartbeatQueeu.take(); - if (amNodeInfo.getTaskCount() == 0 || amNodeInfo.hasAmFailed()) { + if (amNodeInfo.hasAmFailed()) { synchronized (knownAppMasters) { if (LOG.isDebugEnabled()) { LOG.debug( @@ -269,28 +286,29 @@ public class AMReporter extends AbstractService { } knownAppMasters.remove(amNodeInfo.amNodeId); } - amNodeInfo.stopUmbilical(); } else { - // Add back to the queue for the next heartbeat, and schedule the actual heartbeat - long next = System.currentTimeMillis() + heartbeatInterval; - amNodeInfo.setNextHeartbeatTime(next); - pendingHeartbeatQueeu.add(amNodeInfo); - ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo)); - Futures.addCallback(future, new FutureCallback<Void>() { - @Override - public void onSuccess(Void result) { - // Nothing to do. - } - - @Override - public void onFailure(Throwable t) { - QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); - amNodeInfo.setAmFailed(true); - LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}", + if (amNodeInfo.getTaskCount() > 0) { + // Add back to the queue for the next heartbeat, and schedule the actual heartbeat + long next = System.currentTimeMillis() + heartbeatInterval; + amNodeInfo.setNextHeartbeatTime(next); + pendingHeartbeatQueeu.add(amNodeInfo); + ListenableFuture<Void> future = executor.submit(new AMHeartbeatCallable(amNodeInfo)); + Futures.addCallback(future, new FutureCallback<Void>() { + @Override + public void onSuccess(Void result) { + // Nothing to do. 
+ } + + @Override + public void onFailure(Throwable t) { + QueryIdentifier currentQueryIdentifier = amNodeInfo.getCurrentQueryIdentifier(); + amNodeInfo.setAmFailed(true); + LOG.warn("Heartbeat failed to AM {}. Killing all other tasks for the query={}", amNodeInfo.amNodeId, currentQueryIdentifier, t); - queryFailedHandler.queryFailed(currentQueryIdentifier); - } - }); + queryFailedHandler.queryFailed(currentQueryIdentifier); + } + }); + } } } catch (InterruptedException e) { if (isShutdown.get()) { http://git-wip-us.apache.org/repos/asf/hive/blob/38c9ac74/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index cc4eff0..1176e5e 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -30,6 +30,7 @@ import org.apache.hadoop.hive.common.UgiFactory; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.llap.DaemonId; +import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.NotTezEventHelper; import org.apache.hadoop.hive.llap.daemon.ContainerRunner; import org.apache.hadoop.hive.llap.daemon.FragmentCompletionHandler; @@ -240,12 +241,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); + LlapNodeId amNodeId = LlapNodeId.getInstance(request.getAmHost(), request.getAmPort()); QueryFragmentInfo fragmentInfo = queryTracker.registerFragment( queryIdentifier, qIdProto.getApplicationIdString(), dagId, vertex.getDagName(), 
vertex.getHiveQueryId(), dagIdentifier, vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), - vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(), - request.getAmPort()); + vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, amNodeId); String[] localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); @@ -388,14 +389,18 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu new QueryIdentifier(request.getQueryIdentifier().getApplicationIdString(), request.getQueryIdentifier().getDagIndex()); LOG.info("Processing queryComplete notification for {}", queryIdentifier); - List<QueryFragmentInfo> knownFragments = queryTracker.queryComplete( - queryIdentifier, request.getDeleteDelay(), false); - LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier, + QueryInfo queryInfo = queryTracker.queryComplete(queryIdentifier, request.getDeleteDelay(), false); + if (queryInfo != null) { + List<QueryFragmentInfo> knownFragments = queryInfo.getRegisteredFragments(); + LOG.info("DBG: Pending fragment count for completed query {} = {}", queryIdentifier, knownFragments.size()); - for (QueryFragmentInfo fragmentInfo : knownFragments) { - LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier, + for (QueryFragmentInfo fragmentInfo : knownFragments) { + LOG.info("Issuing killFragment for completed query {} {}", queryIdentifier, fragmentInfo.getFragmentIdentifierString()); - executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); + executorService.killFragment(fragmentInfo.getFragmentIdentifierString()); + } + LlapNodeId amNodeId = queryInfo.getAmNodeId(); + amReporter.queryComplete(amNodeId); } return QueryCompleteResponseProto.getDefaultInstance(); } http://git-wip-us.apache.org/repos/asf/hive/blob/38c9ac74/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java 
---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 5f0271f..90641d4 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -35,6 +35,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Multimap; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; @@ -56,6 +57,7 @@ public class QueryInfo { private final String[] localDirsBase; private final FileSystem localFs; private String[] localDirs; + private final LlapNodeId amNodeId; // Map of states for different vertices. 
private final Set<QueryFragmentInfo> knownFragments = @@ -68,11 +70,11 @@ public class QueryInfo { private final AtomicReference<UserGroupInformation> umbilicalUgi; public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, - String dagName, String hiveQueryIdString, - int dagIdentifier, String user, - ConcurrentMap<String, SourceStateProto> sourceStateMap, - String[] localDirsBase, FileSystem localFs, String tokenUserName, - String tokenAppId) { + String dagName, String hiveQueryIdString, + int dagIdentifier, String user, + ConcurrentMap<String, SourceStateProto> sourceStateMap, + String[] localDirsBase, FileSystem localFs, String tokenUserName, + String tokenAppId, final LlapNodeId amNodeId) { this.queryIdentifier = queryIdentifier; this.appIdString = appIdString; this.dagIdString = dagIdString; @@ -86,6 +88,7 @@ public class QueryInfo { this.tokenUserName = tokenUserName; this.appId = tokenAppId; this.umbilicalUgi = new AtomicReference<>(); + this.amNodeId = amNodeId; } public QueryIdentifier getQueryIdentifier() { @@ -116,6 +119,10 @@ public class QueryInfo { return sourceStateMap; } + public LlapNodeId getAmNodeId() { + return amNodeId; + } + public QueryFragmentInfo registerFragment(String vertexName, int fragmentNumber, int attemptNumber, SignableVertexSpec vertexSpec, String fragmentIdString) { QueryFragmentInfo fragmentInfo = new QueryFragmentInfo( http://git-wip-us.apache.org/repos/asf/hive/blob/38c9ac74/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 5cf3a38..7e646c5 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ 
-21,6 +21,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.llap.LlapNodeId; import org.apache.hadoop.hive.llap.log.Log4jQueryCompleteMarker; import org.apache.hadoop.hive.llap.log.LogHelpers; import org.apache.hadoop.security.UserGroupInformation; @@ -69,8 +70,6 @@ public class QueryTracker extends AbstractService { private final ConcurrentHashMap<QueryIdentifier, QueryInfo> queryInfoMap = new ConcurrentHashMap<>(); - - private final String[] localDirsBase; private final FileSystem localFs; private final String clusterId; @@ -137,9 +136,10 @@ public class QueryTracker extends AbstractService { * Register a new fragment for a specific query */ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, - String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, - String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken, - String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException { + String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, + int attemptNumber, + String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken, + String fragmentIdString, LlapTokenInfo tokenInfo, final LlapNodeId amNodeId) throws IOException { ReadWriteLock dagLock = getDagLock(queryIdentifier); // Note: This is a readLock to prevent a race with queryComplete. 
Operations @@ -169,13 +169,13 @@ public class QueryTracker extends AbstractService { new QueryInfo(queryIdentifier, appIdString, dagIdString, dagName, hiveQueryIdString, dagIdentifier, user, getSourceCompletionMap(queryIdentifier), localDirsBase, localFs, - tokenInfo.userName, tokenInfo.appId); + tokenInfo.userName, tokenInfo.appId, amNodeId); QueryInfo old = queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); if (old != null) { queryInfo = old; } else { // Ensure the UGI is setup once. - queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort); + queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amNodeId.getHostname(), amNodeId.getPort()); isExistingQueryInfo = false; } } @@ -238,7 +238,7 @@ public class QueryTracker extends AbstractService { * @param queryIdentifier * @param deleteDelay */ - List<QueryFragmentInfo> queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, + QueryInfo queryComplete(QueryIdentifier queryIdentifier, long deleteDelay, boolean isInternal) throws IOException { if (deleteDelay == -1) { deleteDelay = defaultDeleteDelaySeconds; @@ -255,7 +255,7 @@ public class QueryTracker extends AbstractService { if (queryInfo == null) { // Should not happen. 
LOG.warn("Ignoring query complete for unknown dag: {}", queryIdentifier); - return Collections.emptyList(); + return null; } String[] localDirs = queryInfo.getLocalDirsNoCreate(); if (localDirs != null) { @@ -292,7 +292,7 @@ public class QueryTracker extends AbstractService { if (savedQueryId != null) { ObjectCacheFactory.removeLlapQueryCache(savedQueryId); } - return queryInfo.getRegisteredFragments(); + return queryInfo; } finally { dagLock.writeLock().unlock(); } http://git-wip-us.apache.org/repos/asf/hive/blob/38c9ac74/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index ae3328a..d1fce19 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -93,7 +93,7 @@ public class TaskExecutorTestHelpers { new QueryInfo(queryIdentifier, "fake_app_id_string", "fake_dag_id_string", "fake_dag_name", "fakeHiveQueryId", 1, "fakeUser", new ConcurrentHashMap<String, LlapDaemonProtocolProtos.SourceStateProto>(), - new String[0], null, "fakeUser", null); + new String[0], null, "fakeUser", null, null); return queryInfo; }