Repository: hive Updated Branches: refs/heads/master 43837e8ef -> 1e6fa1eb3
HIVE-12931. Shuffle tokens stay around forever in LLAP. (Siddharth Seth, reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/1e6fa1eb Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/1e6fa1eb Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/1e6fa1eb Branch: refs/heads/master Commit: 1e6fa1eb3f8b7eeb551355cb6ebee483ef4e4d71 Parents: 43837e8 Author: Siddharth Seth <ss...@apache.org> Authored: Mon Feb 1 11:14:37 2016 -0800 Committer: Siddharth Seth <ss...@apache.org> Committed: Mon Feb 1 11:14:37 2016 -0800 ---------------------------------------------------------------------- .../llap/daemon/impl/ContainerRunnerImpl.java | 31 +++++-------- .../hive/llap/daemon/impl/QueryTracker.java | 14 +++++- .../llap/shufflehandler/ShuffleHandler.java | 49 +++++++++++++++----- 3 files changed, 62 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/1e6fa1eb/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java index 535fe76..e80fb15 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/ContainerRunnerImpl.java @@ -46,7 +46,6 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SubmitWor import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentRequestProto; import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.TerminateFragmentResponseProto; import org.apache.hadoop.hive.llap.metrics.LlapDaemonExecutorMetrics; -import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.tez.TezProcessor; import org.apache.hadoop.io.DataInputBuffer; import org.apache.hadoop.security.Credentials; @@ -178,36 +177,30 @@ public class ContainerRunnerImpl extends CompositeService implements ContainerRu QueryIdentifier queryIdentifier = new QueryIdentifier(request.getApplicationIdString(), dagIdentifier); + Credentials credentials = new Credentials(); + DataInputBuffer dib = new DataInputBuffer(); + byte[] tokenBytes = request.getCredentialsBinary().toByteArray(); + dib.reset(tokenBytes, tokenBytes.length); + credentials.readTokenStorageStream(dib); + + Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); + QueryFragmentInfo fragmentInfo = queryTracker - .registerFragment(queryIdentifier, request.getApplicationIdString(), fragmentSpec.getDagName(), + .registerFragment(queryIdentifier, request.getApplicationIdString(), + fragmentSpec.getDagName(), dagIdentifier, fragmentSpec.getVertexName(), fragmentSpec.getFragmentNumber(), - fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec()); + fragmentSpec.getAttemptNumber(), request.getUser(), request.getFragmentSpec(), + jobToken); String[] localDirs = fragmentInfo.getLocalDirs(); Preconditions.checkNotNull(localDirs); - if (LOG.isDebugEnabled()) { LOG.debug("Dirs are: " + Arrays.toString(localDirs)); } // May need to setup localDir for re-localization, which is usually setup as Environment.PWD. // Used for re-localization, to add the user specified configuration (conf_pb_binary_stream) - Credentials credentials = new Credentials(); - DataInputBuffer dib = new DataInputBuffer(); - byte[] tokenBytes = request.getCredentialsBinary().toByteArray(); - dib.reset(tokenBytes, tokenBytes.length); - credentials.readTokenStorageStream(dib); - - Token<JobTokenIdentifier> jobToken = TokenCache.getSessionToken(credentials); - - if (LOG.isDebugEnabled()) { - LOG.debug("Registering request with the ShuffleHandler"); - } - ShuffleHandler.get() - .registerDag(request.getApplicationIdString(), dagIdentifier, jobToken, - request.getUser(), localDirs); - TaskRunnerCallable callable = new TaskRunnerCallable(request, fragmentInfo, new Configuration(getConfig()), new LlapExecutionContext(localAddress.get().getHostName(), queryTracker), env, credentials, memoryPerExecutor, amReporter, confParams, metrics, killedTaskHandler, http://git-wip-us.apache.org/repos/asf/hive/blob/1e6fa1eb/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java index 0676edd..80264a0 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/daemon/impl/QueryTracker.java @@ -19,6 +19,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.token.Token; import org.apache.tez.common.CallableWithNdc; import org.apache.hadoop.service.AbstractService; @@ -31,6 +32,7 @@ import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.FragmentS import org.apache.hadoop.hive.llap.daemon.rpc.LlapDaemonProtocolProtos.SourceStateProto; import org.apache.hadoop.hive.llap.shufflehandler.ShuffleHandler; import org.apache.hadoop.hive.ql.exec.ObjectCacheFactory; +import org.apache.tez.common.security.JobTokenIdentifier; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -120,8 +122,8 @@ public class QueryTracker extends AbstractService { * @throws IOException */ QueryFragmentInfo registerFragment(QueryIdentifier queryIdentifier, String appIdString, String dagName, - int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, - FragmentSpecProto fragmentSpec) throws IOException { + int dagIdentifier, String vertexName, int fragmentNumber, int attemptNumber, String user, + FragmentSpecProto fragmentSpec, Token<JobTokenIdentifier> appToken) throws IOException { ReadWriteLock dagLock = getDagLock(queryIdentifier); dagLock.readLock().lock(); try { @@ -132,6 +134,14 @@ public class QueryTracker extends AbstractService { getSourceCompletionMap(queryIdentifier), localDirsBase, localFs); queryInfoMap.putIfAbsent(queryIdentifier, queryInfo); } + + if (LOG.isDebugEnabled()) { + LOG.debug("Registering request for {} with the ShuffleHandler", queryIdentifier); + } + ShuffleHandler.get() + .registerDag(appIdString, dagIdentifier, appToken, + user, queryInfo.getLocalDirs()); + return queryInfo.registerFragment(vertexName, fragmentNumber, attemptNumber, fragmentSpec); } else { // Cleanup the dag lock here, since it may have been created after the query completed http://git-wip-us.apache.org/repos/asf/hive/blob/1e6fa1eb/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java index b042455..2c51169 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/shufflehandler/ShuffleHandler.java @@ -155,7 +155,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { private final ReadaheadPool readaheadPool = ReadaheadPool.getInstance(); /* List of registered applications */ - private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>(); + private final ConcurrentMap<String, Integer> registeredApps = new ConcurrentHashMap<>(); /* Maps application identifiers (jobIds) to the associated user for the app */ private final ConcurrentMap<String,String> userRsrc; private JobTokenSecretManager secretManager; @@ -406,6 +406,9 @@ public class ShuffleHandler implements AttemptRegistrationListener { /** * Register an application and it's associated credentials and user information. + * + * This method and unregisterDag must be synchronized externally to prevent races in shuffle token registration/unregistration + * * @param applicationIdString * @param dagIdentifier * @param appToken @@ -414,12 +417,24 @@ public class ShuffleHandler implements AttemptRegistrationListener { public void registerDag(String applicationIdString, int dagIdentifier, Token<JobTokenIdentifier> appToken, String user, String[] appDirs) { - // TODO Fix this. There's a race here, where an app may think everything is registered, finish really fast, send events and the consumer will not find the registration. - Boolean registered = registeredApps.putIfAbsent(applicationIdString, Boolean.valueOf(true)); - if (registered == null) { - LOG.debug("Registering watches for AppDirs: appId=" + applicationIdString); + Integer registeredDagIdentifier = registeredApps.putIfAbsent(applicationIdString, dagIdentifier); + // App never seen, or previous dag has been unregistered. + if (registeredDagIdentifier == null) { recordJobShuffleInfo(applicationIdString, user, appToken); + } + // Register the new dag identifier, if that's not the one currently registered. + // Register comes in before the unregister for the previous dag + if (registeredDagIdentifier != null && !registeredDagIdentifier.equals(dagIdentifier)) { + registeredApps.put(applicationIdString, dagIdentifier); + // Don't need to recordShuffleInfo since the out of sync unregister will not remove the + // credentials + } + // First time registration, or new register comes in before the previous unregister. + if (registeredDagIdentifier == null || !registeredDagIdentifier.equals(dagIdentifier)) { if (dirWatcher != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Registering watches for AppDirs: appId={}, dagId={}", applicationIdString, dagIdentifier); + } for (String appDir : appDirs) { try { dirWatcher.registerDagDir(appDir, applicationIdString, dagIdentifier, user, @@ -432,15 +447,27 @@ public class ShuffleHandler implements AttemptRegistrationListener { } } + /** + * Unregister a specific dag + * + * This method and registerDag must be synchronized externally to prevent races in shuffle token registration/unregistration + * + * @param dir + * @param applicationIdString + * @param dagIdentifier + */ public void unregisterDag(String dir, String applicationIdString, int dagIdentifier) { + Integer currentDagIdentifier = registeredApps.get(applicationIdString); + // Unregister may come in after the new dag has started running. The methods are expected to + // be synchronized, hence the following check is sufficient. + if (currentDagIdentifier != null && currentDagIdentifier.equals(dagIdentifier)) { + registeredApps.remove(applicationIdString); + removeJobShuffleInfo(applicationIdString); + } + // Unregister for the dirWatcher for the specific dagIdentifier in either case. if (dirWatcher != null) { dirWatcher.unregisterDagDir(dir, applicationIdString, dagIdentifier); } - // TODO Cleanup registered tokens and dag info - } - - public void unregisterApplication(String applicationIdString) { - removeJobShuffleInfo(applicationIdString); } @@ -468,7 +495,7 @@ public class ShuffleHandler implements AttemptRegistrationListener { // This is in place to be compatible with the MR ShuffleHandler. Requests from ShuffleInputs // arrive with a job_ prefix. String jobIdString = appIdString.replace("application", "job"); - userRsrc.put(jobIdString, user); + userRsrc.putIfAbsent(jobIdString, user); secretManager.addTokenForJob(jobIdString, jobToken); LOG.info("Added token for " + jobIdString); }