HIVE-8890: HiveServer2 dynamic service discovery: use persistent ephemeral 
nodes curator recipe (Vaibhav Gumashta reviewed by Thejas Nair)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/652febcd
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/652febcd
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/652febcd

Branch: refs/heads/beeline-cli
Commit: 652febcdab727f39c05d6b5b3c0a6526d254ee0e
Parents: cccaa55
Author: Vaibhav Gumashta <vgumas...@apache.org>
Authored: Tue May 5 10:37:51 2015 -0700
Committer: Vaibhav Gumashta <vgumas...@apache.org>
Committed: Tue May 5 10:37:51 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   2 +-
 pom.xml                                         |   5 +
 service/pom.xml                                 |   5 +
 .../cli/thrift/ThriftBinaryCLIService.java      |   1 -
 .../apache/hive/service/server/HiveServer2.java | 106 +++++++++++++++----
 5 files changed, 97 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index f04ce82..5d4dbea 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1410,7 +1410,7 @@ public class HiveConf extends Configuration {
         "The port of ZooKeeper servers to talk to.\n" +
         "If the list of Zookeeper servers specified in 
hive.zookeeper.quorum\n" +
         "does not contain port numbers, this value is used."),
-    HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 
"600000ms",
+    HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", 
"1200000ms",
         new TimeValidator(TimeUnit.MILLISECONDS),
         "ZooKeeper client's session timeout (in milliseconds). The client is 
disconnected, and as a result, all locks released, \n" +
         "if a heartbeat is not sent in the timeout."),

http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index acacf81..1921b06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -512,6 +512,11 @@
         <version>${curator.version}</version>
       </dependency>
       <dependency>
+        <groupId>org.apache.curator</groupId>
+        <artifactId>curator-recipes</artifactId>
+        <version>${curator.version}</version>
+      </dependency>
+      <dependency>
         <groupId>org.codehaus.groovy</groupId>
         <artifactId>groovy-all</artifactId>
         <version>${groovy.version}</version>

http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/pom.xml
----------------------------------------------------------------------
diff --git a/service/pom.xml b/service/pom.xml
index c5815af..d8e3126 100644
--- a/service/pom.xml
+++ b/service/pom.xml
@@ -91,6 +91,11 @@
       <artifactId>curator-framework</artifactId>
       <version>${curator.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.curator</groupId>
+      <artifactId>curator-recipes</artifactId>
+      <version>${curator.version}</version>
+    </dependency>
     <!-- intra-project -->
     <dependency>
       <groupId>org.apache.hive</groupId>

http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
----------------------------------------------------------------------
diff --git 
a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
 
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
index ca1eae6..6c9efba 100644
--- 
a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
+++ 
b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java
@@ -93,7 +93,6 @@ public class ThriftBinaryCLIService extends ThriftCLIService {
       // TCP Server
       server = new TThreadPoolServer(sargs);
       server.setServerEventHandler(serverEventHandler);
-      server.serve();
       String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() 
+ " on port "
           + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + 
" worker threads";
       LOG.info(msg);

http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/src/java/org/apache/hive/service/server/HiveServer2.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java 
b/service/src/java/org/apache/hive/service/server/HiveServer2.java
index dc2217f..58e8e49 100644
--- a/service/src/java/org/apache/hive/service/server/HiveServer2.java
+++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java
@@ -23,6 +23,8 @@ import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
@@ -35,6 +37,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.curator.framework.CuratorFramework;
 import org.apache.curator.framework.CuratorFrameworkFactory;
 import org.apache.curator.framework.api.ACLProvider;
+import org.apache.curator.framework.api.BackgroundCallback;
+import org.apache.curator.framework.api.CuratorEvent;
+import org.apache.curator.framework.api.CuratorEventType;
+import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode;
 import org.apache.curator.retry.ExponentialBackoffRetry;
 import org.apache.hadoop.hive.common.LogUtils;
 import org.apache.hadoop.hive.common.LogUtils.LogInitializationException;
@@ -67,9 +73,11 @@ import org.apache.zookeeper.data.ACL;
  */
 public class HiveServer2 extends CompositeService {
   private static final Log LOG = LogFactory.getLog(HiveServer2.class);
+  private static CountDownLatch deleteSignal;
 
   private CLIService cliService;
   private ThriftCLIService thriftCLIService;
+  private PersistentEphemeralNode znode;
   private String znodePath;
   private CuratorFramework zooKeeperClient;
   private boolean registeredWithZooKeeper = false;
@@ -151,12 +159,19 @@ public class HiveServer2 extends CompositeService {
     String instanceURI = getServerInstanceURI(hiveConf);
     byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8"));
     setUpZooKeeperAuth(hiveConf);
+    int sessionTimeout =
+        (int) 
hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT,
+            TimeUnit.MILLISECONDS);
+    int baseSleepTime =
+        (int) 
hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME,
+            TimeUnit.MILLISECONDS);
+    int maxRetries = 
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
     // Create a CuratorFramework instance to be used as the ZooKeeper client
     // Use the zooKeeperAclProvider to create appropriate ACLs
     zooKeeperClient =
         CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
-            .aclProvider(zooKeeperAclProvider).retryPolicy(new 
ExponentialBackoffRetry(1000, 3))
-            .build();
+            .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider)
+            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, 
maxRetries)).build();
     zooKeeperClient.start();
     // Create the parent znodes recursively; ignore if the parent already exists.
     try {
@@ -176,18 +191,28 @@ public class HiveServer2 extends CompositeService {
           ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
               + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + 
instanceURI + ";"
               + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence=";
-      znodePath =
-          zooKeeperClient.create().creatingParentsIfNeeded()
-              .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, 
znodeDataUTF8);
+      znode =
+          new PersistentEphemeralNode(zooKeeperClient,
+              PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, 
znodeDataUTF8);
+      znode.start();
+      // We'll wait for 120s for node creation
+      long znodeCreationTimeout = 120;
+      if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) 
{
+        throw new Exception("Max znode creation wait time: " + 
znodeCreationTimeout + "s exhausted");
+      }
       setRegisteredWithZooKeeper(true);
+      znodePath = znode.getActualPath();
       // Set a watch on the znode
       if (zooKeeperClient.checkExists().usingWatcher(new 
DeRegisterWatcher()).forPath(znodePath) == null) {
         // No node exists, throw exception
         throw new Exception("Unable to create znode for this HiveServer2 
instance on ZooKeeper.");
       }
       LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + 
instanceURI);
-    } catch (KeeperException e) {
+    } catch (Exception e) {
       LOG.fatal("Unable to create a znode for this server instance", e);
+      if (znode != null) {
+        znode.close();
+      }
       throw (e);
     }
   }
@@ -223,22 +248,33 @@ public class HiveServer2 extends CompositeService {
     @Override
     public void process(WatchedEvent event) {
       if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) {
-        HiveServer2.this.setRegisteredWithZooKeeper(false);
-        // If there are no more active client sessions, stop the server
-        if (cliService.getSessionManager().getOpenSessionCount() == 0) {
-          LOG.warn("This instance of HiveServer2 has been removed from the 
list of server "
-              + "instances available for dynamic service discovery. "
-              + "The last client session has ended - will shutdown now.");
-          HiveServer2.this.stop();
+        if (znode != null) {
+          try {
+            znode.close();
+            LOG.warn("This HiveServer2 instance is now de-registered from 
ZooKeeper. "
+                + "The server will be shut down after the last client sesssion 
completes.");
+          } catch (IOException e) {
+            LOG.error("Failed to close the persistent ephemeral znode", e);
+          } finally {
+            HiveServer2.this.setRegisteredWithZooKeeper(false);
+            // If there are no more active client sessions, stop the server
+            if (cliService.getSessionManager().getOpenSessionCount() == 0) {
+              LOG.warn("This instance of HiveServer2 has been removed from the 
list of server "
+                  + "instances available for dynamic service discovery. "
+                  + "The last client session has ended - will shutdown now.");
+              HiveServer2.this.stop();
+            }
+          }
         }
-        LOG.warn("This HiveServer2 instance is now de-registered from 
ZooKeeper. "
-            + "The server will be shut down after the last client sesssion 
completes.");
       }
     }
   }
 
   private void removeServerInstanceFromZooKeeper() throws Exception {
     setRegisteredWithZooKeeper(false);
+    if (znode != null) {
+      znode.close();
+    }
     zooKeeperClient.close();
     LOG.info("Server instance removed from ZooKeeper.");
   }
@@ -359,25 +395,53 @@ public class HiveServer2 extends CompositeService {
     HiveConf hiveConf = new HiveConf();
     String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf);
     String rootNamespace = 
hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE);
+    int baseSleepTime = (int) 
hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, 
TimeUnit.MILLISECONDS);
+    int maxRetries = 
hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES);
     CuratorFramework zooKeeperClient =
         CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble)
-            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
+            .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, 
maxRetries)).build();
     zooKeeperClient.start();
     List<String> znodePaths =
         zooKeeperClient.getChildren().forPath(
             ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+    List<String> znodePathsUpdated;
     // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper
-    for (String znodePath : znodePaths) {
+    for (int i = 0; i < znodePaths.size(); i++) {
+      String znodePath = znodePaths.get(i);
+      deleteSignal = new CountDownLatch(1);
       if (znodePath.contains("version=" + versionNumber + ";")) {
-        LOG.info("Removing the znode: " + znodePath + " from ZooKeeper");
-        zooKeeperClient.delete().forPath(
+        String fullZnodePath =
             ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace
-                + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath);
+                + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath;
+        LOG.warn("Will attempt to remove the znode: " + fullZnodePath + " from 
ZooKeeper");
+        System.out.println("Will attempt to remove the znode: " + 
fullZnodePath + " from ZooKeeper");
+        zooKeeperClient.delete().guaranteed().inBackground(new 
DeleteCallBack())
+            .forPath(fullZnodePath);
+        // Wait for the delete to complete
+        deleteSignal.await();
+        // Get the updated path list
+        znodePathsUpdated =
+            zooKeeperClient.getChildren().forPath(
+                ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace);
+        // Gives a list of any new paths that may have been created to maintain the persistent ephemeral node
+        znodePathsUpdated.removeAll(znodePaths);
+        // Add the new paths to the znodes list. We'll try for their removal as well.
+        znodePaths.addAll(znodePathsUpdated);
       }
     }
     zooKeeperClient.close();
   }
 
+  private static class DeleteCallBack implements BackgroundCallback {
+    @Override
+    public void processResult(CuratorFramework zooKeeperClient, CuratorEvent 
event)
+        throws Exception {
+      if (event.getType() == CuratorEventType.DELETE) {
+        deleteSignal.countDown();
+      }
+    }
+  }
+
   public static void main(String[] args) {
     HiveConf.setLoadHiveServer2Config(true);
     try {
@@ -547,6 +611,8 @@ public class HiveServer2 extends CompositeService {
       } catch (Exception e) {
         LOG.fatal("Error deregistering HiveServer2 instances for version: " + 
versionNumber
             + " from ZooKeeper", e);
+        System.out.println("Error deregistering HiveServer2 instances for 
version: " + versionNumber
+            + " from ZooKeeper." + e);
         System.exit(-1);
       }
       System.exit(0);

Reply via email to