Changing nimbus discovery to use thrift API instead of using zookeeper.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7cae523e Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7cae523e Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7cae523e Branch: refs/heads/0.11.x-branch Commit: 7cae523e3c8e86688a371da9bae809cb4c244358 Parents: aa24375 Author: Parth Brahmbhatt <[email protected]> Authored: Thu Feb 12 14:57:26 2015 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Thu Feb 12 14:57:26 2015 -0800 ---------------------------------------------------------------------- conf/defaults.yaml | 1 + storm-core/src/clj/backtype/storm/thrift.clj | 13 +++--- storm-core/src/jvm/backtype/storm/Config.java | 6 +++ .../jvm/backtype/storm/utils/NimbusClient.java | 42 +++++++++++++------- 4 files changed, 40 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index fbea948..2d8b62c 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -51,6 +51,7 @@ storm.meta.serialization.delegate: "backtype.storm.serialization.DefaultSerializ storm.codedistributor.class: "backtype.storm.codedistributor.LocalFileSystemCodeDistributor" ### nimbus.* configs are for the master +nimbus.seeds : ["localhost:6627"] nimbus.thrift.port: 6627 nimbus.thrift.threads: 64 nimbus.thrift.max_buffer_size: 1048576 http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/storm-core/src/clj/backtype/storm/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj index 2b860b3..0cac7b8 100644 --- a/storm-core/src/clj/backtype/storm/thrift.clj +++ b/storm-core/src/clj/backtype/storm/thrift.clj @@ -84,13 +84,12 @@ (defmacro with-configured-nimbus-connection [client-sym & body] `(let [conf# (read-storm-config) - zk-leader-elector# (zk-leader-elector conf#) - leader-nimbus# (.getLeader zk-leader-elector#) - host# (.getHost leader-nimbus#) - port# (.getPort leader-nimbus#) - no-op# (.close zk-leader-elector#)] - (with-nimbus-connection [~client-sym host# port#] - ~@body ))) + ~client-sym (NimbusClient/getConfiguredClient conf#) + conn# (.transport ~client-sym) + ] + (try + ~@body + (finally (.close conn#))))) (defn direct-output-fields [fields] http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/storm-core/src/jvm/backtype/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/Config.java b/storm-core/src/jvm/backtype/storm/Config.java index c0668ee..106f1f3 100644 --- a/storm-core/src/jvm/backtype/storm/Config.java +++ b/storm-core/src/jvm/backtype/storm/Config.java @@ -307,6 +307,12 @@ public class Config extends HashMap<String, Object> { public static final Object NIMBUS_THRIFT_TRANSPORT_PLUGIN_SCHEMA = String.class; /** + * List of seed nimbus hosts:port to use for leader nimbus discovery. + */ + public static final String NIMBUS_SEEDS = "nimbus.seeds"; + public static final Object NIMBUS_SEEDS_SCHEMA = ConfigValidation.StringsValidator; + + /** * Which port the Thrift interface of Nimbus should run on. Clients should * connect to this port to upload jars and submit topologies. */ http://git-wip-us.apache.org/repos/asf/storm/blob/7cae523e/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java index da10a4f..e4222e4 100644 --- a/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java +++ b/storm-core/src/jvm/backtype/storm/utils/NimbusClient.java @@ -18,17 +18,22 @@ package backtype.storm.utils; +import backtype.storm.Config; +import backtype.storm.generated.ClusterSummary; import backtype.storm.generated.Nimbus; +import backtype.storm.generated.NimbusSummary; import backtype.storm.nimbus.ILeaderElector; import backtype.storm.nimbus.NimbusInfo; import backtype.storm.security.auth.ThriftClient; import backtype.storm.security.auth.ThriftConnectionType; import clojure.lang.IFn; import clojure.lang.PersistentArrayMap; +import com.google.common.base.Splitter; import org.apache.thrift.transport.TTransportException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.Map; public class NimbusClient extends ThriftClient { @@ -36,23 +41,30 @@ public class NimbusClient extends ThriftClient { private static final Logger LOG = LoggerFactory.getLogger(NimbusClient.class); public static NimbusClient getConfiguredClient(Map conf) { - ILeaderElector zkLeaderElector = null; - try { - IFn zkLeaderElectorFn = Utils.loadClojureFn("backtype.storm.zookeeper", "zk-leader-elector"); - zkLeaderElector = (ILeaderElector) zkLeaderElectorFn.invoke(PersistentArrayMap.create(conf)); - NimbusInfo leaderInfo = zkLeaderElector.getLeader(); - String nimbusHost = leaderInfo.getHost(); - int nimbusPort = leaderInfo.getPort(); - return new NimbusClient(conf, nimbusHost, nimbusPort); - } catch (TTransportException ex) { - throw new RuntimeException(ex); - } catch (Exception e) { - throw new RuntimeException(e); - } finally { - if(zkLeaderElector != null) { - zkLeaderElector.close(); + List<String> seeds = (List<String>) conf.get(Config.NIMBUS_SEEDS); + for(String seed : seeds) { + String[] split = seed.split(":"); + String host = split[0]; + int port = Integer.parseInt(split[1]); + try { + NimbusClient client = new NimbusClient(conf,host,port); + ClusterSummary clusterInfo = client.getClient().getClusterInfo(); + List<NimbusSummary> nimbuses = clusterInfo.get_nimbuses(); + if(nimbuses != null) { + for(NimbusSummary nimbusSummary : nimbuses) { + if(nimbusSummary.is_isLeader()) { + return new NimbusClient(conf, nimbusSummary.get_host(), nimbusSummary.get_port()); + } + } + } + throw new RuntimeException("Found nimbuses " + nimbuses + " none of which is elected as leader, please try " + + "again after some time."); + } catch (Exception e) { + LOG.warn("Ignoring exception while trying to get leader nimbus info from {}", seed); } } + throw new RuntimeException("Could not find leader nimbus from seed hosts " + seeds +". " + + "Did you specify a valid list of nimbus host:port for config " + Config.NIMBUS_SEEDS); } public NimbusClient(Map conf, String host, int port) throws TTransportException {
