YARN-5623. Apply SLIDER-1166 to yarn-native-services branch. Contributed by Gour Saha
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc7b200b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc7b200b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc7b200b

Branch: refs/heads/yarn-native-services
Commit: cc7b200b37a099cd1503c050968abb0541cf0536
Parents: cb61fe3
Author: Jian He <jia...@apache.org>
Authored: Mon Sep 12 10:44:39 2016 +0800
Committer: Jian He <jia...@apache.org>
Committed: Wed Dec 7 13:00:06 2016 -0800

----------------------------------------------------------------------
 .../org/apache/slider/client/SliderClient.java | 9 ++++--
 .../apache/slider/core/zk/ZKIntegration.java | 34 +++++++++++++++++---
 2 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc7b200b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 5096bb7..fe4f1d2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -604,11 +604,14 @@ public class SliderClient extends AbstractSliderLaunchedService implements RunSe
       BlockingZKWatcher watcher = new BlockingZKWatcher();
       client = ZKIntegration.newInstance(registryQuorum, user, clusterName, true, false, watcher,
           ZKIntegration.SESSION_TIMEOUT);
-      client.init();
-      watcher.waitForZKConnection(2 * 1000);
+      boolean fromCache = client.init();
+      if (!fromCache) {
+        watcher.waitForZKConnection(2 * 1000);
+      }
     } catch (InterruptedException e) {
       client = null;
-      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+      log.warn("Interrupted - unable to connect to zookeeper quorum {}",
+          registryQuorum, e);
     } catch (IOException e) {
       log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc7b200b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
index ca41e4b..6ed58d5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -33,6 +33,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -65,6 +67,8 @@ public class ZKIntegration implements Watcher, Closeable {
   private final String clustername;
   private final String userPath;
   private int sessionTimeout = SESSION_TIMEOUT;
+  private static final Map<String, ZooKeeper> ZK_SESSIONS = new HashMap<>();
+
 /**
  flag to set to indicate that the user path should be created if
  it is not already there
@@ -93,10 +97,32 @@ public class ZKIntegration implements Watcher, Closeable {
     this.userPath = mkSliderUserPath(username);
   }
 
-  public void init() throws IOException {
-    assert zookeeper == null;
-    log.debug("Binding ZK client to {}", zkConnection);
-    zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, canBeReadOnly);
+  /**
+   * Returns true only if an active ZK session is available and retrieved from
+   * cache, false when it has to create a new one.
+   *
+   * @return true if from cache, false when new session created
+   * @throws IOException
+   */
+  public synchronized boolean init() throws IOException {
+    if (zookeeper != null && getAlive()) {
+      return true;
+    }
+
+    synchronized (ZK_SESSIONS) {
+      if (ZK_SESSIONS.containsKey(zkConnection)) {
+        zookeeper = ZK_SESSIONS.get(zkConnection);
+      }
+      if (zookeeper == null || !getAlive()) {
+        log.info("Binding ZK client to {}", zkConnection);
+        zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this,
+            canBeReadOnly);
+        ZK_SESSIONS.put(zkConnection, zookeeper);
+        return false;
+      } else {
+        return true;
+      }
+    }
   }
 
   /**

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org