HIVE-17387 : implement Tez AM registry in Hive (Sergey Shelukhin, reviewed by Gunther Hagleitner)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e0c9803 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e0c9803 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e0c9803 Branch: refs/heads/hive-14535 Commit: 0e0c98039055983823b96db0c340a6015a8be812 Parents: 1acaf15 Author: sergey <ser...@apache.org> Authored: Mon Sep 11 20:04:45 2017 -0700 Committer: sergey <ser...@apache.org> Committed: Mon Sep 11 20:04:45 2017 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 7 +- .../impl/LlapZookeeperRegistryImpl.java | 17 +-- .../hive/registry/impl/TezAmInstance.java | 76 ++++++++++++ .../hive/registry/impl/TezAmRegistryImpl.java | 118 +++++++++++++++++++ .../hive/registry/impl/ZkRegistryBase.java | 37 +++++- llap-tez/pom.xml | 34 ++++++ .../tezplugins/LlapTaskSchedulerService.java | 83 +++++++++---- .../endpoint/LlapPluginServerImpl.java | 11 +- .../TestLlapTaskSchedulerService.java | 1 + .../ql/exec/tez/SessionExpirationTracker.java | 3 +- .../hadoop/hive/ql/exec/tez/TezSessionPool.java | 93 +++++++++++++-- .../hive/ql/exec/tez/TezSessionPoolManager.java | 34 +++--- .../hive/ql/exec/tez/TezSessionPoolSession.java | 8 +- .../hive/ql/exec/tez/TezSessionState.java | 40 +++---- .../apache/hadoop/hive/ql/exec/tez/TezTask.java | 5 +- .../hadoop/hive/ql/session/SessionState.java | 10 +- .../hive/ql/exec/tez/SampleTezSessionState.java | 12 +- .../hive/ql/exec/tez/TestTezSessionPool.java | 14 ++- .../hadoop/hive/ql/exec/tez/TestTezTask.java | 3 +- 19 files changed, 493 insertions(+), 113 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index c9d75c0..24c5db0 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -86,7 +86,6 @@ public class HiveConf extends Configuration { private static byte[] confVarByteArray = null; - private static final Map<String, ConfVars> vars = new HashMap<String, ConfVars>(); private static final Map<String, ConfVars> metaConfs = new HashMap<String, ConfVars>(); private final List<String> restrictList = new ArrayList<String>(); @@ -3274,6 +3273,12 @@ public class HiveConf extends Configuration { "llap.daemon.communicator.num.threads"), LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", false, "Whether LLAP daemon should localize the resources for permanent UDFs."), + LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap", + "AM registry name for LLAP task scheduler plugin to register with."), + LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "", + "The name of the principal used to access ZK AM registry securely."), + LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE("hive.llap.task.scheduler.am.registry.keytab.file", "", + "The path to the Kerberos keytab file used to access ZK AM registry securely."), LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS( "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms", new TimeValidator(TimeUnit.MILLISECONDS), http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index 65f8f94..8339230 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -254,13 +254,13 @@ public class LlapZookeeperRegistryImpl this.instancesCache = cache; this.parent = parent; this.encoder = encoder; - parent.populateCache(instancesCache); + parent.populateCache(instancesCache, false); } @Override public Collection<LlapServiceInstance> getAll() { - return parent.getAll(); + return parent.getAllInternal(); } @Override @@ -281,12 +281,12 @@ public class LlapZookeeperRegistryImpl @Override public Set<LlapServiceInstance> getByHost(String host) { - return parent.getByHost(host); + return parent.getByHostInternal(host); } @Override public int size() { - return parent.size(); + return parent.sizeInternal(); } @Override @@ -402,14 +402,9 @@ public class LlapZookeeperRegistryImpl } @Override - public void start() throws IOException { - super.start(); - } - - @Override - public void stop() throws IOException { - super.stop(); + public void stop() { CloseableUtils.closeQuietly(slotZnode); + super.stop(); } @Override http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java new file mode 100644 index 0000000..a71904c --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java @@ -0,0 +1,76 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry.impl; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.codec.binary.Base64; + +import com.google.common.io.ByteStreams; + +import org.apache.tez.common.security.JobTokenIdentifier; + +import org.apache.hadoop.security.token.Token; + +import java.io.IOException; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.types.AddressTypes; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; + +public class TezAmInstance extends ServiceInstanceBase { + private static final Logger LOG = LoggerFactory.getLogger(TezAmInstance.class); + private final int pluginPort; + private Token<JobTokenIdentifier> token; + + public TezAmInstance(ServiceRecord srv) throws IOException { + super(srv, TezAmRegistryImpl.IPC_TEZCLIENT); + final Endpoint plugin = srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN); + if (plugin != null) { + this.pluginPort = Integer.parseInt(RegistryTypeUtils.getAddressField( + plugin.addresses.get(0), AddressTypes.ADDRESS_PORT_FIELD)); + } else { + this.pluginPort = -1; + } + } + + public int getPluginPort() { + return pluginPort; + } + + public String getSessionId() { + return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID); + } + + public String getJobIdForPluginToken() { + return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID); + } + + public Token<JobTokenIdentifier> getPluginToken() { + if (this.token != null) return token; + String tokenString = getProperties().get(TezAmRegistryImpl.AM_PLUGIN_TOKEN); + if (tokenString == null || tokenString.isEmpty()) return null; + byte[] tokenBytes = Base64.decodeBase64(tokenString); + Token<JobTokenIdentifier> token = new Token<>(); + try { + token.readFields(ByteStreams.newDataInput(tokenBytes)); + } catch (IOException e) { + LOG.error("Couldn't read the plugin token from [" + tokenString + "]", e); + return null; + } + this.token = token; + return token; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java new file mode 100644 index 0000000..417e571 --- /dev/null +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java @@ -0,0 +1,118 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.registry.impl; + +import org.apache.commons.lang3.StringUtils; + +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Collection; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; +import org.apache.hadoop.registry.client.binding.RegistryTypeUtils; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.types.Endpoint; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> { + private static final Logger LOG = LoggerFactory.getLogger(TezAmRegistryImpl.class); + + static final String IPC_TEZCLIENT = "tez-client"; + static final String IPC_PLUGIN = "llap-plugin"; + static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = "am.plugin.token", + AM_PLUGIN_JOBID = "am.plugin.jobid"; + private final static String NAMESPACE_PREFIX = "tez-am-"; + private final static String USER_SCOPE_PATH_PREFIX = "user-"; + private static final String WORKER_PREFIX = "worker-"; + private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient"; + + private final String registryName; + + public static TezAmRegistryImpl create(Configuration conf, boolean b) { + String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME); + return StringUtils.isBlank(amRegistryName) ? null + : new TezAmRegistryImpl(amRegistryName, conf, true); + } + + + private TezAmRegistryImpl(String instanceName, Configuration conf, boolean useSecureZk) { + super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, WORKER_PREFIX, + useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null, + HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL), + HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE), + null); // Always validate ACLs + this.registryName = instanceName; + LOG.info("AM Zookeeper Registry is enabled with registryid: " + instanceName); + } + + public void initializeWithoutRegistering() throws IOException { + initializeWithoutRegisteringInternal(); + } + + public void populateCache(boolean doInvokeListeners) throws IOException { + PathChildrenCache pcc = ensureInstancesCache(0); + populateCache(pcc, doInvokeListeners); + } + + public String register(int amPort, int pluginPort, String sessionId, + String serializedToken, String jobIdForToken) throws IOException { + ServiceRecord srv = new ServiceRecord(); + Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint( + IPC_TEZCLIENT, new InetSocketAddress(hostname, amPort)); + srv.addInternalEndpoint(rpcEndpoint); + Endpoint pluginEndpoint = null; + if (pluginPort >= 0) { + pluginEndpoint = RegistryTypeUtils.ipcEndpoint( + IPC_PLUGIN, new InetSocketAddress(hostname, pluginPort)); + srv.addInternalEndpoint(pluginEndpoint); + } + srv.set(AM_SESSION_ID, sessionId); + boolean hasToken = serializedToken != null; + srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : ""); + srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? jobIdForToken : ""); + String uniqueId = registerServiceRecord(srv); + LOG.info("Registered this AM: rpc: {}, plugin: {}, sessionId: {}, token: {}, znodePath: {}", + rpcEndpoint, pluginEndpoint, sessionId, hasToken, getRegistrationZnodePath()); + return uniqueId; + } + + public TezAmInstance getInstance(String name) { + Collection<TezAmInstance> instances = getAllInternal(); + for(TezAmInstance instance : instances) { + if (instance.getWorkerIdentity().equals(name)) { + return instance; + } + } + return null; + } + + @Override + protected TezAmInstance createServiceInstance(ServiceRecord srv) throws IOException { + return new TezAmInstance(srv); + } + + @Override + protected String getZkPathUser(Configuration conf) { + // We assume that AMs and HS2 run under the same user. + return RegistryUtils.currentUser(); + } + + public String getRegistryName() { + return registryName; + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java index c773770..b6773b5 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java +++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java @@ -58,6 +58,7 @@ import org.apache.hadoop.registry.client.types.ServiceRecord; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.zookeeper.KeeperException.InvalidACLException; +import org.apache.zookeeper.KeeperException.NodeExistsException; import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -278,6 +279,27 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { } + protected final void initializeWithoutRegisteringInternal() throws IOException { + // Create a znode under the rootNamespace parent for this instance of the server + try { + try { + zooKeeperClient.create().creatingParentsIfNeeded().forPath(workersPath); + } catch (NodeExistsException ex) { + // Ignore - this is expected. + } + if (doCheckAcls) { + try { + checkAndSetAcls(); + } catch (Exception ex) { + throw new IOException("Error validating or setting ACLs. " + disableMessage, ex); + } + } + } catch (Exception e) { + LOG.error("Unable to create a parent znode for the registry", e); + throw (e instanceof IOException) ? (IOException)e : new IOException(e); + } + } + private void checkAndSetAcls() throws Exception { if (!UserGroupInformation.isSecurityEnabled()) return; // We are trying to check ACLs on the "workers" directory, which noone except us should be @@ -356,7 +378,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { instanceSet.add(instance); } - protected final void populateCache(PathChildrenCache instancesCache) { + protected final void populateCache(PathChildrenCache instancesCache, boolean doInvokeListeners) { for (ChildData childData : instancesCache.getCurrentData()) { byte[] data = getWorkerData(childData, workerNodePrefix); if (data == null) continue; @@ -364,6 +386,11 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { ServiceRecord srv = encoder.fromBytes(childData.getPath(), data); InstanceType instance = createServiceInstance(srv); addToCache(childData.getPath(), instance.getHost(), instance); + if (doInvokeListeners) { + for (ServiceInstanceStateChangeListener<InstanceType> listener : stateChangeListeners) { + listener.onCreate(instance); + } + } } catch (IOException e) { LOG.error("Unable to decode data for zkpath: {}." + " Ignoring from current instances list..", childData.getPath()); @@ -426,12 +453,12 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { // The real implementation for the instanceset... instanceset has its own copy of the // ZK cache yet completely depends on the parent in every other aspect and is thus unneeded. - public int size() { + protected final int sizeInternal() { // not using the path child cache here as there could be more than 1 path per host (worker and slot znodes) return nodeToInstanceCache.size(); } - protected final Set<InstanceType> getByHost(String host) { + protected final Set<InstanceType> getByHostInternal(String host) { Set<InstanceType> byHost = nodeToInstanceCache.get(host); byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost; if (LOG.isDebugEnabled()) { @@ -440,7 +467,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { return byHost; } - protected final Collection<InstanceType> getAll() { + protected final Collection<InstanceType> getAllInternal() { Set<InstanceType> instances = new HashSet<>(); for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) { instances.addAll(instanceSet); @@ -533,7 +560,7 @@ public abstract class ZkRegistryBase<InstanceType extends ServiceInstance> { CloseableUtils.class.getName(); } - public void stop() throws IOException { + public void stop() { CloseableUtils.closeQuietly(znode); CloseableUtils.closeQuietly(instancesCache); CloseableUtils.closeQuietly(zooKeeperClient); http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/pom.xml ---------------------------------------------------------------------- diff --git a/llap-tez/pom.xml b/llap-tez/pom.xml index 1e5b235..69fbea3 100644 --- a/llap-tez/pom.xml +++ b/llap-tez/pom.xml @@ -46,6 +46,28 @@ </dependency> <!-- inter-project --> <dependency> + <groupId>org.apache.zookeeper</groupId> + <artifactId>zookeeper</artifactId> + <version>${zookeeper.version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-framework</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>apache-curator</artifactId> + <version>${curator.version}</version> + <type>pom</type> + </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-test</artifactId> + <version>${curator.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>${commons-lang3.version}</version> @@ -67,6 +89,18 @@ </exclusions> </dependency> <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-yarn-registry</artifactId> + <version>${hadoop.version}</version> + <optional>true</optional> + </dependency> + <dependency> <groupId>org.apache.tez</groupId> <artifactId>tez-api</artifactId> <version>${tez.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java index cf8bd46..26747fc 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java @@ -14,6 +14,13 @@ package org.apache.hadoop.hive.llap.tezplugins; +import com.google.common.io.ByteArrayDataOutput; + +import org.apache.hadoop.io.Text; +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; + import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; import java.io.IOException; @@ -217,6 +224,7 @@ public class LlapTaskSchedulerService extends TaskScheduler { private final AtomicInteger assignedTaskCounter = new AtomicInteger(0); private final LlapRegistryService registry = new LlapRegistryService(false); + private final TezAmRegistryImpl amRegistry; private volatile ListenableFuture<Void> nodeEnablerFuture; private volatile ListenableFuture<Void> delayedTaskSchedulerFuture; @@ -236,6 +244,8 @@ public class LlapTaskSchedulerService extends TaskScheduler { private int totalGuaranteed = 0, unusedGuaranteed = 0; private LlapTaskCommunicator communicator; + private final int amPort; + private final String serializedToken, jobIdForToken; public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) { this(taskSchedulerContext, new MonotonicClock(), true); @@ -245,17 +255,13 @@ public class LlapTaskSchedulerService extends TaskScheduler { // The fields that HS2 uses to give AM information about plugin endpoint. /** Whether to enable the endpoint. */ public static final String LLAP_PLUGIN_ENDPOINT_ENABLED = "llap.plugin.endpoint.enabled"; - /** The fake job ID generated by HS2 to use for the job token. */ - public static final String LLAP_PLUGIN_ENDPOINT_JOBID = "llap.plugin.endpoint.jobid"; - /** The job token generated by HS2 for the endpoint. See the comment at the place where - * this is used - the token will be generated by AM after we have AM registry. */ - public static final String LLAP_PLUGIN_ENDPOINT_TOKEN = "llap.plugin.endpoint.token"; @VisibleForTesting public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, Clock clock, boolean initMetrics) { super(taskSchedulerContext); this.clock = clock; + this.amPort = taskSchedulerContext.getAppClientPort(); this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable(); try { this.conf = TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload()); @@ -267,26 +273,24 @@ public class LlapTaskSchedulerService extends TaskScheduler { if (conf.getBoolean(LLAP_PLUGIN_ENDPOINT_ENABLED, false)) { JobTokenSecretManager sm = null; if (UserGroupInformation.isSecurityEnabled()) { + // Set up the security for plugin endpoint. + // We will create the token and publish it in the AM registry. + // Note: this application ID is bogus and is only needed for JobTokenSecretManager. + ApplicationId id = ApplicationId.newInstance( + System.nanoTime(), (int)(System.nanoTime() % 100000)); + Token<JobTokenIdentifier> token = createAmsToken(id); + serializedToken = serializeToken(token); + jobIdForToken = token.getService().toString(); sm = new JobTokenSecretManager(); - // Note: for HA/AMRegistry, this token will have to be published in a secure location (ZK). - // Otherwise, when HS2 dies, noone will be able to talk to this AM. - // We could generate it in AM in the first place, but then even the original HS2 - // won't be able to talk to us before we register. Which might be ok. - // For now, there's no mechanism to coordinate this, so we email the token from HS2. - byte[] tokenBytes = Base64.decodeBase64(conf.get(LLAP_PLUGIN_ENDPOINT_TOKEN)); - Token<JobTokenIdentifier> token = new Token<>(); - try { - token.readFields(ByteStreams.newDataInput(tokenBytes)); - } catch (IOException e) { - // This shouldn't really happen on a byte array. - throw new RuntimeException(e); - } - sm.addTokenForJob(conf.get(LLAP_PLUGIN_ENDPOINT_JOBID), token); + sm.addTokenForJob(jobIdForToken, token); + } else { + serializedToken = jobIdForToken = null; } pluginEndpoint = new LlapPluginServerImpl(sm, HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS), HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_PORT), this); } else { + serializedToken = jobIdForToken = null; pluginEndpoint = null; } // This is called once per AM, so we don't get the starting duck count here. @@ -358,9 +362,11 @@ public class LlapTaskSchedulerService extends TaskScheduler { } String hostsString = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS); - LOG.info( - "Running with configuration: hosts={}, numSchedulableTasksPerNode={}, nodeBlacklistConf={}, localityConf={}", + LOG.info("Running with configuration: hosts={}, numSchedulableTasksPerNode={}, " + + "nodeBlacklistConf={}, localityConf={}", hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, localityDelayConf); + this.amRegistry = TezAmRegistryImpl.create(conf, true); + LlapTaskCommunicator peer = LlapTaskCommunicator.instance.get(); if (peer != null) { @@ -372,6 +378,29 @@ public class LlapTaskSchedulerService extends TaskScheduler { } } + private static Token<JobTokenIdentifier> createAmsToken(ApplicationId id) { + if (!UserGroupInformation.isSecurityEnabled()) return null; + JobTokenIdentifier identifier = new JobTokenIdentifier(new Text(id.toString())); + JobTokenSecretManager jobTokenManager = new JobTokenSecretManager(); + Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, jobTokenManager); + sessionToken.setService(identifier.getJobId()); + return sessionToken; + } + + private static String serializeToken(Token<JobTokenIdentifier> token) { + byte[] bytes = null; + try { + ByteArrayDataOutput out = ByteStreams.newDataOutput(); + token.write(out); + bytes = out.toByteArray(); + } catch (IOException e) { + // This shouldn't really happen on a byte array. + throw new RuntimeException(e); + } + return Base64.encodeBase64String(bytes); + } + + @VisibleForTesting void updateGuaranteedCount(int newTotalGuaranteed) { List<TaskInfo> toUpdate = null; @@ -520,6 +549,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { @Override public void initialize() { registry.init(conf); + if (pluginEndpoint != null) { + pluginEndpoint.init(conf); + } } @Override @@ -562,6 +594,12 @@ public class LlapTaskSchedulerService extends TaskScheduler { addNode(new NodeInfo(inst, nodeBlacklistConf, clock, numSchedulableTasksPerNode, metrics), inst); } + if (amRegistry != null) { + amRegistry.start(); + int pluginPort = pluginEndpoint != null ? pluginEndpoint.getActualPort() : -1; + amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, ConfVars.HIVESESSIONID), + serializedToken, jobIdForToken); + } } finally { writeLock.unlock(); } @@ -682,6 +720,9 @@ public class LlapTaskSchedulerService extends TaskScheduler { if (registry != null) { registry.stop(); } + if (amRegistry != null) { + amRegistry.stop(); + } if (pluginEndpoint != null) { pluginEndpoint.stop(); http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java index f3c0d52..4d5333f 100644 --- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java +++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java @@ -43,6 +43,7 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP private final SecretManager<JobTokenIdentifier> secretManager; private final int numHandlers; private final LlapTaskSchedulerService parent; + private final AtomicReference<InetSocketAddress> bindAddress = new AtomicReference<>(); public LlapPluginServerImpl(SecretManager<JobTokenIdentifier> secretManager, int numHandlers, int port, LlapTaskSchedulerService parent) { @@ -66,7 +67,7 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP final Configuration conf = getConfig(); final BlockingService daemonImpl = LlapPluginProtocolProtos.LlapPluginProtocol.newReflectiveBlockingService(this); - server = LlapUtil.startProtocolServer(port, numHandlers, null, conf, daemonImpl, + server = LlapUtil.startProtocolServer(port, numHandlers, bindAddress , conf, daemonImpl, LlapPluginProtocolPB.class, secretManager, new LlapPluginPolicyProvider(), ConfVars.LLAP_PLUGIN_ACL, ConfVars.LLAP_PLUGIN_ACL_DENY); } @@ -77,4 +78,12 @@ public class LlapPluginServerImpl extends AbstractService implements LlapPluginP server.stop(); } } + + public int getActualPort() { + InetSocketAddress bindAddress = this.bindAddress.get(); + if (bindAddress == null) { + throw new RuntimeException("Cannot get port before the service is started"); + } + return bindAddress.getPort(); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java ---------------------------------------------------------------------- diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java index 156e620..51d2e08 100644 --- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java +++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java @@ -1938,6 +1938,7 @@ public class TestLlapTaskSchedulerService { nodeDisableTimeoutMillis + "ms"); conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, false); conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, localityDelayMs); + conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, ""); doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId(); doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier(); http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java index 8bee77e..da93a3a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java @@ -45,6 +45,7 @@ class SessionExpirationTracker { private final long sessionLifetimeMs; private final long sessionLifetimeJitterMs; private final RestartImpl sessionRestartImpl; + private volatile SessionState initSessionState; interface RestartImpl { void closeAndReopenPoolSession(TezSessionPoolSession session) throws Exception; @@ -68,7 +69,6 @@ class SessionExpirationTracker { LOG.debug("Session expiration is enabled; session lifetime is " + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms"); } - final SessionState initSessionState = SessionState.get(); expirationQueue = new PriorityBlockingQueue<>(11, new Comparator<TezSessionPoolSession>() { @Override public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) { @@ -179,6 +179,7 @@ class SessionExpirationTracker { public void start() { + initSessionState = SessionState.get(); expirationThread.start(); restartThread.start(); } http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java index 4f58565..6e8122d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java @@ -17,6 +17,14 @@ */ package org.apache.hadoop.hive.ql.exec.tez; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService; + +import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener; +import org.apache.hadoop.hive.registry.impl.TezAmInstance; +import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl; + import java.io.IOException; import java.util.Queue; import java.util.Set; @@ -48,14 +56,31 @@ class TezSessionPool { private final HiveConf initConf; private final BlockingDeque<TezSessionPoolSession> defaultQueuePool; - TezSessionPool(HiveConf initConf, int numSessionsTotal) { + private final String amRegistryName; + private final TezAmRegistryImpl amRegistry; + + private final ConcurrentHashMap<String, TezSessionPoolSession> bySessionId = + new ConcurrentHashMap<>(); + + + TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean useAmRegistryIfPresent) { this.initConf = initConf; assert numSessionsTotal > 0; defaultQueuePool = new LinkedBlockingDeque<TezSessionPoolSession>(numSessionsTotal); + this.amRegistry = useAmRegistryIfPresent ? TezAmRegistryImpl.create(initConf, true) : null; + this.amRegistryName = amRegistry == null ? null : amRegistry.getRegistryName(); } void startInitialSessions() throws Exception { if (initialSessions.isEmpty()) return; + if (amRegistry != null) { + amRegistry.start(); + amRegistry.initializeWithoutRegistering(); + // Note: we may later have special logic to pick up old AMs, if any. + amRegistry.registerStateChangeListener(new ChangeListener()); + amRegistry.populateCache(true); + } + int threadCount = Math.min(initialSessions.size(), HiveConf.getIntVar(initConf, ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS)); Preconditions.checkArgument(threadCount > 0); @@ -134,7 +159,6 @@ class TezSessionPool { // Re-setting the queue config is an old hack that we may remove in future. Path scratchDir = oldSession.getTezScratchDir(); Set<String> additionalFiles = oldSession.getAdditionalFilesNotFromConf(); - HiveConf conf = oldSession.getConf(); String queueName = oldSession.getQueueName(); try { oldSession.close(false); @@ -142,24 +166,67 @@ class TezSessionPool { if (!wasRemoved) { LOG.error("Old session was closed but it was not in the pool", oldSession); } + bySessionId.remove(oldSession.getSessionId()); } finally { // There's some bogus code that can modify the queue name. Force-set it for pool sessions. // TODO: this might only be applicable to TezSessionPoolManager; try moving it there? - conf.set(TezConfiguration.TEZ_QUEUE_NAME, queueName); - newSession.open(conf, additionalFiles, scratchDir); + newSession.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, queueName); + // The caller probably created the new session with the old config, but update the + // registry again just in case. TODO: maybe we should enforce that. + configureAmRegistry(newSession); + newSession.open(additionalFiles, scratchDir); defaultQueuePool.put(newSession); } } - private void startInitialSession(TezSessionPoolSession sessionState) throws Exception { - HiveConf newConf = new HiveConf(initConf); - // Makes no senses for it to be mixed up like this. - boolean isUsable = sessionState.tryUse(); - if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup"); - newConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName()); - sessionState.open(newConf); - if (sessionState.returnAfterUse()) { - defaultQueuePool.put(sessionState); + private void startInitialSession(TezSessionPoolSession session) throws Exception { + boolean isUsable = session.tryUse(); + if (!isUsable) throw new IOException(session + " is not usable at pool startup"); + session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, session.getQueueName()); + configureAmRegistry(session); + session.open(); + if (session.returnAfterUse()) { + defaultQueuePool.put(session); + } + } + + private void configureAmRegistry(TezSessionPoolSession session) { + if (amRegistryName != null) { + bySessionId.put(session.getSessionId(), session); + HiveConf conf = session.getConf(); + conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, amRegistryName); + conf.set(ConfVars.HIVESESSIONID.varname, session.getSessionId()); + // TODO: can be enable temporarily for testing + // conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, "true"); + } + } + + + private final class ChangeListener + implements ServiceInstanceStateChangeListener<TezAmInstance> { + + @Override + public void onCreate(TezAmInstance serviceInstance) { + String sessionId = serviceInstance.getSessionId(); + TezSessionPoolSession session = bySessionId.get(sessionId); + LOG.warn("AM for " + sessionId + " has registered; updating [" + session + + "] with an endpoint at " + serviceInstance.getPluginPort()); + // TODO: actually update the session once WM is committed + } + + @Override + public void onUpdate(TezAmInstance serviceInstance) { + // Presumably we'd get those later if AM updates its stuff. + LOG.warn("Received an unexpected update for instance={}. Ignoring", serviceInstance); + } + + @Override + public void onRemove(TezAmInstance serviceInstance) { + String sessionId = serviceInstance.getSessionId(); + // For now, we don't take any action. In future, we might restore the session based + // on this and get rid of the logic outside of the pool that replaces/reopens/etc. + LOG.warn("AM for " + sessionId + " has disappeared from the registry"); + bySessionId.remove(sessionId); } } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 1f4705c..9f72155 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -114,7 +114,9 @@ public class TezSessionPoolManager int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames); if (numSessionsTotal > 0) { - defaultSessionPool = new TezSessionPool(initConf, numSessionsTotal); + // TODO: this can be enabled to test. Will only be used in WM case for now. + boolean enableAmRegistry = false; + defaultSessionPool = new TezSessionPool(initConf, numSessionsTotal, enableAmRegistry); } numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); @@ -152,14 +154,16 @@ public class TezSessionPoolManager if (queueName.isEmpty()) { continue; } - defaultSessionPool.addInitialSession(createAndInitSession(queueName, true)); + HiveConf sessionConf = new HiveConf(initConf); + defaultSessionPool.addInitialSession(createAndInitSession(queueName, true, sessionConf)); } } } // TODO Create and init session sets up queue, isDefault - but does not initialize the configuration - private TezSessionPoolSession createAndInitSession(String queue, boolean isDefault) { - TezSessionPoolSession sessionState = createSession(TezSessionState.makeSessionId()); + private TezSessionPoolSession createAndInitSession( + String queue, boolean isDefault, HiveConf conf) { + TezSessionPoolSession sessionState = createSession(TezSessionState.makeSessionId(), conf); // TODO When will the queue ever be null. // Pass queue and default in as constructor parameters, and make them final. if (queue != null) { @@ -233,12 +237,12 @@ public class TezSessionPoolManager */ private TezSessionState getNewSessionState(HiveConf conf, String queueName, boolean doOpen) throws Exception { - TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, false); + TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, false, conf); if (queueName != null) { conf.set(TezConfiguration.TEZ_QUEUE_NAME, queueName); } if (doOpen) { - retTezSessionState.open(conf); + retTezSessionState.open(); LOG.info("Started a new session for queue: " + queueName + " session id: " + retTezSessionState.getSessionId()); } @@ -316,8 +320,8 @@ public class TezSessionPoolManager tezSessionState.close(false); } - protected TezSessionPoolSession createSession(String sessionId) { - return new TezSessionPoolSession(sessionId, this, expirationTracker); + protected TezSessionPoolSession createSession(String sessionId, HiveConf conf) { + return new TezSessionPoolSession(sessionId, this, expirationTracker, conf); } /* @@ -386,13 +390,8 @@ public class TezSessionPoolManager /** Reopens the session that was found to not be running. */ public void reopenSession(TezSessionState sessionState, Configuration conf) throws Exception { HiveConf sessionConf = sessionState.getConf(); - // TODO: when will sessionConf be null, other than tests? Set in open. Throw? - if (sessionConf == null) { - LOG.warn("Session configuration is null for " + sessionState); - // default queue name when the initial session was created - sessionConf = new HiveConf(conf, TezSessionPoolManager.class); - } - if (sessionState.getQueueName() != null && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) { + if (sessionState.getQueueName() != null + && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) { sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName()); } Set<String> oldAdditionalFiles = sessionState.getAdditionalFilesNotFromConf(); @@ -401,7 +400,7 @@ public class TezSessionPoolManager // Close the old one, but keep the tmp files around. sessionState.close(true); // TODO: should we reuse scratchDir too? - sessionState.open(sessionConf, oldAdditionalFiles, null); + sessionState.open(oldAdditionalFiles, null); } public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception { @@ -422,7 +421,8 @@ public class TezSessionPoolManager if (queueName == null) { LOG.warn("Pool session has a null queue: " + oldSession); } - TezSessionPoolSession newSession = createAndInitSession(queueName, oldSession.isDefault()); + TezSessionPoolSession newSession = createAndInitSession( + queueName, oldSession.isDefault(), oldSession.getConf()); defaultSessionPool.replaceSession(oldSession, newSession); } http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java index 005eeed..8ecdbbf 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java @@ -56,8 +56,8 @@ class TezSessionPoolSession extends TezSessionState { private final SessionExpirationTracker expirationTracker; public TezSessionPoolSession(String sessionId, OpenSessionTracker parent, - SessionExpirationTracker expirationTracker) { - super(sessionId); + SessionExpirationTracker expirationTracker, HiveConf conf) { + super(sessionId, conf); this.parent = parent; this.expirationTracker = expirationTracker; } @@ -83,10 +83,10 @@ class TezSessionPoolSession extends TezSessionState { } @Override - protected void openInternal(HiveConf conf, Collection<String> additionalFiles, + protected void openInternal(Collection<String> additionalFiles, boolean isAsync, LogHelper console, Path scratchDir) throws IOException, LoginException, URISyntaxException, TezException { - super.openInternal(conf, additionalFiles, isAsync, console, scratchDir); + super.openInternal(additionalFiles, isAsync, console, scratchDir); parent.registerOpenSession(this); if (expirationTracker != null) { expirationTracker.addToExpirationQueue(this); http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java index fe5c6a1..170de21 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java @@ -93,7 +93,7 @@ public class TezSessionState { private static final String LLAP_LAUNCHER = LlapContainerLauncher.class.getName(); private static final String LLAP_TASK_COMMUNICATOR = LlapTaskCommunicator.class.getName(); - private HiveConf conf; + private final HiveConf conf; private Path tezScratchDir; private LocalResource appJarLr; private TezClient session; @@ -116,8 +116,9 @@ public class TezSessionState { * Constructor. We do not automatically connect, because we only want to * load tez classes when the user has tez installed. */ - public TezSessionState(DagUtils utils) { + public TezSessionState(DagUtils utils, HiveConf conf) { this.utils = utils; + this.conf = conf; } public String toString() { @@ -129,8 +130,8 @@ public class TezSessionState { * Constructor. We do not automatically connect, because we only want to * load tez classes when the user has tez installed. */ - public TezSessionState(String sessionId) { - this(DagUtils.getInstance()); + public TezSessionState(String sessionId, HiveConf conf) { + this(DagUtils.getInstance(), conf); this.sessionId = sessionId; } @@ -176,10 +177,9 @@ public class TezSessionState { return UUID.randomUUID().toString(); } - public void open(HiveConf conf) - throws IOException, LoginException, URISyntaxException, TezException { + public void open() throws IOException, LoginException, URISyntaxException, TezException { Set<String> noFiles = null; - open(conf, noFiles, null); + open(noFiles, null); } /** @@ -191,9 +191,9 @@ public class TezSessionState { * @throws TezException * @throws InterruptedException */ - public void open(HiveConf conf, String[] additionalFiles) - throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { - openInternal(conf, setFromArray(additionalFiles), false, null, null); + public void open(String[] additionalFiles) + throws IOException, LoginException, URISyntaxException, TezException { + openInternal(setFromArray(additionalFiles), false, null, null); } private static Set<String> setFromArray(String[] additionalFiles) { @@ -205,20 +205,19 @@ public class TezSessionState { return files; } - public void beginOpen(HiveConf conf, String[] additionalFiles, LogHelper console) - throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { - openInternal(conf, setFromArray(additionalFiles), true, console, null); + public void beginOpen(String[] additionalFiles, LogHelper console) + throws IOException, LoginException, URISyntaxException, TezException { + openInternal(setFromArray(additionalFiles), true, console, null); } - public void open(HiveConf conf, Collection<String> additionalFiles, Path scratchDir) + public void open(Collection<String> additionalFiles, Path scratchDir) throws LoginException, IOException, URISyntaxException, TezException { - openInternal(conf, additionalFiles, false, null, scratchDir); + openInternal(additionalFiles, false, null, scratchDir); } - protected void openInternal(final HiveConf conf, Collection<String> additionalFiles, - boolean isAsync, LogHelper console, Path scratchDir) throws IOException, LoginException, - IllegalArgumentException, URISyntaxException, TezException { - this.conf = conf; + protected void openInternal(Collection<String> additionalFiles, + boolean isAsync, LogHelper console, Path scratchDir) + throws IOException, LoginException, URISyntaxException, TezException { // TODO Why is the queue name set again. It has already been setup via setQueueName. Do only one of the two. String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME); if (queueName != null && !queueName.equals(confQueueName)) { @@ -464,7 +463,7 @@ public class TezSessionState { } public void refreshLocalResourcesFromConf(HiveConf conf) - throws IOException, LoginException, IllegalArgumentException, URISyntaxException, TezException { + throws IOException, LoginException, URISyntaxException, TezException { String dir = tezScratchDir.toString(); @@ -531,7 +530,6 @@ public class TezSessionState { sessionFuture = null; console = null; tezScratchDir = null; - conf = null; appJarLr = null; additionalFilesNotFromConf.clear(); localizedResources.clear(); http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index f1f1028..e6e236d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@ -339,10 +339,9 @@ public class TezTask extends Task<TezWork> { TezClient client = session.getSession(); // TODO null can also mean that this operation was interrupted. Should we really try to re-create the session in that case ? if (client == null) { - // can happen if the user sets the tez flag after the session was - // established + // Can happen if the user sets the tez flag after the session was established. LOG.info("Tez session hasn't been created yet. Opening session"); - session.open(conf, inputOutputJars); + session.open(inputOutputJars); } else { LOG.info("Session is already open"); http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java index d7592bb..8b64407 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java @@ -627,7 +627,11 @@ public class SessionState { try { if (startSs.tezSessionState == null) { - startSs.setTezSession(new TezSessionState(startSs.getSessionId())); + startSs.setTezSession(new TezSessionState(startSs.getSessionId(), startSs.sessionConf)); + } else { + // Only TezTask sets this, and then removes when done, so we don't expect to see it. + LOG.warn("Tez session was already present in SessionState before start: " + + startSs.tezSessionState); } if (startSs.tezSessionState.isOpen()) { return; @@ -640,9 +644,9 @@ public class SessionState { } // Neither open nor opening. if (!isAsync) { - startSs.tezSessionState.open(startSs.sessionConf); // should use conf on session start-up + startSs.tezSessionState.open(); } else { - startSs.tezSessionState.beginOpen(startSs.sessionConf, null, console); + startSs.tezSessionState.beginOpen(null, console); } } catch (Exception e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java index 973c0cc..4e5d991 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java @@ -39,13 +39,14 @@ public class SampleTezSessionState extends TezSessionPoolSession { private boolean open; private final String sessionId; - private HiveConf hiveConf; + private final HiveConf hiveConf; private String user; private boolean doAsEnabled; - public SampleTezSessionState(String sessionId, TezSessionPoolManager parent) { - super(sessionId, parent, parent.getExpirationTracker()); + public SampleTezSessionState(String sessionId, TezSessionPoolManager parent, HiveConf conf) { + super(sessionId, parent, parent.getExpirationTracker(), conf); this.sessionId = sessionId; + this.hiveConf = conf; } @Override @@ -58,12 +59,11 @@ public class SampleTezSessionState extends TezSessionPoolSession { } @Override - public void open(HiveConf conf) throws IOException, LoginException, URISyntaxException, + public void open() throws IOException, LoginException, URISyntaxException, TezException { - this.hiveConf = conf; UserGroupInformation ugi = Utils.getUGI(); user = ugi.getShortUserName(); - this.doAsEnabled = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); + this.doAsEnabled = hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); setOpen(true); } http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index d2b98c4..5e1e68c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -49,8 +49,14 @@ public class TestTezSessionPool { } @Override - public TezSessionPoolSession createSession(String sessionId) { - return new SampleTezSessionState(sessionId, this); + public void setupPool(HiveConf conf) throws InterruptedException { + conf.setVar(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME, ""); + super.setupPool(conf); + } + + @Override + public TezSessionPoolSession createSession(String sessionId, HiveConf conf) { + return new SampleTezSessionState(sessionId, this, conf); } } @@ -188,7 +194,7 @@ public class TestTezSessionPool { poolManager.reopenSession(session, conf); Mockito.verify(session).close(true); - Mockito.verify(session).open(conf, new HashSet<String>(), null); + Mockito.verify(session).open(new HashSet<String>(), null); // mocked session starts with default queue assertEquals("default", session.getQueueName()); @@ -325,7 +331,7 @@ public class TestTezSessionPool { poolManager.reopenSession(session, conf); Mockito.verify(session).close(true); - Mockito.verify(session).open(conf, new HashSet<String>(), null); + Mockito.verify(session).open(new HashSet<String>(), null); } @Test http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java index 176692b..9b9eead 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java @@ -227,8 +227,7 @@ public class TestTezTask { task.submit(conf, dag, path, appLr, sessionState, Collections.<LocalResource> emptyList(), new String[0], Collections.<String,LocalResource> emptyMap()); // validate close/reopen - verify(sessionState, times(1)).open( - any(HiveConf.class), any(Collection.class), any(Path.class)); + verify(sessionState, times(1)).open(any(Collection.class), any(Path.class)); verify(sessionState, times(1)).close(eq(true)); // now uses pool after HIVE-7043 verify(session, times(2)).submitDAG(any(DAG.class)); }