HIVE-8890: HiveServer2 dynamic service discovery: use Curator's persistent ephemeral node recipe (Vaibhav Gumashta, reviewed by Thejas Nair)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/652febcd Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/652febcd Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/652febcd Branch: refs/heads/beeline-cli Commit: 652febcdab727f39c05d6b5b3c0a6526d254ee0e Parents: cccaa55 Author: Vaibhav Gumashta <vgumas...@apache.org> Authored: Tue May 5 10:37:51 2015 -0700 Committer: Vaibhav Gumashta <vgumas...@apache.org> Committed: Tue May 5 10:37:51 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 2 +- pom.xml | 5 + service/pom.xml | 5 + .../cli/thrift/ThriftBinaryCLIService.java | 1 - .../apache/hive/service/server/HiveServer2.java | 106 +++++++++++++++---- 5 files changed, 97 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index f04ce82..5d4dbea 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1410,7 +1410,7 @@ public class HiveConf extends Configuration { "The port of ZooKeeper servers to talk to.\n" + "If the list of Zookeeper servers specified in hive.zookeeper.quorum\n" + "does not contain port numbers, this value is used."), - HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", "600000ms", + HIVE_ZOOKEEPER_SESSION_TIMEOUT("hive.zookeeper.session.timeout", "1200000ms", new TimeValidator(TimeUnit.MILLISECONDS), "ZooKeeper client's session timeout (in milliseconds). 
The client is disconnected, and as a result, all locks released, \n" + "if a heartbeat is not sent in the timeout."), http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index acacf81..1921b06 100644 --- a/pom.xml +++ b/pom.xml @@ -512,6 +512,11 @@ <version>${curator.version}</version> </dependency> <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> + <dependency> <groupId>org.codehaus.groovy</groupId> <artifactId>groovy-all</artifactId> <version>${groovy.version}</version> http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/pom.xml ---------------------------------------------------------------------- diff --git a/service/pom.xml b/service/pom.xml index c5815af..d8e3126 100644 --- a/service/pom.xml +++ b/service/pom.xml @@ -91,6 +91,11 @@ <artifactId>curator-framework</artifactId> <version>${curator.version}</version> </dependency> + <dependency> + <groupId>org.apache.curator</groupId> + <artifactId>curator-recipes</artifactId> + <version>${curator.version}</version> + </dependency> <!-- intra-project --> <dependency> <groupId>org.apache.hive</groupId> http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java index ca1eae6..6c9efba 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftBinaryCLIService.java @@ -93,7 +93,6 @@ public class ThriftBinaryCLIService extends ThriftCLIService { // TCP Server server = new 
TThreadPoolServer(sargs); server.setServerEventHandler(serverEventHandler); - server.serve(); String msg = "Starting " + ThriftBinaryCLIService.class.getSimpleName() + " on port " + portNum + " with " + minWorkerThreads + "..." + maxWorkerThreads + " worker threads"; LOG.info(msg); http://git-wip-us.apache.org/repos/asf/hive/blob/652febcd/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index dc2217f..58e8e49 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -23,6 +23,8 @@ import java.nio.charset.Charset; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; @@ -35,6 +37,10 @@ import org.apache.commons.logging.LogFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.api.ACLProvider; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.apache.curator.framework.api.CuratorEventType; +import org.apache.curator.framework.recipes.nodes.PersistentEphemeralNode; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.hadoop.hive.common.LogUtils; import org.apache.hadoop.hive.common.LogUtils.LogInitializationException; @@ -67,9 +73,11 @@ import org.apache.zookeeper.data.ACL; */ public class HiveServer2 extends CompositeService { private static final Log LOG = LogFactory.getLog(HiveServer2.class); + private static CountDownLatch deleteSignal; private 
CLIService cliService; private ThriftCLIService thriftCLIService; + private PersistentEphemeralNode znode; private String znodePath; private CuratorFramework zooKeeperClient; private boolean registeredWithZooKeeper = false; @@ -151,12 +159,19 @@ public class HiveServer2 extends CompositeService { String instanceURI = getServerInstanceURI(hiveConf); byte[] znodeDataUTF8 = instanceURI.getBytes(Charset.forName("UTF-8")); setUpZooKeeperAuth(hiveConf); + int sessionTimeout = + (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_SESSION_TIMEOUT, + TimeUnit.MILLISECONDS); + int baseSleepTime = + (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, + TimeUnit.MILLISECONDS); + int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); // Create a CuratorFramework instance to be used as the ZooKeeper client // Use the zooKeeperAclProvider to create appropriate ACLs zooKeeperClient = CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) - .aclProvider(zooKeeperAclProvider).retryPolicy(new ExponentialBackoffRetry(1000, 3)) - .build(); + .sessionTimeoutMs(sessionTimeout).aclProvider(zooKeeperAclProvider) + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build(); zooKeeperClient.start(); // Create the parent znodes recursively; ignore if the parent already exists. 
try { @@ -176,18 +191,28 @@ public class HiveServer2 extends CompositeService { ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + "serverUri=" + instanceURI + ";" + "version=" + HiveVersionInfo.getVersion() + ";" + "sequence="; - znodePath = - zooKeeperClient.create().creatingParentsIfNeeded() - .withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath(pathPrefix, znodeDataUTF8); + znode = + new PersistentEphemeralNode(zooKeeperClient, + PersistentEphemeralNode.Mode.EPHEMERAL_SEQUENTIAL, pathPrefix, znodeDataUTF8); + znode.start(); + // We'll wait for 120s for node creation + long znodeCreationTimeout = 120; + if (!znode.waitForInitialCreate(znodeCreationTimeout, TimeUnit.SECONDS)) { + throw new Exception("Max znode creation wait time: " + znodeCreationTimeout + "s exhausted"); + } setRegisteredWithZooKeeper(true); + znodePath = znode.getActualPath(); // Set a watch on the znode if (zooKeeperClient.checkExists().usingWatcher(new DeRegisterWatcher()).forPath(znodePath) == null) { // No node exists, throw exception throw new Exception("Unable to create znode for this HiveServer2 instance on ZooKeeper."); } LOG.info("Created a znode on ZooKeeper for HiveServer2 uri: " + instanceURI); - } catch (KeeperException e) { + } catch (Exception e) { LOG.fatal("Unable to create a znode for this server instance", e); + if (znode != null) { + znode.close(); + } throw (e); } } @@ -223,22 +248,33 @@ public class HiveServer2 extends CompositeService { @Override public void process(WatchedEvent event) { if (event.getType().equals(Watcher.Event.EventType.NodeDeleted)) { - HiveServer2.this.setRegisteredWithZooKeeper(false); - // If there are no more active client sessions, stop the server - if (cliService.getSessionManager().getOpenSessionCount() == 0) { - LOG.warn("This instance of HiveServer2 has been removed from the list of server " - + "instances available for dynamic service discovery. 
" - + "The last client session has ended - will shutdown now."); - HiveServer2.this.stop(); + if (znode != null) { + try { + znode.close(); + LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. " + + "The server will be shut down after the last client sesssion completes."); + } catch (IOException e) { + LOG.error("Failed to close the persistent ephemeral znode", e); + } finally { + HiveServer2.this.setRegisteredWithZooKeeper(false); + // If there are no more active client sessions, stop the server + if (cliService.getSessionManager().getOpenSessionCount() == 0) { + LOG.warn("This instance of HiveServer2 has been removed from the list of server " + + "instances available for dynamic service discovery. " + + "The last client session has ended - will shutdown now."); + HiveServer2.this.stop(); + } + } } - LOG.warn("This HiveServer2 instance is now de-registered from ZooKeeper. " - + "The server will be shut down after the last client sesssion completes."); } } } private void removeServerInstanceFromZooKeeper() throws Exception { setRegisteredWithZooKeeper(false); + if (znode != null) { + znode.close(); + } zooKeeperClient.close(); LOG.info("Server instance removed from ZooKeeper."); } @@ -359,25 +395,53 @@ public class HiveServer2 extends CompositeService { HiveConf hiveConf = new HiveConf(); String zooKeeperEnsemble = ZooKeeperHiveHelper.getQuorumServers(hiveConf); String rootNamespace = hiveConf.getVar(HiveConf.ConfVars.HIVE_SERVER2_ZOOKEEPER_NAMESPACE); + int baseSleepTime = (int) hiveConf.getTimeVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_BASESLEEPTIME, TimeUnit.MILLISECONDS); + int maxRetries = hiveConf.getIntVar(HiveConf.ConfVars.HIVE_ZOOKEEPER_CONNECTION_MAX_RETRIES); CuratorFramework zooKeeperClient = CuratorFrameworkFactory.builder().connectString(zooKeeperEnsemble) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build(); + .retryPolicy(new ExponentialBackoffRetry(baseSleepTime, maxRetries)).build(); zooKeeperClient.start(); 
List<String> znodePaths = zooKeeperClient.getChildren().forPath( ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); + List<String> znodePathsUpdated; // Now for each path that is for the given versionNumber, delete the znode from ZooKeeper - for (String znodePath : znodePaths) { + for (int i = 0; i < znodePaths.size(); i++) { + String znodePath = znodePaths.get(i); + deleteSignal = new CountDownLatch(1); if (znodePath.contains("version=" + versionNumber + ";")) { - LOG.info("Removing the znode: " + znodePath + " from ZooKeeper"); - zooKeeperClient.delete().forPath( + String fullZnodePath = ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace - + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath); + + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + znodePath; + LOG.warn("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper"); + System.out.println("Will attempt to remove the znode: " + fullZnodePath + " from ZooKeeper"); + zooKeeperClient.delete().guaranteed().inBackground(new DeleteCallBack()) + .forPath(fullZnodePath); + // Wait for the delete to complete + deleteSignal.await(); + // Get the updated path list + znodePathsUpdated = + zooKeeperClient.getChildren().forPath( + ZooKeeperHiveHelper.ZOOKEEPER_PATH_SEPARATOR + rootNamespace); + // Gives a list of any new paths that may have been created to maintain the persistent ephemeral node + znodePathsUpdated.removeAll(znodePaths); + // Add the new paths to the znodes list. We'll try for their removal as well. 
+ znodePaths.addAll(znodePathsUpdated); } } zooKeeperClient.close(); } + private static class DeleteCallBack implements BackgroundCallback { + @Override + public void processResult(CuratorFramework zooKeeperClient, CuratorEvent event) + throws Exception { + if (event.getType() == CuratorEventType.DELETE) { + deleteSignal.countDown(); + } + } + } + public static void main(String[] args) { HiveConf.setLoadHiveServer2Config(true); try { @@ -547,6 +611,8 @@ public class HiveServer2 extends CompositeService { } catch (Exception e) { LOG.fatal("Error deregistering HiveServer2 instances for version: " + versionNumber + " from ZooKeeper", e); + System.out.println("Error deregistering HiveServer2 instances for version: " + versionNumber + + " from ZooKeeper." + e); System.exit(-1); } System.exit(0);