YARN-5623. Apply SLIDER-1166 to yarn-native-services branch. Contributed by 
Gour Saha


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/cc7b200b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/cc7b200b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/cc7b200b

Branch: refs/heads/yarn-native-services
Commit: cc7b200b37a099cd1503c050968abb0541cf0536
Parents: cb61fe3
Author: Jian He <jia...@apache.org>
Authored: Mon Sep 12 10:44:39 2016 +0800
Committer: Jian He <jia...@apache.org>
Committed: Wed Dec 7 13:00:06 2016 -0800

----------------------------------------------------------------------
 .../org/apache/slider/client/SliderClient.java  |  9 ++++--
 .../apache/slider/core/zk/ZKIntegration.java    | 34 +++++++++++++++++---
 2 files changed, 36 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc7b200b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
index 5096bb7..fe4f1d2 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/client/SliderClient.java
@@ -604,11 +604,14 @@ public class SliderClient extends 
AbstractSliderLaunchedService implements RunSe
       BlockingZKWatcher watcher = new BlockingZKWatcher();
       client = ZKIntegration.newInstance(registryQuorum, user, clusterName, 
true, false, watcher,
           ZKIntegration.SESSION_TIMEOUT);
-      client.init();
-      watcher.waitForZKConnection(2 * 1000);
+      boolean fromCache = client.init();
+      if (!fromCache) {
+        watcher.waitForZKConnection(2 * 1000);
+      }
     } catch (InterruptedException e) {
       client = null;
-      log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
+      log.warn("Interrupted - unable to connect to zookeeper quorum {}",
+          registryQuorum, e);
     } catch (IOException e) {
       log.warn("Unable to connect to zookeeper quorum {}", registryQuorum, e);
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/cc7b200b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
index ca41e4b..6ed58d5 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-slider/hadoop-yarn-slider-core/src/main/java/org/apache/slider/core/zk/ZKIntegration.java
@@ -33,6 +33,8 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 
@@ -65,6 +67,8 @@ public class ZKIntegration implements Watcher, Closeable {
   private final String clustername;
   private final String userPath;
   private int sessionTimeout = SESSION_TIMEOUT;
+  private static final Map<String, ZooKeeper> ZK_SESSIONS = new HashMap<>();
+
 /**
  flag to set to indicate that the user path should be created if
  it is not already there
@@ -93,10 +97,32 @@ public class ZKIntegration implements Watcher, Closeable {
     this.userPath = mkSliderUserPath(username);
   }
 
-  public void init() throws IOException {
-    assert zookeeper == null;
-    log.debug("Binding ZK client to {}", zkConnection);
-    zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this, 
canBeReadOnly);
+  /**
+   * Returns true only if an active ZK session is available and retrieved from
+   * cache, false when it has to create a new one.
+   *
+   * @return true if from cache, false when new session created
+   * @throws IOException
+   */
+  public synchronized boolean init() throws IOException {
+    if (zookeeper != null && getAlive()) {
+      return true;
+    }
+
+    synchronized (ZK_SESSIONS) {
+      if (ZK_SESSIONS.containsKey(zkConnection)) {
+        zookeeper = ZK_SESSIONS.get(zkConnection);
+      }
+      if (zookeeper == null || !getAlive()) {
+        log.info("Binding ZK client to {}", zkConnection);
+        zookeeper = new ZooKeeper(zkConnection, sessionTimeout, this,
+            canBeReadOnly);
+        ZK_SESSIONS.put(zkConnection, zookeeper);
+        return false;
+      } else {
+        return true;
+      }
+    }
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to