HIVE-16020: LLAP : Reduce IPC connection misses (Rajesh Balamohan, Siddharth Seth, reviewed by Sergey Shelukhin)
Change-Id: Ibf7f4c1a9840fa1463a7c7c4a777e5a276254e2e Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/59cde667 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/59cde667 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/59cde667 Branch: refs/heads/branch-2.2 Commit: 59cde667d77b549409898fdb30a549223a251e32 Parents: 5be5391 Author: Prasanth Jayachandran <pjayachand...@hortonworks.com> Authored: Mon Feb 27 19:44:19 2017 -0800 Committer: Owen O'Malley <omal...@apache.org> Committed: Tue Mar 28 15:27:52 2017 -0700 ---------------------------------------------------------------------- .../hive/llap/daemon/impl/AMReporter.java | 5 ++-- .../llap/daemon/impl/ContainerRunnerImpl.java | 16 ++++++---- .../hive/llap/daemon/impl/LlapDaemon.java | 8 +++-- .../hadoop/hive/llap/daemon/impl/QueryInfo.java | 29 ++++++++++++++++++ .../hive/llap/daemon/impl/QueryTracker.java | 4 ++- .../llap/daemon/impl/TaskRunnerCallable.java | 31 +++++++++++--------- .../daemon/impl/TaskExecutorTestHelpers.java | 4 ++- 7 files changed, 72 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java index 32f070a..ede7e00 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/AMReporter.java @@ -105,7 +105,8 @@ public class AMReporter extends AbstractService { private final DaemonId daemonId; public AMReporter(int numExecutors, int maxThreads, AtomicReference<InetSocketAddress> - localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId) { + localAddress, QueryFailedHandler queryFailedHandler, Configuration conf, DaemonId daemonId, + SocketFactory socketFactory) { super(AMReporter.class.getName()); this.localAddress = localAddress; this.queryFailedHandler = queryFailedHandler; @@ -137,7 +138,7 @@ public class AMReporter extends AbstractService { .retryUpToMaximumTimeWithFixedSleep(retryTimeout, retrySleep, TimeUnit.MILLISECONDS); - this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + this.socketFactory = socketFactory; LOG.info("Setting up AMReporter with " + "heartbeatInterval(ms)=" + heartbeatInterval + http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 6908138..cc4eff0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -83,6 +83,8 @@ import com.google.common.base.Preconditions; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; +import javax.net.SocketFactory; + public class ContainerRunnerImpl extends CompositeService implements ContainerRunner, FragmentCompletionHandler, QueryFailedHandler { // TODO Setup a set of threads to process incoming requests. @@ -107,12 +109,14 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu private final String clusterId; private final DaemonId daemonId; private final UgiFactory fsUgiFactory; + private final SocketFactory socketFactory; public ContainerRunnerImpl(Configuration conf, int numExecutors, int waitQueueSize, boolean enablePreemption, String[] localDirsBase, AtomicReference<Integer> localShufflePort, AtomicReference<InetSocketAddress> localAddress, long totalMemoryAvailableBytes, LlapDaemonExecutorMetrics metrics, - AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory) { + AMReporter amReporter, ClassLoader classLoader, DaemonId daemonId, UgiFactory fsUgiFactory, + SocketFactory socketFactory) { super("ContainerRunnerImpl"); Preconditions.checkState(numExecutors > 0, "Invalid number of executors: " + numExecutors + ". Must be > 0"); @@ -122,6 +126,7 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu this.signer = UserGroupInformation.isSecurityEnabled() ? new LlapSignerImpl(conf, daemonId.getClusterString()) : null; this.fsUgiFactory = fsUgiFactory; + this.socketFactory = socketFactory; this.clusterId = daemonId.getClusterString(); this.daemonId = daemonId; @@ -239,7 +244,8 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu queryIdentifier, qIdProto.getApplicationIdString(), dagId, vertex.getDagName(), vertex.getHiveQueryId(), dagIdentifier, vertex.getVertexName(), request.getFragmentNumber(), request.getAttemptNumber(), - vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo); + vertex.getUser(), vertex, jobToken, fragmentIdString, tokenInfo, request.getAmHost(), + request.getAmPort()); String[] localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); @@ -250,12 +256,12 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) Configuration callableConf = new Configuration(getConfig()); - UserGroupInformation taskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); + UserGroupInformation fsTaskUgi = fsUgiFactory == null ? null : fsUgiFactory.createUgi(); TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, callableConf, new ExecutionContextImpl(localAddress.get().getHostName()), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, - this, tezHadoopShim, attemptId, vertex, initialEvent, taskUgi, - completionListener); + this, tezHadoopShim, attemptId, vertex, initialEvent, fsTaskUgi, + completionListener, socketFactory); submissionState = executorService.schedule(callable); if (LOG.isInfoEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java index 0494f0d..1ede5a1 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/LlapDaemon.java @@ -29,6 +29,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import javax.management.ObjectName; +import javax.net.SocketFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.LogUtils; @@ -63,6 +64,7 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge; import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBridge.UdfWhitelistChecker; import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.CompositeService; @@ -106,6 +108,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla private final long maxJvmMemory; private final String[] localDirs; private final DaemonId daemonId; + private final SocketFactory socketFactory; // TODO Not the best way to share the address private final AtomicReference<InetSocketAddress> srvAddress = new AtomicReference<>(), @@ -257,8 +260,9 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla " sessionId: " + sessionId); int maxAmReporterThreads = HiveConf.getIntVar(daemonConf, ConfVars.LLAP_DAEMON_AM_REPORTER_MAX_THREADS); + this.socketFactory = NetUtils.getDefaultSocketFactory(daemonConf); this.amReporter = new AMReporter(numExecutors, maxAmReporterThreads, srvAddress, - new QueryFailedHandlerProxy(), daemonConf, daemonId); + new QueryFailedHandlerProxy(), daemonConf, daemonId, socketFactory); SecretManager sm = null; if (UserGroupInformation.isSecurityEnabled()) { @@ -276,7 +280,7 @@ public class LlapDaemon extends CompositeService implements ContainerRunner, Lla } this.containerRunner = new ContainerRunnerImpl(daemonConf, numExecutors, waitQueueSize, enablePreemption, localDirs, this.shufflePort, srvAddress, executorMemoryPerInstance, metrics, - amReporter, executorClassLoader, daemonId, fsUgiFactory); + amReporter, executorClassLoader, daemonId, fsUgiFactory, socketFactory); addIfService(containerRunner); // Not adding the registry as a service, since we need to control when it is initialized - conf used to pickup properties. http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java index 5d07e90..5f0271f 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryInfo.java @@ -16,6 +16,7 @@ package org.apache.hadoop.hive.llap.daemon.impl; import java.io.File; import java.io.IOException; +import java.net.InetSocketAddress; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -25,6 +26,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; import com.google.common.base.Preconditions; @@ -37,6 +39,11 @@ import org.apache.hadoop.hive.llap.LlapUtil; import org.apache.hadoop.hive.llap.daemon.FinishableStateUpdateHandler; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SignableVertexSpec; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; +import org.apache.tez.common.security.JobTokenIdentifier; public class QueryInfo { private final QueryIdentifier queryIdentifier; @@ -58,6 +65,7 @@ public class QueryInfo { private final FinishableStateTracker finishableStateTracker = new FinishableStateTracker(); private final String tokenUserName, appId; + private final AtomicReference<UserGroupInformation> umbilicalUgi; public QueryInfo(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, String dagName, String hiveQueryIdString, @@ -77,6 +85,7 @@ public class QueryInfo { this.localFs = localFs; this.tokenUserName = tokenUserName; this.appId = tokenAppId; + this.umbilicalUgi = new AtomicReference<>(); } public QueryIdentifier getQueryIdentifier() { @@ -298,4 +307,24 @@ public class QueryInfo { public String getTokenAppId() { return appId; } + + public void setupUmbilicalUgi(String umbilicalUser, Token<JobTokenIdentifier> appToken, String amHost, int amPort) { + synchronized (umbilicalUgi) { + if (umbilicalUgi.get() == null) { + UserGroupInformation taskOwner = + UserGroupInformation.createRemoteUser(umbilicalUser); + final InetSocketAddress address = + NetUtils.createSocketAddrForHost(amHost, amPort); + SecurityUtil.setTokenService(appToken, address); + taskOwner.addToken(appToken); + umbilicalUgi.set(taskOwner); + } + } + } + + public UserGroupInformation getUmbilicalUgi() { + synchronized (umbilicalUgi) { + return umbilicalUgi.get(); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 9eaddd2..5cf3a38 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -139,7 +139,7 @@ public class QueryTracker extends AbstractService { QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagIdString, String dagName, String hiveQueryIdString, int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, SignableVertexSpec vertex, Token<JobTokenIdentifier> appToken, - String fragmentIdString, LlapTokenInfo tokenInfo) throws IOException { + String fragmentIdString, LlapTokenInfo tokenInfo, String amHost, int amPort) throws IOException { ReadWriteLock dagLock = getDagLock(queryIdentifier); // Note: This is a readLock to prevent a race with queryComplete. Operations @@ -174,6 +174,8 @@ public class QueryTracker extends AbstractService { if (old != null) { queryInfo = old; } else { + // Ensure the UGI is setup once. + queryInfo.setupUmbilicalUgi(vertex.getTokenIdentifier(), appToken, amHost, amPort); isExistingQueryInfo = false; } } http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java index fe124ff..8739d5b 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/TaskRunnerCallable.java @@ -41,7 +41,6 @@ import org.apache.hadoop.hive.ql.io.IOContextMap; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.Credentials; -import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.log4j.MDC; @@ -65,6 +64,7 @@ import org.apache.tez.runtime.task.TezTaskRunner2; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.SocketFactory; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.security.PrivilegedExceptionAction; @@ -116,7 +116,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { private final SignableVertexSpec vertex; private final TezEvent initialEvent; private final SchedulerFragmentCompletingListener completionListener; - private UserGroupInformation taskUgi; + private UserGroupInformation fsTaskUgi; + private final SocketFactory socketFactory; @VisibleForTesting public TaskRunnerCallable(SubmitWorkRequestProto request, QueryFragmentInfo fragmentInfo, @@ -125,7 +126,8 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { LlapDaemonExecutorMetrics metrics, KilledTaskHandler killedTaskHandler, FragmentCompletionHandler fragmentCompleteHandler, HadoopShim tezHadoopShim, TezTaskAttemptID attemptId, SignableVertexSpec vertex, TezEvent initialEvent, - UserGroupInformation taskUgi, SchedulerFragmentCompletingListener completionListener) { + UserGroupInformation fsTaskUgi, SchedulerFragmentCompletingListener completionListener, + SocketFactory socketFactory) { this.request = request; this.fragmentInfo = fragmentInfo; this.conf = conf; @@ -153,8 +155,9 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { this.fragmentCompletionHanler = fragmentCompleteHandler; this.tezHadoopShim = tezHadoopShim; this.initialEvent = initialEvent; - this.taskUgi = taskUgi; + this.fsTaskUgi = fsTaskUgi; this.completionListener = completionListener; + this.socketFactory = socketFactory; } public long getStartTime() { @@ -195,27 +198,27 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { // TODO Consolidate this code with TezChild. runtimeWatch.start(); - if (taskUgi == null) { - taskUgi = UserGroupInformation.createRemoteUser(vertex.getUser()); + if (fsTaskUgi == null) { + fsTaskUgi = UserGroupInformation.createRemoteUser(vertex.getUser()); } - taskUgi.addCredentials(credentials); + fsTaskUgi.addCredentials(credentials); Map<String, ByteBuffer> serviceConsumerMetadata = new HashMap<>(); serviceConsumerMetadata.put(TezConstants.TEZ_SHUFFLE_HANDLER_SERVICE_ID, TezCommonUtils.convertJobTokenToBytes(jobToken)); Multimap<String, String> startedInputsMap = createStartedInputMap(vertex); - UserGroupInformation taskOwner = - UserGroupInformation.createRemoteUser(vertex.getTokenIdentifier()); + final UserGroupInformation taskOwner = fragmentInfo.getQueryInfo().getUmbilicalUgi(); + if (LOG.isDebugEnabled()) { + LOG.debug("taskOwner hashCode:" + taskOwner.hashCode()); + } final InetSocketAddress address = NetUtils.createSocketAddrForHost(request.getAmHost(), request.getAmPort()); - SecurityUtil.setTokenService(jobToken, address); - taskOwner.addToken(jobToken); umbilical = taskOwner.doAs(new PrivilegedExceptionAction<LlapTaskUmbilicalProtocol>() { @Override public LlapTaskUmbilicalProtocol run() throws Exception { return RPC.getProxy(LlapTaskUmbilicalProtocol.class, - LlapTaskUmbilicalProtocol.versionID, address, conf); + LlapTaskUmbilicalProtocol.versionID, address, taskOwner, conf, socketFactory); } }); @@ -237,7 +240,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { try { synchronized (this) { if (shouldRunTask) { - taskRunner = new TezTaskRunner2(conf, taskUgi, fragmentInfo.getLocalDirs(), + taskRunner = new TezTaskRunner2(conf, fsTaskUgi, fragmentInfo.getLocalDirs(), taskSpec, vertex.getQueryIdentifier().getAppAttemptNumber(), serviceConsumerMetadata, envMap, startedInputsMap, taskReporter, executor, @@ -259,7 +262,7 @@ public class TaskRunnerCallable extends CallableWithNdc<TaskRunner2Result> { isCompleted.set(true); return result; } finally { - FileSystem.closeAllForUGI(taskUgi); + FileSystem.closeAllForUGI(fsTaskUgi); LOG.info("ExecutionTime for Container: " + request.getContainerIdString() + "=" + runtimeWatch.stop().elapsedMillis()); if (LOG.isDebugEnabled()) { http://git-wip-us.apache.org/repos/asf/hive/blob/59cde667/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java index 5dc1be5..ae3328a 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/impl/TaskExecutorTestHelpers.java @@ -44,6 +44,8 @@ import org.apache.tez.runtime.task.TaskRunner2Result; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.net.SocketFactory; + public class TaskExecutorTestHelpers { private static final Logger LOG = LoggerFactory.getLogger(TestTaskExecutorService.class); @@ -184,7 +186,7 @@ public class TaskExecutorTestHelpers { mock(KilledTaskHandler.class), mock( FragmentCompletionHandler.class), new DefaultHadoopShim(), null, requestProto.getWorkSpec().getVertex(), initialEvent, null, mock( - SchedulerFragmentCompletingListener.class)); + SchedulerFragmentCompletingListener.class), mock(SocketFactory.class)); this.workTime = workTime; this.canFinish = canFinish; }