Nimbus discovery through the new Thrift API instead of using ZooKeeper (zk).
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/27ad1fb2 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/27ad1fb2 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/27ad1fb2 Branch: refs/heads/0.11.x-branch Commit: 27ad1fb25caa5eb20f477c89cf57c801b6cec3c2 Parents: 0b454e8 Author: Parth Brahmbhatt <[email protected]> Authored: Thu Feb 12 15:47:38 2015 -0800 Committer: Parth Brahmbhatt <[email protected]> Committed: Thu Feb 12 15:47:38 2015 -0800 ---------------------------------------------------------------------- storm-core/src/clj/backtype/storm/thrift.clj | 5 +-- storm-core/src/clj/backtype/storm/ui/core.clj | 37 ++++++++-------------- 2 files changed, 16 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/27ad1fb2/storm-core/src/clj/backtype/storm/thrift.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/thrift.clj b/storm-core/src/clj/backtype/storm/thrift.clj index 0cac7b8..474ea67 100644 --- a/storm-core/src/clj/backtype/storm/thrift.clj +++ b/storm-core/src/clj/backtype/storm/thrift.clj @@ -84,8 +84,9 @@ (defmacro with-configured-nimbus-connection [client-sym & body] `(let [conf# (read-storm-config) - ~client-sym (NimbusClient/getConfiguredClient conf#) - conn# (.transport ~client-sym) + nimbusClient# (NimbusClient/getConfiguredClient conf#) + ~client-sym (.getClient nimbusClient#) + conn# (.transport nimbusClient#) ] (try ~@body http://git-wip-us.apache.org/repos/asf/storm/blob/27ad1fb2/storm-core/src/clj/backtype/storm/ui/core.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/backtype/storm/ui/core.clj b/storm-core/src/clj/backtype/storm/ui/core.clj index 94b0311..8fd22a6 100644 --- a/storm-core/src/clj/backtype/storm/ui/core.clj +++ 
b/storm-core/src/clj/backtype/storm/ui/core.clj @@ -47,17 +47,6 @@ (def http-creds-handler (AuthUtils/GetUiHttpCredentialsPlugin *STORM-CONF*)) -(defmacro with-nimbus - [nimbus-sym & body] - `(let [leader-elector# (zk-leader-elector *STORM-CONF*) - leader-nimbus# (.getLeader leader-elector#) - host# (.getHost leader-nimbus#) - port# (.getPort leader-nimbus#) - no-op# (.close leader-elector#)] - (thrift/with-nimbus-connection - [~nimbus-sym host# port#] - ~@body))) - (defn assert-authorized-user ([servlet-request op] (assert-authorized-user servlet-request op nil)) @@ -468,7 +457,7 @@ (defn mk-visualization-data [id window include-sys?] - (with-nimbus + (thrift/with-configured-nimbus-connection nimbus (let [window (if window window ":all-time") topology (.getTopology ^Nimbus$Client nimbus id) @@ -490,12 +479,12 @@ spout-comp-summs bolt-comp-summs window id)))) (defn cluster-configuration [] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (.getNimbusConf ^Nimbus$Client nimbus))) (defn cluster-summary ([user] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (cluster-summary (.getClusterInfo ^Nimbus$Client nimbus) user))) ([^ClusterSummary summ user] (let [sups (.get_supervisors summ) @@ -519,7 +508,7 @@ (defn nimbus-summary ([] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (nimbus-summary (.get_nimbuses (.getClusterInfo ^Nimbus$Client nimbus))))) ([nimbuses] @@ -535,7 +524,7 @@ (defn supervisor-summary ([] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (supervisor-summary (.get_supervisors (.getClusterInfo ^Nimbus$Client nimbus))))) ([summs] @@ -549,7 +538,7 @@ (defn all-topologies-summary ([] - (with-nimbus + (thrift/with-configured-nimbus-connection nimbus (all-topologies-summary (.get_topologies (.getClusterInfo ^Nimbus$Client nimbus))))) @@ -665,7 +654,7 @@ "failed" (get-in stats [:failed k])}))) (defn topology-page [id window include-sys? 
user] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (let [window (if window window ":all-time") window-hint (window-hint window) summ (.getTopologyInfo ^Nimbus$Client nimbus id) @@ -848,7 +837,7 @@ (defn component-page [topology-id component window include-sys? user] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (let [window (if window window ":all-time") summ (.getTopologyInfo ^Nimbus$Client nimbus topology-id) topology (.getTopology ^Nimbus$Client nimbus topology-id) @@ -873,7 +862,7 @@ spec errors)))) (defn topology-config [topology-id] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (from-json (.getTopologyConf ^Nimbus$Client nimbus topology-id)))) (defn check-include-sys? @@ -923,7 +912,7 @@ (assert-authorized-user servlet-request "getTopology" (topology-config id)) (json-response (component-page id component (:window m) (check-include-sys? (:sys m)) user) (:callback m)))) (POST "/api/v1/topology/:id/activate" [:as {:keys [cookies servlet-request]} id] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg)] (assert-authorized-user servlet-request "activate" (topology-config id)) @@ -931,7 +920,7 @@ (log-message "Activating topology '" name "'"))) (resp/redirect (str "/api/v1/topology/" id))) (POST "/api/v1/topology/:id/deactivate" [:as {:keys [cookies servlet-request]} id] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg)] (assert-authorized-user servlet-request "deactivate" (topology-config id)) @@ -939,7 +928,7 @@ (log-message "Deactivating topology '" name "'"))) (resp/redirect (str "/api/v1/topology/" (url-encode id)))) (POST "/api/v1/topology/:id/rebalance/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection 
nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg) options (RebalanceOptions.)] @@ -949,7 +938,7 @@ (log-message "Rebalancing topology '" name "' with wait time: " wait-time " secs"))) (resp/redirect (str "/api/v1/topology/" (url-encode id)))) (POST "/api/v1/topology/:id/kill/:wait-time" [:as {:keys [cookies servlet-request]} id wait-time] - (with-nimbus nimbus + (thrift/with-configured-nimbus-connection nimbus (let [tplg (.getTopologyInfo ^Nimbus$Client nimbus id) name (.get_name tplg) options (KillOptions.)]
