HIVE-15810: llapstatus should wait for the ZooKeeper (ZK) node to become available when in wait mode (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b978c074 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b978c074 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b978c074 Branch: refs/heads/hive-14535 Commit: b978c074deda11f3bf70a607e25ea35ef8b914af Parents: da6f581 Author: Sergey Shelukhin <[email protected]> Authored: Mon Feb 6 19:00:31 2017 -0800 Committer: Sergey Shelukhin <[email protected]> Committed: Mon Feb 6 19:00:31 2017 -0800 ---------------------------------------------------------------------- .../hive/llap/registry/ServiceRegistry.java | 30 ++++------- .../registry/impl/LlapFixedRegistryImpl.java | 2 +- .../llap/registry/impl/LlapRegistryService.java | 6 ++- .../impl/LlapZookeeperRegistryImpl.java | 55 ++++++++++++++------ .../hive/llap/cli/LlapStatusServiceDriver.java | 11 ++-- .../java/org/apache/hive/http/LlapServlet.java | 2 +- 6 files changed, 60 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java index 8d7fcb7..5739d72 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/ServiceRegistry.java @@ -24,54 +24,42 @@ public interface ServiceRegistry { /** * Start the service registry - * - * @throws IOException */ - public void start() throws IOException; + void start() throws IOException; /** * Stop the service registry - * - * @throws IOException */ - public void stop() throws IOException; + void stop() throws IOException; /** * Register the 
current instance - the implementation takes care of the endpoints to register. - * * @return self identifying name - * - * @throws IOException */ - public String register() throws IOException; + String register() throws IOException; /** * Remove the current registration cleanly (implementation defined cleanup) - * - * @throws IOException */ - public void unregister() throws IOException; + void unregister() throws IOException; /** * Client API to get the list of instances registered via the current registry key. - * * @param component - * @return - * @throws IOException + * @param clusterReadyTimeoutMs The time to wait for the cluster to be ready, if it's not + * started yet. 0 means do not wait. */ - public ServiceInstanceSet getInstances(String component) throws IOException; + ServiceInstanceSet getInstances(String component, long clusterReadyTimeoutMs) throws IOException; /** * Adds state change listeners for service instances. - * * @param listener - state change listener - * @throws IOException */ - public void registerStateChangeListener(ServiceInstanceStateChangeListener listener) + void registerStateChangeListener(ServiceInstanceStateChangeListener listener) throws IOException; /** * @return The application ID of the LLAP cluster. 
*/ ApplicationId getApplicationId() throws IOException; -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java index 45ac5bf..ebc32a1 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapFixedRegistryImpl.java @@ -258,7 +258,7 @@ public class LlapFixedRegistryImpl implements ServiceRegistry { } @Override - public ServiceInstanceSet getInstances(String component) throws IOException { + public ServiceInstanceSet getInstances(String component, long timeoutMs) throws IOException { return new FixedServiceInstanceSet(); } http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java index 2a5afac..5a94db9 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapRegistryService.java @@ -128,7 +128,11 @@ public class LlapRegistryService extends AbstractService { } public ServiceInstanceSet getInstances() throws IOException { - return this.registry.getInstances("LLAP"); + return getInstances(0); + } + + public ServiceInstanceSet getInstances(long clusterReadyTimeoutMs) throws IOException { + return 
this.registry.getInstances("LLAP", clusterReadyTimeoutMs); } public void registerStateChangeListener(ServiceInstanceStateChangeListener listener) http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java ---------------------------------------------------------------------- diff --git a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java index ac48b67..7ae80b0 100644 --- a/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java +++ b/llap-client/src/java/org/apache/hadoop/hive/llap/registry/impl/LlapZookeeperRegistryImpl.java @@ -32,6 +32,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; @@ -76,6 +77,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.KeeperException.InvalidACLException; import org.apache.zookeeper.client.ZooKeeperSaslClient; import org.apache.zookeeper.data.ACL; import org.apache.zookeeper.data.Id; @@ -781,8 +783,9 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { } @Override - public ServiceInstanceSet getInstances(String component) throws IOException { - checkPathChildrenCache(); + public ServiceInstanceSet getInstances( + String component, long clusterReadyTimeoutMs) throws IOException { + checkPathChildrenCache(clusterReadyTimeoutMs); // lazily create instances if (instances == null) { @@ -793,7 +796,7 @@ public class LlapZookeeperRegistryImpl 
implements ServiceRegistry { @Override public ApplicationId getApplicationId() throws IOException { - getInstances(null); + getInstances("LLAP", 0); return instances.getApplicationId(); } @@ -801,28 +804,46 @@ public class LlapZookeeperRegistryImpl implements ServiceRegistry { public synchronized void registerStateChangeListener( final ServiceInstanceStateChangeListener listener) throws IOException { - checkPathChildrenCache(); + checkPathChildrenCache(0); this.stateChangeListeners.add(listener); } - private synchronized void checkPathChildrenCache() throws IOException { + private synchronized void checkPathChildrenCache(long clusterReadyTimeoutMs) throws IOException { Preconditions.checkArgument(zooKeeperClient != null && - zooKeeperClient.getState() == CuratorFrameworkState.STARTED, - "client is not started"); - + zooKeeperClient.getState() == CuratorFrameworkState.STARTED, "client is not started"); // lazily create PathChildrenCache - if (instancesCache == null) { - this.instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true); - instancesCache.getListenable().addListener(new InstanceStateChangeListener(), - Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() - .setDaemon(true) - .setNameFormat("StateChangeNotificationHandler") - .build())); + if (instancesCache != null) return; + ExecutorService tp = Executors.newFixedThreadPool(1, new ThreadFactoryBuilder() + .setDaemon(true).setNameFormat("StateChangeNotificationHandler").build()); + long startTimeNs = System.nanoTime(), deltaNs = clusterReadyTimeoutMs * 1000000L; + long sleepTimeMs = Math.min(16, clusterReadyTimeoutMs); + while (true) { + PathChildrenCache instancesCache = new PathChildrenCache(zooKeeperClient, workersPath, true); + instancesCache.getListenable().addListener(new InstanceStateChangeListener(), tp); try { - this.instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + instancesCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + 
this.instancesCache = instancesCache; + break; + } catch (InvalidACLException e) { + // PathChildrenCache tried to mkdir when the znode wasn't there, and failed. + CloseableUtils.closeQuietly(instancesCache); + long elapsedNs = System.nanoTime() - startTimeNs; + if (deltaNs == 0 || deltaNs <= elapsedNs) { + LOG.error("Unable to start curator PathChildrenCache", e); + throw new IOException(e); + } + LOG.warn("The cluster is not started yet (InvalidACL); will retry"); + try { + Thread.sleep(Math.min(sleepTimeMs, (deltaNs - elapsedNs)/1000000L)); + } catch (InterruptedException e1) { + LOG.error("Interrupted while retrying the PathChildrenCache startup"); + throw new IOException(e1); + } + sleepTimeMs = sleepTimeMs << 1; } catch (Exception e) { - LOG.error("Unable to start curator PathChildrenCache. Exception: {}", e); + CloseableUtils.closeQuietly(instancesCache); + LOG.error("Unable to start curator PathChildrenCache", e); throw new IOException(e); } } http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java index 39d542b..b30f837 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapStatusServiceDriver.java @@ -174,7 +174,7 @@ public class LlapStatusServiceDriver { } } - public int run(LlapStatusOptions options) { + public int run(LlapStatusOptions options, long watchTimeoutMs) { appStatusBuilder = new AppStatusBuilder(); try { if (appName == null) { @@ -253,7 +253,7 @@ public class LlapStatusServiceDriver { return ret.getInt(); } else { try { - ret = populateAppStatusFromLlapRegistry(appStatusBuilder); + ret = 
populateAppStatusFromLlapRegistry(appStatusBuilder, watchTimeoutMs); } catch (LlapStatusCliException e) { logError(e); return e.getExitCode().getInt(); @@ -481,7 +481,8 @@ public class LlapStatusServiceDriver { * @return an ExitCode. An ExitCode other than ExitCode.SUCCESS implies future progress not possible * @throws LlapStatusCliException */ - private ExitCode populateAppStatusFromLlapRegistry(AppStatusBuilder appStatusBuilder) throws + private ExitCode populateAppStatusFromLlapRegistry( + AppStatusBuilder appStatusBuilder, long watchTimeoutMs) throws LlapStatusCliException { if (llapRegistry == null) { @@ -495,7 +496,7 @@ public class LlapStatusServiceDriver { Collection<ServiceInstance> serviceInstances; try { - serviceInstances = llapRegistry.getInstances().getAll(); + serviceInstances = llapRegistry.getInstances(watchTimeoutMs).getAll(); } catch (IOException e) { throw new LlapStatusCliException(ExitCode.LLAP_REGISTRY_ERROR, "Failed to get instances from llap registry", e); } @@ -964,7 +965,7 @@ public class LlapStatusServiceDriver { numAttempts, watchMode, new DecimalFormat("#.###").format(runningNodesThreshold)); while (numAttempts > 0) { try { - ret = statusServiceDriver.run(options); + ret = statusServiceDriver.run(options, watchMode ? 
watchTimeout : 0); if (ret == ExitCode.SUCCESS.getInt()) { if (watchMode) { currentState = statusServiceDriver.appStatusBuilder.state; http://git-wip-us.apache.org/repos/asf/hive/blob/b978c074/service/src/java/org/apache/hive/http/LlapServlet.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/http/LlapServlet.java b/service/src/java/org/apache/hive/http/LlapServlet.java index 993766b..1546522 100644 --- a/service/src/java/org/apache/hive/http/LlapServlet.java +++ b/service/src/java/org/apache/hive/http/LlapServlet.java @@ -98,7 +98,7 @@ public class LlapServlet extends HttpServlet { LOG.info("Retrieving info for cluster: " + clusterName); LlapStatusServiceDriver driver = new LlapStatusServiceDriver(); - int ret = driver.run(new LlapStatusOptionsProcessor.LlapStatusOptions(clusterName)); + int ret = driver.run(new LlapStatusOptionsProcessor.LlapStatusOptions(clusterName), 0); if (ret == LlapStatusServiceDriver.ExitCode.SUCCESS.getInt()) { driver.outputJson(writer); }
