HIVE-17387 : implement Tez AM registry in Hive (Sergey Shelukhin, reviewed by 
Gunther Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/0e0c9803
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/0e0c9803
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/0e0c9803

Branch: refs/heads/hive-14535
Commit: 0e0c98039055983823b96db0c340a6015a8be812
Parents: 1acaf15
Author: sergey <ser...@apache.org>
Authored: Mon Sep 11 20:04:45 2017 -0700
Committer: sergey <ser...@apache.org>
Committed: Mon Sep 11 20:04:45 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   7 +-
 .../impl/LlapZookeeperRegistryImpl.java         |  17 +--
 .../hive/registry/impl/TezAmInstance.java       |  76 ++++++++++++
 .../hive/registry/impl/TezAmRegistryImpl.java   | 118 +++++++++++++++++++
 .../hive/registry/impl/ZkRegistryBase.java      |  37 +++++-
 llap-tez/pom.xml                                |  34 ++++++
 .../tezplugins/LlapTaskSchedulerService.java    |  83 +++++++++----
 .../endpoint/LlapPluginServerImpl.java          |  11 +-
 .../TestLlapTaskSchedulerService.java           |   1 +
 .../ql/exec/tez/SessionExpirationTracker.java   |   3 +-
 .../hadoop/hive/ql/exec/tez/TezSessionPool.java |  93 +++++++++++++--
 .../hive/ql/exec/tez/TezSessionPoolManager.java |  34 +++---
 .../hive/ql/exec/tez/TezSessionPoolSession.java |   8 +-
 .../hive/ql/exec/tez/TezSessionState.java       |  40 +++----
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |   5 +-
 .../hadoop/hive/ql/session/SessionState.java    |  10 +-
 .../hive/ql/exec/tez/SampleTezSessionState.java |  12 +-
 .../hive/ql/exec/tez/TestTezSessionPool.java    |  14 ++-
 .../hadoop/hive/ql/exec/tez/TestTezTask.java    |   3 +-
 19 files changed, 493 insertions(+), 113 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index c9d75c0..24c5db0 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -86,7 +86,6 @@ public class HiveConf extends Configuration {
 
   private static byte[] confVarByteArray = null;
 
-
   private static final Map<String, ConfVars> vars = new HashMap<String, ConfVars>();
   private static final Map<String, ConfVars> metaConfs = new HashMap<String, ConfVars>();
   private final List<String> restrictList = new ArrayList<String>();
@@ -3274,6 +3273,12 @@ public class HiveConf extends Configuration {
       "llap.daemon.communicator.num.threads"),
     LLAP_DAEMON_DOWNLOAD_PERMANENT_FNS("hive.llap.daemon.download.permanent.fns", false,
         "Whether LLAP daemon should localize the resources for permanent UDFs."),
+    LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME("hive.llap.task.scheduler.am.registry", "llap",
+      "AM registry name for LLAP task scheduler plugin to register with."),
+    LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL("hive.llap.task.scheduler.am.registry.principal", "",
+      "The name of the principal used to access ZK AM registry securely."),
+    LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE("hive.llap.task.scheduler.am.registry.keytab.file", "",
+      "The path to the Kerberos keytab file used to access ZK AM registry securely."),
     LLAP_TASK_SCHEDULER_NODE_REENABLE_MIN_TIMEOUT_MS(
       "hive.llap.task.scheduler.node.reenable.min.timeout.ms", "200ms",
       new TimeValidator(TimeUnit.MILLISECONDS),

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
index 65f8f94..8339230 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java
@@ -254,13 +254,13 @@ public class LlapZookeeperRegistryImpl
       this.instancesCache = cache;
       this.parent = parent;
       this.encoder = encoder;
-      parent.populateCache(instancesCache);
+      parent.populateCache(instancesCache, false);
     }
 
 
     @Override
     public Collection<LlapServiceInstance> getAll() {
-      return parent.getAll();
+      return parent.getAllInternal();
     }
 
     @Override
@@ -281,12 +281,12 @@ public class LlapZookeeperRegistryImpl
 
     @Override
     public Set<LlapServiceInstance> getByHost(String host) {
-      return parent.getByHost(host);
+      return parent.getByHostInternal(host);
     }
 
     @Override
     public int size() {
-      return parent.size();
+      return parent.sizeInternal();
     }
 
     @Override
@@ -402,14 +402,9 @@ public class LlapZookeeperRegistryImpl
   }
 
   @Override
-  public void start() throws IOException {
-    super.start();
-  }
-
-  @Override
-  public void stop() throws IOException {
-    super.stop();
+  public void stop() {
     CloseableUtils.closeQuietly(slotZnode);
+    super.stop();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
new file mode 100644
index 0000000..a71904c
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmInstance.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry.impl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.codec.binary.Base64;
+
+import com.google.common.io.ByteStreams;
+
+import org.apache.tez.common.security.JobTokenIdentifier;
+
+import org.apache.hadoop.security.token.Token;
+
+import java.io.IOException;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.types.AddressTypes;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+
+public class TezAmInstance extends ServiceInstanceBase {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TezAmInstance.class);
+  private final int pluginPort;
+  private Token<JobTokenIdentifier> token;
+
+  public TezAmInstance(ServiceRecord srv) throws IOException {
+    super(srv, TezAmRegistryImpl.IPC_TEZCLIENT);
+    final Endpoint plugin = 
srv.getInternalEndpoint(TezAmRegistryImpl.IPC_PLUGIN);
+    if (plugin != null) {
+      this.pluginPort = Integer.parseInt(RegistryTypeUtils.getAddressField(
+        plugin.addresses.get(0), AddressTypes.ADDRESS_PORT_FIELD));
+    } else {
+      this.pluginPort = -1;
+    }
+  }
+
+  public int getPluginPort() {
+    return pluginPort;
+  }
+
+  public String getSessionId() {
+    return getProperties().get(TezAmRegistryImpl.AM_SESSION_ID);
+  }
+
+  public String getJobIdForPluginToken() {
+    return getProperties().get(TezAmRegistryImpl.AM_PLUGIN_JOBID);
+  }
+
+  public Token<JobTokenIdentifier> getPluginToken() {
+    if (this.token != null) return token;
+    String tokenString = 
getProperties().get(TezAmRegistryImpl.AM_PLUGIN_TOKEN);
+    if (tokenString == null || tokenString.isEmpty()) return null;
+    byte[] tokenBytes = Base64.decodeBase64(tokenString);
+    Token<JobTokenIdentifier> token = new Token<>();
+    try {
+      token.readFields(ByteStreams.newDataInput(tokenBytes));
+    } catch (IOException e) {
+      LOG.error("Couldn't read the plugin token from [" + tokenString + "]", 
e);
+      return null;
+    }
+    this.token = token;
+    return token;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
new file mode 100644
index 0000000..417e571
--- /dev/null
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/TezAmRegistryImpl.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.hive.registry.impl;
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
+import org.apache.hadoop.registry.client.binding.RegistryTypeUtils;
+import org.apache.hadoop.registry.client.binding.RegistryUtils;
+import org.apache.hadoop.registry.client.types.Endpoint;
+import org.apache.hadoop.registry.client.types.ServiceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TezAmRegistryImpl extends ZkRegistryBase<TezAmInstance> {
+  private static final Logger LOG = 
LoggerFactory.getLogger(TezAmRegistryImpl.class);
+
+  static final String IPC_TEZCLIENT = "tez-client";
+  static final String IPC_PLUGIN = "llap-plugin";
+  static final String AM_SESSION_ID = "am.session.id", AM_PLUGIN_TOKEN = 
"am.plugin.token",
+      AM_PLUGIN_JOBID = "am.plugin.jobid";
+  private final static String NAMESPACE_PREFIX = "tez-am-";
+  private final static String USER_SCOPE_PATH_PREFIX = "user-";
+  private static final String WORKER_PREFIX = "worker-";
+  private static final String SASL_LOGIN_CONTEXT_NAME = "TezAmZooKeeperClient";
+
+  private final String registryName;
+
+  /** Returns null when no registry name is set; useSecureZk selects the SASL login context. */
+  public static TezAmRegistryImpl create(Configuration conf, boolean useSecureZk) {
+    String amRegistryName = HiveConf.getVar(conf, ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME);
+    return StringUtils.isBlank(amRegistryName) ? null
+        : new TezAmRegistryImpl(amRegistryName, conf, useSecureZk);
+  }
+
+  private TezAmRegistryImpl(String instanceName, Configuration conf, boolean 
useSecureZk) {
+    super(instanceName, conf, null, NAMESPACE_PREFIX, USER_SCOPE_PATH_PREFIX, 
WORKER_PREFIX,
+        useSecureZk ? SASL_LOGIN_CONTEXT_NAME : null,
+        HiveConf.getVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_PRINCIPAL),
+        HiveConf.getVar(conf, 
ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_KEYTAB_FILE),
+        null); // Always validate ACLs
+    this.registryName = instanceName;
+    LOG.info("AM Zookeeper Registry is enabled with registryid: " + 
instanceName);
+  }
+
+  public void initializeWithoutRegistering() throws IOException {
+    initializeWithoutRegisteringInternal();
+  }
+
+  public void populateCache(boolean doInvokeListeners) throws IOException {
+    PathChildrenCache pcc = ensureInstancesCache(0);
+    populateCache(pcc, doInvokeListeners);
+  }
+
+  public String register(int amPort, int pluginPort, String sessionId,
+      String serializedToken, String jobIdForToken) throws IOException {
+    ServiceRecord srv = new ServiceRecord();
+    Endpoint rpcEndpoint = RegistryTypeUtils.ipcEndpoint(
+        IPC_TEZCLIENT, new InetSocketAddress(hostname, amPort));
+    srv.addInternalEndpoint(rpcEndpoint);
+    Endpoint pluginEndpoint = null;
+    if (pluginPort >= 0) {
+      pluginEndpoint = RegistryTypeUtils.ipcEndpoint(
+          IPC_PLUGIN, new InetSocketAddress(hostname, pluginPort));
+      srv.addInternalEndpoint(pluginEndpoint);
+    }
+    srv.set(AM_SESSION_ID, sessionId);
+    boolean hasToken = serializedToken != null;
+    srv.set(AM_PLUGIN_TOKEN, hasToken ? serializedToken : "");
+    srv.set(AM_PLUGIN_JOBID, jobIdForToken != null ? jobIdForToken : "");
+    String uniqueId = registerServiceRecord(srv);
+    LOG.info("Registered this AM: rpc: {}, plugin: {}, sessionId: {}, token: 
{}, znodePath: {}",
+        rpcEndpoint, pluginEndpoint, sessionId, hasToken, 
getRegistrationZnodePath());
+    return uniqueId;
+  }
+
+  public TezAmInstance getInstance(String name) {
+    Collection<TezAmInstance> instances = getAllInternal();
+    for(TezAmInstance instance : instances) {
+      if (instance.getWorkerIdentity().equals(name)) {
+        return instance;
+      }
+    }
+    return null;
+  }
+
+  @Override
+  protected TezAmInstance createServiceInstance(ServiceRecord srv) throws 
IOException {
+    return new TezAmInstance(srv);
+  }
+
+  @Override
+  protected String getZkPathUser(Configuration conf) {
+    // We assume that AMs and HS2 run under the same user.
+    return RegistryUtils.currentUser();
+  }
+
+  public String getRegistryName() {
+    return registryName;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
----------------------------------------------------------------------
diff --git a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
index c773770..b6773b5 100644
--- a/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
+++ b/llap-client/src/java/org/apache/hadoop/hive/registry/impl/ZkRegistryBase.java
@@ -58,6 +58,7 @@ import org.apache.hadoop.registry.client.types.ServiceRecord;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.zookeeper.KeeperException.InvalidACLException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
 import org.apache.zookeeper.ZooDefs;
 import org.apache.zookeeper.data.ACL;
 import org.apache.zookeeper.data.Id;
@@ -278,6 +279,27 @@ public abstract class ZkRegistryBase<InstanceType extends 
ServiceInstance> {
   }
 
 
+  protected final void initializeWithoutRegisteringInternal() throws 
IOException {
+    // Create a znode under the rootNamespace parent for this instance of the 
server
+    try {
+      try {
+        
zooKeeperClient.create().creatingParentsIfNeeded().forPath(workersPath);
+      } catch (NodeExistsException ex) {
+        // Ignore - this is expected.
+      }
+      if (doCheckAcls) {
+        try {
+          checkAndSetAcls();
+        } catch (Exception ex) {
+          throw new IOException("Error validating or setting ACLs. " + 
disableMessage, ex);
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Unable to create a parent znode for the registry", e);
+      throw (e instanceof IOException) ? (IOException)e : new IOException(e);
+    }
+  }
+
   private void checkAndSetAcls() throws Exception {
     if (!UserGroupInformation.isSecurityEnabled()) return;
     // We are trying to check ACLs on the "workers" directory, which noone 
except us should be
@@ -356,7 +378,7 @@ public abstract class ZkRegistryBase<InstanceType extends 
ServiceInstance> {
     instanceSet.add(instance);
   }
 
-  protected final void populateCache(PathChildrenCache instancesCache) {
+  protected final void populateCache(PathChildrenCache instancesCache, boolean 
doInvokeListeners) {
     for (ChildData childData : instancesCache.getCurrentData()) {
       byte[] data = getWorkerData(childData, workerNodePrefix);
       if (data == null) continue;
@@ -364,6 +386,11 @@ public abstract class ZkRegistryBase<InstanceType extends 
ServiceInstance> {
         ServiceRecord srv = encoder.fromBytes(childData.getPath(), data);
         InstanceType instance = createServiceInstance(srv);
         addToCache(childData.getPath(), instance.getHost(), instance);
+        if (doInvokeListeners) {
+          for (ServiceInstanceStateChangeListener<InstanceType> listener : 
stateChangeListeners) {
+            listener.onCreate(instance);
+          }
+        }
       } catch (IOException e) {
         LOG.error("Unable to decode data for zkpath: {}." +
             " Ignoring from current instances list..", childData.getPath());
@@ -426,12 +453,12 @@ public abstract class ZkRegistryBase<InstanceType extends 
ServiceInstance> {
   // The real implementation for the instanceset... instanceset has its own 
copy of the
   // ZK cache yet completely depends on the parent in every other aspect and 
is thus unneeded.
 
-  public int size() {
+  protected final int sizeInternal() {
     // not using the path child cache here as there could be more than 1 path 
per host (worker and slot znodes)
     return nodeToInstanceCache.size();
   }
 
-  protected final Set<InstanceType> getByHost(String host) {
+  protected final Set<InstanceType> getByHostInternal(String host) {
     Set<InstanceType> byHost = nodeToInstanceCache.get(host);
     byHost = (byHost == null) ? Sets.<InstanceType>newHashSet() : byHost;
     if (LOG.isDebugEnabled()) {
@@ -440,7 +467,7 @@ public abstract class ZkRegistryBase<InstanceType extends 
ServiceInstance> {
     return byHost;
   }
 
-  protected final Collection<InstanceType> getAll() {
+  protected final Collection<InstanceType> getAllInternal() {
     Set<InstanceType> instances =  new HashSet<>();
     for(Set<InstanceType> instanceSet : pathToInstanceCache.values()) {
       instances.addAll(instanceSet);
@@ -533,7 +560,7 @@ public abstract class ZkRegistryBase<InstanceType extends 
ServiceInstance> {
     CloseableUtils.class.getName();
   }
 
-  public void stop() throws IOException {
+  public void stop() {
     CloseableUtils.closeQuietly(znode);
     CloseableUtils.closeQuietly(instancesCache);
     CloseableUtils.closeQuietly(zooKeeperClient);

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/pom.xml
----------------------------------------------------------------------
diff --git a/llap-tez/pom.xml b/llap-tez/pom.xml
index 1e5b235..69fbea3 100644
--- a/llap-tez/pom.xml
+++ b/llap-tez/pom.xml
@@ -46,6 +46,28 @@
     </dependency>
     <!-- inter-project -->
     <dependency>
+      <groupId>org.apache.zookeeper</groupId>
+      <artifactId>zookeeper</artifactId>
+      <version>${zookeeper.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-framework</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
+    <dependency>
+    <groupId>org.apache.curator</groupId>
+      <artifactId>apache-curator</artifactId>
+      <version>${curator.version}</version>
+      <type>pom</type>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-test</artifactId>
+      <version>${curator.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.commons</groupId>
       <artifactId>commons-lang3</artifactId>
       <version>${commons-lang3.version}</version>
@@ -67,6 +89,18 @@
       </exclusions>
     </dependency>
     <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-mapreduce-client-core</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-yarn-registry</artifactId>
+      <version>${hadoop.version}</version>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
       <groupId>org.apache.tez</groupId>
       <artifactId>tez-api</artifactId>
       <version>${tez.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
index cf8bd46..26747fc 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/LlapTaskSchedulerService.java
@@ -14,6 +14,13 @@
 
 package org.apache.hadoop.hive.llap.tezplugins;
 
+import com.google.common.io.ByteArrayDataOutput;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
 import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
 
 import java.io.IOException;
@@ -217,6 +224,7 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
   private final AtomicInteger assignedTaskCounter = new AtomicInteger(0);
 
   private final LlapRegistryService registry = new LlapRegistryService(false);
+  private final TezAmRegistryImpl amRegistry;
 
   private volatile ListenableFuture<Void> nodeEnablerFuture;
   private volatile ListenableFuture<Void> delayedTaskSchedulerFuture;
@@ -236,6 +244,8 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
   private int totalGuaranteed = 0, unusedGuaranteed = 0;
 
   private LlapTaskCommunicator communicator;
+  private final int amPort;
+  private final String serializedToken, jobIdForToken;
 
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext) {
     this(taskSchedulerContext, new MonotonicClock(), true);
@@ -245,17 +255,13 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
   // The fields that HS2 uses to give AM information about plugin endpoint.
   /** Whether to enable the endpoint. */
   public static final String LLAP_PLUGIN_ENDPOINT_ENABLED = 
"llap.plugin.endpoint.enabled";
-  /** The fake job ID generated by HS2 to use for the job token. */
-  public static final String LLAP_PLUGIN_ENDPOINT_JOBID = 
"llap.plugin.endpoint.jobid";
-  /** The job token generated by HS2 for the endpoint. See the comment at the 
place where
-   * this is used - the token will be generated by AM after we have AM 
registry. */
-  public static final String LLAP_PLUGIN_ENDPOINT_TOKEN = 
"llap.plugin.endpoint.token";
 
   @VisibleForTesting
   public LlapTaskSchedulerService(TaskSchedulerContext taskSchedulerContext, 
Clock clock,
       boolean initMetrics) {
     super(taskSchedulerContext);
     this.clock = clock;
+    this.amPort = taskSchedulerContext.getAppClientPort();
     this.delayedTaskSchedulerCallable = createDelayedTaskSchedulerCallable();
     try {
       this.conf = 
TezUtils.createConfFromUserPayload(taskSchedulerContext.getInitialUserPayload());
@@ -267,26 +273,24 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     if (conf.getBoolean(LLAP_PLUGIN_ENDPOINT_ENABLED, false)) {
       JobTokenSecretManager sm = null;
       if (UserGroupInformation.isSecurityEnabled()) {
+        // Set up the security for plugin endpoint.
+        // We will create the token and publish it in the AM registry.
+        // Note: this application ID is bogus and is only needed for 
JobTokenSecretManager.
+        ApplicationId id = ApplicationId.newInstance(
+            System.nanoTime(), (int)(System.nanoTime() % 100000));
+        Token<JobTokenIdentifier> token = createAmsToken(id);
+        serializedToken = serializeToken(token);
+        jobIdForToken = token.getService().toString();
         sm = new JobTokenSecretManager();
-        // Note: for HA/AMRegistry, this token will have to be published in a 
secure location (ZK).
-        //       Otherwise, when HS2 dies, noone will be able to talk to this 
AM.
-        //       We could generate it in AM in the first place, but then even 
the original HS2
-        //       won't be able to talk to us before we register. Which might 
be ok.
-        // For now, there's no mechanism to coordinate this, so we email the 
token from HS2.
-        byte[] tokenBytes = 
Base64.decodeBase64(conf.get(LLAP_PLUGIN_ENDPOINT_TOKEN));
-        Token<JobTokenIdentifier> token = new Token<>();
-        try {
-          token.readFields(ByteStreams.newDataInput(tokenBytes));
-        } catch (IOException e) {
-          // This shouldn't really happen on a byte array.
-          throw new RuntimeException(e);
-        }
-        sm.addTokenForJob(conf.get(LLAP_PLUGIN_ENDPOINT_JOBID), token);
+        sm.addTokenForJob(jobIdForToken, token);
+      } else {
+        serializedToken = jobIdForToken = null;
       }
       pluginEndpoint = new LlapPluginServerImpl(sm,
           HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_NUM_HANDLERS),
           HiveConf.getIntVar(conf, ConfVars.LLAP_PLUGIN_RPC_PORT), this);
     } else {
+      serializedToken = jobIdForToken = null;
       pluginEndpoint = null;
     }
     // This is called once per AM, so we don't get the starting duck count 
here.
@@ -358,9 +362,11 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     }
 
     String hostsString = HiveConf.getVar(conf, 
ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-    LOG.info(
-        "Running with configuration: hosts={}, numSchedulableTasksPerNode={}, 
nodeBlacklistConf={}, localityConf={}",
+    LOG.info("Running with configuration: hosts={}, 
numSchedulableTasksPerNode={}, "
+        + "nodeBlacklistConf={}, localityConf={}",
         hostsString, numSchedulableTasksPerNode, nodeBlacklistConf, 
localityDelayConf);
+    this.amRegistry = TezAmRegistryImpl.create(conf, true);
+
 
     LlapTaskCommunicator peer = LlapTaskCommunicator.instance.get();
     if (peer != null) {
@@ -372,6 +378,29 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
     }
   }
 
+  private static Token<JobTokenIdentifier> createAmsToken(ApplicationId id) {
+    if (!UserGroupInformation.isSecurityEnabled()) return null;
+    JobTokenIdentifier identifier = new JobTokenIdentifier(new 
Text(id.toString()));
+    JobTokenSecretManager jobTokenManager = new JobTokenSecretManager();
+    Token<JobTokenIdentifier> sessionToken = new Token<>(identifier, 
jobTokenManager);
+    sessionToken.setService(identifier.getJobId());
+    return sessionToken;
+  }
+
+  private static String serializeToken(Token<JobTokenIdentifier> token) {
+    byte[] bytes = null;
+    try {
+      ByteArrayDataOutput out = ByteStreams.newDataOutput();
+      token.write(out);
+      bytes = out.toByteArray();
+    } catch (IOException e) {
+      // This shouldn't really happen on a byte array.
+      throw new RuntimeException(e);
+    }
+    return Base64.encodeBase64String(bytes);
+  }
+
+
   @VisibleForTesting
   void updateGuaranteedCount(int newTotalGuaranteed) {
     List<TaskInfo> toUpdate = null;
@@ -520,6 +549,9 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
   @Override
   public void initialize() {
     registry.init(conf);
+    if (pluginEndpoint != null) {
+      pluginEndpoint.init(conf);
+    }
   }
 
   @Override
@@ -562,6 +594,12 @@ public class LlapTaskSchedulerService extends 
TaskScheduler {
         addNode(new NodeInfo(inst, nodeBlacklistConf, clock,
             numSchedulableTasksPerNode, metrics), inst);
       }
+      if (amRegistry != null) {
+        amRegistry.start();
+        int pluginPort = pluginEndpoint != null ? 
pluginEndpoint.getActualPort() : -1;
+        amRegistry.register(amPort, pluginPort, HiveConf.getVar(conf, 
ConfVars.HIVESESSIONID),
+            serializedToken, jobIdForToken);
+      }
     } finally {
       writeLock.unlock();
     }
@@ -682,6 +720,9 @@ public class LlapTaskSchedulerService extends TaskScheduler 
{
         if (registry != null) {
           registry.stop();
         }
+        if (amRegistry != null) {
+          amRegistry.stop();
+        }
 
         if (pluginEndpoint != null) {
           pluginEndpoint.stop();

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
index f3c0d52..4d5333f 100644
--- a/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
+++ b/llap-tez/src/java/org/apache/hadoop/hive/llap/tezplugins/endpoint/LlapPluginServerImpl.java
@@ -43,6 +43,7 @@ public class LlapPluginServerImpl extends AbstractService 
implements LlapPluginP
   private final SecretManager<JobTokenIdentifier> secretManager;
   private final int numHandlers;
   private final LlapTaskSchedulerService parent;
+  private final AtomicReference<InetSocketAddress> bindAddress = new 
AtomicReference<>();
 
   public LlapPluginServerImpl(SecretManager<JobTokenIdentifier> secretManager,
       int numHandlers, int port, LlapTaskSchedulerService parent) {
@@ -66,7 +67,7 @@ public class LlapPluginServerImpl extends AbstractService 
implements LlapPluginP
     final Configuration conf = getConfig();
     final BlockingService daemonImpl =
         
LlapPluginProtocolProtos.LlapPluginProtocol.newReflectiveBlockingService(this);
-    server = LlapUtil.startProtocolServer(port, numHandlers, null, conf, 
daemonImpl,
+    server = LlapUtil.startProtocolServer(port, numHandlers, bindAddress , 
conf, daemonImpl,
         LlapPluginProtocolPB.class, secretManager, new 
LlapPluginPolicyProvider(),
         ConfVars.LLAP_PLUGIN_ACL, ConfVars.LLAP_PLUGIN_ACL_DENY);
   }
@@ -77,4 +78,12 @@ public class LlapPluginServerImpl extends AbstractService 
implements LlapPluginP
       server.stop();
     }
   }
+
+  public int getActualPort() {
+    InetSocketAddress bindAddress = this.bindAddress.get();
+    if (bindAddress == null) {
+      throw new RuntimeException("Cannot get port before the service is 
started");
+    }
+    return bindAddress.getPort();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
----------------------------------------------------------------------
diff --git a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
index 156e620..51d2e08 100644
--- a/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
+++ b/llap-tez/src/test/org/apache/hadoop/hive/llap/tezplugins/TestLlapTaskSchedulerService.java
@@ -1938,6 +1938,7 @@ public class TestLlapTaskSchedulerService {
         nodeDisableTimeoutMillis + "ms");
       conf.setBoolean(LlapFixedRegistryImpl.FIXED_REGISTRY_RESOLVE_HOST_NAMES, 
false);
       conf.setLong(ConfVars.LLAP_TASK_SCHEDULER_LOCALITY_DELAY.varname, 
localityDelayMs);
+      conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, "");
 
       doReturn(appAttemptId).when(mockAppCallback).getApplicationAttemptId();
       doReturn(11111l).when(mockAppCallback).getCustomClusterIdentifier();

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
index 8bee77e..da93a3a 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SessionExpirationTracker.java
@@ -45,6 +45,7 @@ class SessionExpirationTracker {
   private final long sessionLifetimeMs;
   private final long sessionLifetimeJitterMs;
   private final RestartImpl sessionRestartImpl;
+  private volatile SessionState initSessionState;
 
   interface RestartImpl {
     void closeAndReopenPoolSession(TezSessionPoolSession session) throws 
Exception;
@@ -68,7 +69,6 @@ class SessionExpirationTracker {
       LOG.debug("Session expiration is enabled; session lifetime is "
           + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms");
     }
-    final SessionState initSessionState = SessionState.get();
     expirationQueue = new PriorityBlockingQueue<>(11, new 
Comparator<TezSessionPoolSession>() {
       @Override
       public int compare(TezSessionPoolSession o1, TezSessionPoolSession o2) {
@@ -179,6 +179,7 @@ class SessionExpirationTracker {
 
 
   public void start() {
+    initSessionState = SessionState.get();
     expirationThread.start();
     restartThread.start();
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
index 4f58565..6e8122d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPool.java
@@ -17,6 +17,14 @@
  */
 package org.apache.hadoop.hive.ql.exec.tez;
 
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.hadoop.hive.llap.tezplugins.LlapTaskSchedulerService;
+
+import org.apache.hadoop.hive.registry.ServiceInstanceStateChangeListener;
+import org.apache.hadoop.hive.registry.impl.TezAmInstance;
+import org.apache.hadoop.hive.registry.impl.TezAmRegistryImpl;
+
 import java.io.IOException;
 import java.util.Queue;
 import java.util.Set;
@@ -48,14 +56,31 @@ class TezSessionPool {
   private final HiveConf initConf;
   private final BlockingDeque<TezSessionPoolSession> defaultQueuePool;
 
-  TezSessionPool(HiveConf initConf, int numSessionsTotal) {
+  private final String amRegistryName;
+  private final TezAmRegistryImpl amRegistry;
+
+  private final ConcurrentHashMap<String, TezSessionPoolSession> bySessionId =
+      new ConcurrentHashMap<>();
+
+
+  TezSessionPool(HiveConf initConf, int numSessionsTotal, boolean 
useAmRegistryIfPresent) {
     this.initConf = initConf;
     assert numSessionsTotal > 0;
     defaultQueuePool = new 
LinkedBlockingDeque<TezSessionPoolSession>(numSessionsTotal);
+    this.amRegistry = useAmRegistryIfPresent ? 
TezAmRegistryImpl.create(initConf, true) : null;
+    this.amRegistryName = amRegistry == null ? null : 
amRegistry.getRegistryName();
   }
 
   void startInitialSessions() throws Exception {
     if (initialSessions.isEmpty()) return;
+    if (amRegistry != null) {
+      amRegistry.start();
+      amRegistry.initializeWithoutRegistering();
+      // Note: we may later have special logic to pick up old AMs, if any.
+      amRegistry.registerStateChangeListener(new ChangeListener());
+      amRegistry.populateCache(true);
+    }
+
     int threadCount = Math.min(initialSessions.size(),
         HiveConf.getIntVar(initConf, 
ConfVars.HIVE_SERVER2_TEZ_SESSION_MAX_INIT_THREADS));
     Preconditions.checkArgument(threadCount > 0);
@@ -134,7 +159,6 @@ class TezSessionPool {
     // Re-setting the queue config is an old hack that we may remove in future.
     Path scratchDir = oldSession.getTezScratchDir();
     Set<String> additionalFiles = oldSession.getAdditionalFilesNotFromConf();
-    HiveConf conf = oldSession.getConf();
     String queueName = oldSession.getQueueName();
     try {
       oldSession.close(false);
@@ -142,24 +166,67 @@ class TezSessionPool {
       if (!wasRemoved) {
         LOG.error("Old session was closed but it was not in the pool", 
oldSession);
       }
+      bySessionId.remove(oldSession.getSessionId());
     } finally {
       // There's some bogus code that can modify the queue name. Force-set it 
for pool sessions.
       // TODO: this might only be applicable to TezSessionPoolManager; try 
moving it there?
-      conf.set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
-      newSession.open(conf, additionalFiles, scratchDir);
+      newSession.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
+      // The caller probably created the new session with the old config, but 
update the
+      // registry again just in case. TODO: maybe we should enforce that.
+      configureAmRegistry(newSession);
+      newSession.open(additionalFiles, scratchDir);
       defaultQueuePool.put(newSession);
     }
   }
 
-  private void startInitialSession(TezSessionPoolSession sessionState) throws 
Exception {
-    HiveConf newConf = new HiveConf(initConf);
-    // Makes no senses for it to be mixed up like this.
-    boolean isUsable = sessionState.tryUse();
-    if (!isUsable) throw new IOException(sessionState + " is not usable at 
pool startup");
-    newConf.set(TezConfiguration.TEZ_QUEUE_NAME, sessionState.getQueueName());
-    sessionState.open(newConf);
-    if (sessionState.returnAfterUse()) {
-      defaultQueuePool.put(sessionState);
+  private void startInitialSession(TezSessionPoolSession session) throws 
Exception {
+    boolean isUsable = session.tryUse();
+    if (!isUsable) throw new IOException(session + " is not usable at pool 
startup");
+    session.getConf().set(TezConfiguration.TEZ_QUEUE_NAME, 
session.getQueueName());
+    configureAmRegistry(session);
+    session.open();
+    if (session.returnAfterUse()) {
+      defaultQueuePool.put(session);
+    }
+  }
+
+  private void configureAmRegistry(TezSessionPoolSession session) {
+    if (amRegistryName != null) {
+      bySessionId.put(session.getSessionId(), session);
+      HiveConf conf = session.getConf();
+      conf.set(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME.varname, 
amRegistryName);
+      conf.set(ConfVars.HIVESESSIONID.varname, session.getSessionId());
+      // TODO: can be enabled temporarily for testing
+      // conf.set(LlapTaskSchedulerService.LLAP_PLUGIN_ENDPOINT_ENABLED, 
"true");
+    }
+  }
+
+
+  private final class ChangeListener
+    implements ServiceInstanceStateChangeListener<TezAmInstance> {
+
+    @Override
+    public void onCreate(TezAmInstance serviceInstance) {
+      String sessionId = serviceInstance.getSessionId();
+      TezSessionPoolSession session = bySessionId.get(sessionId);
+      LOG.warn("AM for " + sessionId + " has registered; updating [" + session
+          + "] with an endpoint at " + serviceInstance.getPluginPort());
+      // TODO: actually update the session once WM is committed
+    }
+
+    @Override
+    public void onUpdate(TezAmInstance serviceInstance) {
+      // Presumably we'd get those later if AM updates its stuff.
+      LOG.warn("Received an unexpected update for instance={}. Ignoring", 
serviceInstance);
+    }
+
+    @Override
+    public void onRemove(TezAmInstance serviceInstance) {
+      String sessionId = serviceInstance.getSessionId();
+      // For now, we don't take any action. In future, we might restore the 
session based
+      // on this and get rid of the logic outside of the pool that 
replaces/reopens/etc.
+      LOG.warn("AM for " + sessionId + " has disappeared from the registry");
+      bySessionId.remove(sessionId);
     }
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index 1f4705c..9f72155 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -114,7 +114,9 @@ public class TezSessionPoolManager
     int numSessions = 
conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
     int numSessionsTotal = numSessions * (defaultQueueList.length - 
emptyNames);
     if (numSessionsTotal > 0) {
-      defaultSessionPool = new TezSessionPool(initConf, numSessionsTotal);
+      // TODO: this can be enabled to test. Will only be used in WM case for 
now.
+      boolean enableAmRegistry = false;
+      defaultSessionPool = new TezSessionPool(initConf, numSessionsTotal, 
enableAmRegistry);
     }
 
     numConcurrentLlapQueries = 
conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
@@ -152,14 +154,16 @@ public class TezSessionPoolManager
         if (queueName.isEmpty()) {
           continue;
         }
-        defaultSessionPool.addInitialSession(createAndInitSession(queueName, 
true));
+        HiveConf sessionConf = new HiveConf(initConf);
+        defaultSessionPool.addInitialSession(createAndInitSession(queueName, 
true, sessionConf));
       }
     }
   }
 
   // TODO Create and init session sets up queue, isDefault - but does not 
initialize the configuration
-  private TezSessionPoolSession createAndInitSession(String queue, boolean 
isDefault) {
-    TezSessionPoolSession sessionState = 
createSession(TezSessionState.makeSessionId());
+  private TezSessionPoolSession createAndInitSession(
+      String queue, boolean isDefault, HiveConf conf) {
+    TezSessionPoolSession sessionState = 
createSession(TezSessionState.makeSessionId(), conf);
     // TODO When will the queue ever be null.
     // Pass queue and default in as constructor parameters, and make them 
final.
     if (queue != null) {
@@ -233,12 +237,12 @@ public class TezSessionPoolManager
    */
   private TezSessionState getNewSessionState(HiveConf conf,
       String queueName, boolean doOpen) throws Exception {
-    TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, 
false);
+    TezSessionPoolSession retTezSessionState = createAndInitSession(queueName, 
false, conf);
     if (queueName != null) {
       conf.set(TezConfiguration.TEZ_QUEUE_NAME, queueName);
     }
     if (doOpen) {
-      retTezSessionState.open(conf);
+      retTezSessionState.open();
       LOG.info("Started a new session for queue: " + queueName +
           " session id: " + retTezSessionState.getSessionId());
     }
@@ -316,8 +320,8 @@ public class TezSessionPoolManager
     tezSessionState.close(false);
   }
 
-  protected TezSessionPoolSession createSession(String sessionId) {
-    return new TezSessionPoolSession(sessionId, this, expirationTracker);
+  protected TezSessionPoolSession createSession(String sessionId, HiveConf 
conf) {
+    return new TezSessionPoolSession(sessionId, this, expirationTracker, conf);
   }
 
   /*
@@ -386,13 +390,8 @@ public class TezSessionPoolManager
   /** Reopens the session that was found to not be running. */
   public void reopenSession(TezSessionState sessionState, Configuration conf) 
throws Exception {
     HiveConf sessionConf = sessionState.getConf();
-    // TODO: when will sessionConf be null, other than tests? Set in open. 
Throw?
-    if (sessionConf == null) {
-      LOG.warn("Session configuration is null for " + sessionState);
-      // default queue name when the initial session was created
-      sessionConf = new HiveConf(conf, TezSessionPoolManager.class);
-    }
-    if (sessionState.getQueueName() != null && 
sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
+    if (sessionState.getQueueName() != null
+        && sessionConf.get(TezConfiguration.TEZ_QUEUE_NAME) == null) {
       sessionConf.set(TezConfiguration.TEZ_QUEUE_NAME, 
sessionState.getQueueName());
     }
     Set<String> oldAdditionalFiles = 
sessionState.getAdditionalFilesNotFromConf();
@@ -401,7 +400,7 @@ public class TezSessionPoolManager
     // Close the old one, but keep the tmp files around.
     sessionState.close(true);
     // TODO: should we reuse scratchDir too?
-    sessionState.open(sessionConf, oldAdditionalFiles, null);
+    sessionState.open(oldAdditionalFiles, null);
   }
 
   public void closeNonDefaultSessions(boolean keepTmpDir) throws Exception {
@@ -422,7 +421,8 @@ public class TezSessionPoolManager
     if (queueName == null) {
       LOG.warn("Pool session has a null queue: " + oldSession);
     }
-    TezSessionPoolSession newSession = createAndInitSession(queueName, 
oldSession.isDefault());
+    TezSessionPoolSession newSession = createAndInitSession(
+        queueName, oldSession.isDefault(), oldSession.getConf());
     defaultSessionPool.replaceSession(oldSession, newSession);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
index 005eeed..8ecdbbf 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolSession.java
@@ -56,8 +56,8 @@ class TezSessionPoolSession extends TezSessionState {
   private final SessionExpirationTracker expirationTracker;
 
   public TezSessionPoolSession(String sessionId, OpenSessionTracker parent,
-      SessionExpirationTracker expirationTracker) {
-    super(sessionId);
+      SessionExpirationTracker expirationTracker, HiveConf conf) {
+    super(sessionId, conf);
     this.parent = parent;
     this.expirationTracker = expirationTracker;
   }
@@ -83,10 +83,10 @@ class TezSessionPoolSession extends TezSessionState {
   }
 
   @Override
-  protected void openInternal(HiveConf conf, Collection<String> 
additionalFiles,
+  protected void openInternal(Collection<String> additionalFiles,
       boolean isAsync, LogHelper console, Path scratchDir)
           throws IOException, LoginException, URISyntaxException, TezException 
{
-    super.openInternal(conf, additionalFiles, isAsync, console, scratchDir);
+    super.openInternal(additionalFiles, isAsync, console, scratchDir);
     parent.registerOpenSession(this);
     if (expirationTracker != null) {
       expirationTracker.addToExpirationQueue(this);

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
----------------------------------------------------------------------
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
index fe5c6a1..170de21 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionState.java
@@ -93,7 +93,7 @@ public class TezSessionState {
   private static final String LLAP_LAUNCHER = 
LlapContainerLauncher.class.getName();
   private static final String LLAP_TASK_COMMUNICATOR = 
LlapTaskCommunicator.class.getName();
 
-  private HiveConf conf;
+  private final HiveConf conf;
   private Path tezScratchDir;
   private LocalResource appJarLr;
   private TezClient session;
@@ -116,8 +116,9 @@ public class TezSessionState {
    * Constructor. We do not automatically connect, because we only want to
    * load tez classes when the user has tez installed.
    */
-  public TezSessionState(DagUtils utils) {
+  public TezSessionState(DagUtils utils, HiveConf conf) {
     this.utils = utils;
+    this.conf = conf;
   }
 
   public String toString() {
@@ -129,8 +130,8 @@ public class TezSessionState {
    * Constructor. We do not automatically connect, because we only want to
    * load tez classes when the user has tez installed.
    */
-  public TezSessionState(String sessionId) {
-    this(DagUtils.getInstance());
+  public TezSessionState(String sessionId, HiveConf conf) {
+    this(DagUtils.getInstance(), conf);
     this.sessionId = sessionId;
   }
 
@@ -176,10 +177,9 @@ public class TezSessionState {
     return UUID.randomUUID().toString();
   }
 
-  public void open(HiveConf conf)
-      throws IOException, LoginException, URISyntaxException, TezException {
+  public void open() throws IOException, LoginException, URISyntaxException, 
TezException {
     Set<String> noFiles = null;
-    open(conf, noFiles, null);
+    open(noFiles, null);
   }
 
   /**
@@ -191,9 +191,9 @@ public class TezSessionState {
    * @throws TezException
    * @throws InterruptedException
    */
-  public void open(HiveConf conf, String[] additionalFiles)
-    throws IOException, LoginException, IllegalArgumentException, 
URISyntaxException, TezException {
-    openInternal(conf, setFromArray(additionalFiles), false, null, null);
+  public void open(String[] additionalFiles)
+      throws IOException, LoginException, URISyntaxException, TezException {
+    openInternal(setFromArray(additionalFiles), false, null, null);
   }
 
   private static Set<String> setFromArray(String[] additionalFiles) {
@@ -205,20 +205,19 @@ public class TezSessionState {
     return files;
   }
 
-  public void beginOpen(HiveConf conf, String[] additionalFiles, LogHelper 
console)
-    throws IOException, LoginException, IllegalArgumentException, 
URISyntaxException, TezException {
-    openInternal(conf, setFromArray(additionalFiles), true, console, null);
+  public void beginOpen(String[] additionalFiles, LogHelper console)
+      throws IOException, LoginException, URISyntaxException, TezException {
+    openInternal(setFromArray(additionalFiles), true, console, null);
   }
 
-  public void open(HiveConf conf, Collection<String> additionalFiles, Path 
scratchDir)
+  public void open(Collection<String> additionalFiles, Path scratchDir)
       throws LoginException, IOException, URISyntaxException, TezException {
-    openInternal(conf, additionalFiles, false, null, scratchDir);
+    openInternal(additionalFiles, false, null, scratchDir);
   }
 
-  protected void openInternal(final HiveConf conf, Collection<String> 
additionalFiles,
-      boolean isAsync, LogHelper console, Path scratchDir) throws IOException, 
LoginException,
-        IllegalArgumentException, URISyntaxException, TezException {
-    this.conf = conf;
+  protected void openInternal(Collection<String> additionalFiles,
+      boolean isAsync, LogHelper console, Path scratchDir)
+          throws IOException, LoginException, URISyntaxException, TezException 
{
     // TODO Why is the queue name set again. It has already been setup via 
setQueueName. Do only one of the two.
     String confQueueName = conf.get(TezConfiguration.TEZ_QUEUE_NAME);
     if (queueName != null && !queueName.equals(confQueueName)) {
@@ -464,7 +463,7 @@ public class TezSessionState {
   }
 
   public void refreshLocalResourcesFromConf(HiveConf conf)
-    throws IOException, LoginException, IllegalArgumentException, 
URISyntaxException, TezException {
+      throws IOException, LoginException, URISyntaxException, TezException {
 
     String dir = tezScratchDir.toString();
 
@@ -531,7 +530,6 @@ public class TezSessionState {
     sessionFuture = null;
     console = null;
     tezScratchDir = null;
-    conf = null;
     appJarLr = null;
     additionalFilesNotFromConf.clear();
     localizedResources.clear();

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index f1f1028..e6e236d 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -339,10 +339,9 @@ public class TezTask extends Task<TezWork> {
     TezClient client = session.getSession();
     // TODO null can also mean that this operation was interrupted. Should we 
really try to re-create the session in that case ?
     if (client == null) {
-      // can happen if the user sets the tez flag after the session was
-      // established
+      // Can happen if the user sets the tez flag after the session was 
established.
       LOG.info("Tez session hasn't been created yet. Opening session");
-      session.open(conf, inputOutputJars);
+      session.open(inputOutputJars);
     } else {
       LOG.info("Session is already open");
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java 
b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
index d7592bb..8b64407 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/session/SessionState.java
@@ -627,7 +627,11 @@ public class SessionState {
 
     try {
       if (startSs.tezSessionState == null) {
-        startSs.setTezSession(new TezSessionState(startSs.getSessionId()));
+        startSs.setTezSession(new TezSessionState(startSs.getSessionId(), 
startSs.sessionConf));
+      } else {
+        // Only TezTask sets this, and then removes when done, so we don't 
expect to see it.
+        LOG.warn("Tez session was already present in SessionState before 
start: "
+            + startSs.tezSessionState);
       }
       if (startSs.tezSessionState.isOpen()) {
         return;
@@ -640,9 +644,9 @@ public class SessionState {
       }
       // Neither open nor opening.
       if (!isAsync) {
-        startSs.tezSessionState.open(startSs.sessionConf); // should use conf 
on session start-up
+        startSs.tezSessionState.open();
       } else {
-        startSs.tezSessionState.beginOpen(startSs.sessionConf, null, console);
+        startSs.tezSessionState.beginOpen(null, console);
       }
     } catch (Exception e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
index 973c0cc..4e5d991 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/SampleTezSessionState.java
@@ -39,13 +39,14 @@ public class SampleTezSessionState extends 
TezSessionPoolSession {
 
   private boolean open;
   private final String sessionId;
-  private HiveConf hiveConf;
+  private final HiveConf hiveConf;
   private String user;
   private boolean doAsEnabled;
 
-  public SampleTezSessionState(String sessionId, TezSessionPoolManager parent) 
{
-    super(sessionId, parent, parent.getExpirationTracker());
+  public SampleTezSessionState(String sessionId, TezSessionPoolManager parent, 
HiveConf conf) {
+    super(sessionId, parent, parent.getExpirationTracker(), conf);
     this.sessionId = sessionId;
+    this.hiveConf = conf;
   }
 
   @Override
@@ -58,12 +59,11 @@ public class SampleTezSessionState extends 
TezSessionPoolSession {
   }
 
   @Override
-  public void open(HiveConf conf) throws IOException, LoginException, 
URISyntaxException,
+  public void open() throws IOException, LoginException, URISyntaxException,
       TezException {
-    this.hiveConf = conf;
     UserGroupInformation ugi = Utils.getUGI();
     user = ugi.getShortUserName();
-    this.doAsEnabled = 
conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
+    this.doAsEnabled = 
hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS);
     setOpen(true);
   }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index d2b98c4..5e1e68c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -49,8 +49,14 @@ public class TestTezSessionPool {
     }
 
     @Override
-    public TezSessionPoolSession createSession(String sessionId) {
-      return new SampleTezSessionState(sessionId, this);
+    public void setupPool(HiveConf conf) throws InterruptedException {
+      conf.setVar(ConfVars.LLAP_TASK_SCHEDULER_AM_REGISTRY_NAME, "");
+      super.setupPool(conf);
+    }
+
+    @Override
+    public TezSessionPoolSession createSession(String sessionId, HiveConf 
conf) {
+      return new SampleTezSessionState(sessionId, this, conf);
     }
   }
 
@@ -188,7 +194,7 @@ public class TestTezSessionPool {
       poolManager.reopenSession(session, conf);
 
       Mockito.verify(session).close(true);
-      Mockito.verify(session).open(conf, new HashSet<String>(), null);
+      Mockito.verify(session).open(new HashSet<String>(), null);
 
       // mocked session starts with default queue
       assertEquals("default", session.getQueueName());
@@ -325,7 +331,7 @@ public class TestTezSessionPool {
     poolManager.reopenSession(session, conf);
 
     Mockito.verify(session).close(true);
-    Mockito.verify(session).open(conf, new HashSet<String>(), null);
+    Mockito.verify(session).open(new HashSet<String>(), null);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/0e0c9803/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java 
b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
index 176692b..9b9eead 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezTask.java
@@ -227,8 +227,7 @@ public class TestTezTask {
     task.submit(conf, dag, path, appLr, sessionState, 
Collections.<LocalResource> emptyList(),
         new String[0], Collections.<String,LocalResource> emptyMap());
     // validate close/reopen
-    verify(sessionState, times(1)).open(
-        any(HiveConf.class), any(Collection.class), any(Path.class));
+    verify(sessionState, times(1)).open(any(Collection.class), 
any(Path.class));
     verify(sessionState, times(1)).close(eq(true)); // now uses pool after 
HIVE-7043
     verify(session, times(2)).submitDAG(any(DAG.class));
   }

Reply via email to