Changing nimbus discovery to use thrift API instead of using zookeeper.

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7cae523e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7cae523e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7cae523e

Branch: refs/heads/0.11.x-branch
Commit: 7cae523e3c8e86688a371da9bae809cb4c244358
Parents: aa24375
Author: Parth Brahmbhatt <[email protected]>
Authored: Thu Feb 12 14:57:26 2015 -0800
Committer: Parth Brahmbhatt <[email protected]>
Committed: Thu Feb 12 14:57:26 2015 -0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |  1 +
 storm-core/src/clj/backtype/storm/thrift.clj    | 13 +++---
 storm-core/src/jvm/backtype/storm/Config.java   |  6 +++
 .../jvm/backtype/storm/utils/NimbusClient.java  | 42 +++++++++++++-------
 4 files changed, 40 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index fbea948..2d8b62c 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -51,6 +51,7 @@ storm.meta.serialization.delegate: 
"backtype.storm.serialization.DefaultSerializ
 storm.codedistributor.class: 
"backtype.storm.codedistributor.LocalFileSystemCodeDistributor"
 
 ### nimbus.* configs are for the master
+nimbus.seeds : ["localhost:6627"]
 nimbus.thrift.port: 6627
 nimbus.thrift.threads: 64
 nimbus.thrift.max_buffer_size: 1048576

http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/storm-core/src/clj/backtype/storm/thrift.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/backtype/storm/thrift.clj 
b/storm-core/src/clj/backtype/storm/thrift.clj
index 2b860b3..0cac7b8 100644
--- a/storm-core/src/clj/backtype/storm/thrift.clj
+++ b/storm-core/src/clj/backtype/storm/thrift.clj
@@ -84,13 +84,12 @@
 (defmacro with-configured-nimbus-connection
   [client-sym & body]
   `(let [conf# (read-storm-config)
-         zk-leader-elector# (zk-leader-elector conf#)
-         leader-nimbus# (.getLeader zk-leader-elector#)
-         host# (.getHost leader-nimbus#)
-         port# (.getPort leader-nimbus#)
-         no-op# (.close zk-leader-elector#)]
-     (with-nimbus-connection [~client-sym host# port#]
-       ~@body )))
+         ~client-sym (NimbusClient/getConfiguredClient conf#)
+         conn# (.transport ~client-sym)
+         ]
+     (try
+       ~@body
+     (finally (.close conn#)))))
 
 (defn direct-output-fields
   [fields]

http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/storm-core/src/jvm/backtype/storm/Config.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/Config.java 
b/storm-core/src/jvm/backtype/storm/Config.java
index c0668ee..106f1f3 100644
--- a/storm-core/src/jvm/backtype/storm/Config.java
+++ b/storm-core/src/jvm/backtype/storm/Config.java
@@ -307,6 +307,12 @@ public class Config extends HashMap<String, Object> {
     public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = 
String.class;
 
     /**
+     * List of seed nimbus hosts:port to use for leader nimbus discovery.
+     */
+    public static final String NIMBUS_SEEDS = "nimbus.seeds";
+    public static final Object NIMBUS_SEEDS_SCHEMA = 
ConfigValidation.StringsValidator;
+
+    /**
      * Which port the Thrift interface of Nimbus should run on. Clients should
      * connect to this port to upload jars and submit topologies.
      */

http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
----------------------------------------------------------------------
diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java 
b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
index da10a4f..e4222e4 100644
--- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
+++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java
@@ -18,17 +18,22 @@
 package backtype.storm.utils;
 
 
+import backtype.storm.Config;
+import backtype.storm.generated.ClusterSummary;
 import backtype.storm.generated.Nimbus;
+import backtype.storm.generated.NimbusSummary;
 import backtype.storm.nimbus.ILeaderElector;
 import backtype.storm.nimbus.NimbusInfo;
 import backtype.storm.security.auth.ThriftClient;
 import backtype.storm.security.auth.ThriftConnectionType;
 import clojure.lang.IFn;
 import clojure.lang.PersistentArrayMap;
+import com.google.common.base.Splitter;
 import org.apache.thrift.transport.TTransportException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.List;
 import java.util.Map;
 
 public class NimbusClient extends ThriftClient {
@@ -36,23 +41,30 @@ public class NimbusClient extends ThriftClient {
     private static final Logger LOG = 
LoggerFactory.getLogger(NimbusClient.class);
 
     public static NimbusClient getConfiguredClient(Map conf) {
-        ILeaderElector zkLeaderElector = null;
-        try {
-            IFn zkLeaderElectorFn = 
Utils.loadClojureFn("backtype.storm.zookeeper", "zk-leader-elector");
-            zkLeaderElector = (ILeaderElector) 
zkLeaderElectorFn.invoke(PersistentArrayMap.create(conf));
-            NimbusInfo leaderInfo = zkLeaderElector.getLeader();
-            String nimbusHost = leaderInfo.getHost();
-            int nimbusPort = leaderInfo.getPort();
-            return new NimbusClient(conf, nimbusHost, nimbusPort);
-        } catch (TTransportException ex) {
-            throw new RuntimeException(ex);
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        } finally {
-            if(zkLeaderElector != null) {
-                zkLeaderElector.close();
+        List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS);
+        for(String seed : seeds) {
+            String[] split = seed.split(":");
+            String host = split[0];
+            int port = Integer.parseInt(split[1]);
+            try {
+                NimbusClient client = new NimbusClient(conf,host,port);
+                ClusterSummary clusterInfo = 
client.getClient().getClusterInfo();
+                List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses();
+                if(nimbuses != null) {
+                    for(NimbusSummary nimbusSummary : nimbuses) {
+                        if(nimbusSummary.is_isLeader()) {
+                            return new NimbusClient(conf, 
nimbusSummary.get_host(), nimbusSummary.get_port());
+                        }
+                    }
+                }
+                throw new RuntimeException("Found nimbuses " + nimbuses + " 
none of which is elected as leader, please try " +
+                        "again after some time.");
+            } catch (Exception e) {
+                LOG.warn("Ignoring exception while trying to get leader nimbus 
info from {}", seed);
             }
         }
+        throw new RuntimeException("Could not find leader nimbus from seed 
hosts " + seeds +". " +
+                "Did you specify a valid list of nimbus host:port for config " 
+ Config.NIMBUS_SEEDS);
     }
 
     public NimbusClient(Map conf, String host, int port) throws 
TTransportException {

Reply via email to