Delete zookeeper.clj, zookeeper_state_factory.clj, and cluster.clj; some tests still do not pass
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a79fb7d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a79fb7d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a79fb7d Branch: refs/heads/master Commit: 9a79fb7de0e824e73c294738521e892f1d81fbb0 Parents: 682d31c Author: xiaojian.fxj <[email protected]> Authored: Wed Feb 3 20:28:05 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Wed Feb 3 20:28:05 2016 +0800 ---------------------------------------------------------------------- conf/defaults.yaml | 2 +- storm-core/src/clj/org/apache/storm/cluster.clj | 691 ------------------- .../cluster_state/zookeeper_state_factory.clj | 163 ----- .../org/apache/storm/command/dev_zookeeper.clj | 2 +- .../clj/org/apache/storm/command/heartbeats.clj | 6 +- .../apache/storm/command/shell_submission.clj | 2 +- .../src/clj/org/apache/storm/converter.clj | 14 +- .../src/clj/org/apache/storm/daemon/common.clj | 13 +- .../clj/org/apache/storm/daemon/executor.clj | 12 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 138 ++-- .../clj/org/apache/storm/daemon/supervisor.clj | 35 +- .../src/clj/org/apache/storm/daemon/worker.clj | 43 +- .../storm/pacemaker/pacemaker_state_factory.clj | 12 +- storm-core/src/clj/org/apache/storm/stats.clj | 3 +- storm-core/src/clj/org/apache/storm/testing.clj | 14 +- storm-core/src/clj/org/apache/storm/thrift.clj | 2 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 2 +- storm-core/src/clj/org/apache/storm/util.clj | 11 + .../src/clj/org/apache/storm/zookeeper.clj | 75 -- .../jvm/org/apache/storm/callback/Callback.java | 3 + .../jvm/org/apache/storm/cluster/Cluster.java | 38 +- .../org/apache/storm/cluster/ClusterState.java | 2 +- .../storm/cluster/DistributedClusterState.java | 7 +- .../apache/storm/cluster/StormClusterState.java | 34 +- .../storm/cluster/StormZkClusterState.java | 109 +-- .../testing/staticmocking/MockedCluster.java | 31 + 
.../org/apache/storm/integration_test.clj | 15 +- .../test/clj/org/apache/storm/cluster_test.clj | 103 ++- .../test/clj/org/apache/storm/nimbus_test.clj | 148 ++-- .../storm/security/auth/nimbus_auth_test.clj | 3 +- .../clj/org/apache/storm/supervisor_test.clj | 29 +- .../test/jvm/org/apache/storm/ClusterTest.java | 22 + 32 files changed, 488 insertions(+), 1296 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index 8873d12..74605bb 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -51,7 +51,7 @@ storm.auth.simple-white-list.users: [] storm.auth.simple-acl.users: [] storm.auth.simple-acl.users.commands: [] storm.auth.simple-acl.admins: [] -storm.cluster.state.store: "org.apache.storm.cluster_state.zookeeper_state_factory" +storm.cluster.state.store: "org.apache.storm.cluster.StormZkClusterState" storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate" storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor" storm.workers.artifacts.dir: "workers-artifacts" http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/cluster.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj deleted file mode 100644 index 152423a..0000000 --- a/storm-core/src/clj/org/apache/storm/cluster.clj +++ /dev/null @@ -1,691 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. 
The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. - -(ns org.apache.storm.cluster - (:import [org.apache.zookeeper.data Stat ACL Id] - [org.apache.storm.generated SupervisorInfo Assignment StormBase ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary - LogConfig ProfileAction ProfileRequest NodeInfo] - [java.io Serializable]) - (:import [org.apache.zookeeper KeeperException KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms]) - (:import [org.apache.curator.framework CuratorFramework]) - (:import [org.apache.storm.utils Utils]) - (:import [org.apache.storm.cluster ClusterState ClusterStateContext ClusterStateListener ConnectionState]) - (:import [java.security MessageDigest]) - (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider]) - (:import [org.apache.storm.nimbus NimbusInfo]) - (:use [org.apache.storm util log config converter]) - (:require [org.apache.storm [zookeeper :as zk]]) - (:require [org.apache.storm.daemon [common :as common]])) - -(defn mk-topo-only-acls - [topo-conf] - (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)] - (when (Utils/isZkAuthenticationConfiguredTopology topo-conf) - [(first ZooDefs$Ids/CREATOR_ALL_ACL) - (ACL. ZooDefs$Perms/READ (Id. 
"digest" (DigestAuthenticationProvider/generateDigest payload)))]))) - -(defnk mk-distributed-cluster-state - [conf :auth-conf nil :acls nil :context (ClusterStateContext.)] - (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE) - "org.apache.storm.cluster_state.zookeeper_state_factory")) - state-instance (.newInstance clazz)] - (log-debug "Creating cluster state: " (.toString clazz)) - (or (.mkState state-instance conf auth-conf acls context) - nil))) - -(defprotocol StormClusterState - (assignments [this callback]) - (assignment-info [this storm-id callback]) - (assignment-info-with-version [this storm-id callback]) - (assignment-version [this storm-id callback]) - ;returns key information under /storm/blobstore/key - (blobstore-info [this blob-key]) - ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data> - (nimbuses [this]) - ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id - (add-nimbus-host! [this nimbus-id nimbus-summary]) - - (active-storms [this]) - (storm-base [this storm-id callback]) - (get-worker-heartbeat [this storm-id node port]) - (get-worker-profile-requests [this storm-id nodeinfo thrift?]) - (get-topology-profile-requests [this storm-id thrift?]) - (set-worker-profile-request [this storm-id profile-request]) - (delete-topology-profile-requests [this storm-id profile-request]) - (executor-beats [this storm-id executor->node+port]) - (supervisors [this callback]) - (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist - (setup-heartbeats! [this storm-id]) - (teardown-heartbeats! [this storm-id]) - (teardown-topology-errors! [this storm-id]) - (heartbeat-storms [this]) - (error-topologies [this]) - (set-topology-log-config! [this storm-id log-config]) - (topology-log-config [this storm-id cb]) - (worker-heartbeat! [this storm-id node port info]) - (remove-worker-heartbeat! [this storm-id node port]) - (supervisor-heartbeat! [this supervisor-id info]) - (worker-backpressure! 
[this storm-id node port info]) - (topology-backpressure [this storm-id callback]) - (setup-backpressure! [this storm-id]) - (remove-worker-backpressure! [this storm-id node port]) - (activate-storm! [this storm-id storm-base]) - (update-storm! [this storm-id new-elems]) - (remove-storm-base! [this storm-id]) - (set-assignment! [this storm-id info]) - ;; sets up information related to key consisting of nimbus - ;; host:port and version info of the blob - (setup-blobstore! [this key nimbusInfo versionInfo]) - (active-keys [this]) - (blobstore [this callback]) - (remove-storm! [this storm-id]) - (remove-blobstore-key! [this blob-key]) - (remove-key-version! [this blob-key]) - (report-error [this storm-id component-id node port error]) - (errors [this storm-id component-id]) - (last-error [this storm-id component-id]) - (set-credentials! [this storm-id creds topo-conf]) - (credentials [this storm-id callback]) - (disconnect [this])) - -(def ASSIGNMENTS-ROOT "assignments") -(def CODE-ROOT "code") -(def STORMS-ROOT "storms") -(def SUPERVISORS-ROOT "supervisors") -(def WORKERBEATS-ROOT "workerbeats") -(def BACKPRESSURE-ROOT "backpressure") -(def ERRORS-ROOT "errors") -(def BLOBSTORE-ROOT "blobstore") -; Stores the latest update sequence for a blob -(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber") -(def NIMBUSES-ROOT "nimbuses") -(def CREDENTIALS-ROOT "credentials") -(def LOGCONFIG-ROOT "logconfigs") -(def PROFILERCONFIG-ROOT "profilerconfigs") - -(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT)) -(def STORMS-SUBTREE (str "/" STORMS-ROOT)) -(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT)) -(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT)) -(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT)) -(def ERRORS-SUBTREE (str "/" ERRORS-ROOT)) -;; Blobstore subtree /storm/blobstore -(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT)) -(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT)) -(def 
NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT)) -(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT)) -(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT)) -(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT)) - -(defn supervisor-path - [id] - (str SUPERVISORS-SUBTREE "/" id)) - -(defn assignment-path - [id] - (str ASSIGNMENTS-SUBTREE "/" id)) - -(defn blobstore-path - [key] - (str BLOBSTORE-SUBTREE "/" key)) - -(defn blobstore-max-key-sequence-number-path - [key] - (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key)) - -(defn nimbus-path - [id] - (str NIMBUSES-SUBTREE "/" id)) - -(defn storm-path - [id] - (str STORMS-SUBTREE "/" id)) - -(defn workerbeat-storm-root - [storm-id] - (str WORKERBEATS-SUBTREE "/" storm-id)) - -(defn workerbeat-path - [storm-id node port] - (str (workerbeat-storm-root storm-id) "/" node "-" port)) - -(defn backpressure-storm-root - [storm-id] - (str BACKPRESSURE-SUBTREE "/" storm-id)) - -(defn backpressure-path - [storm-id node port] - (str (backpressure-storm-root storm-id) "/" node "-" port)) - -(defn error-storm-root - [storm-id] - (str ERRORS-SUBTREE "/" storm-id)) - -(defn error-path - [storm-id component-id] - (str (error-storm-root storm-id) "/" (url-encode component-id))) - -(def last-error-path-seg "last-error") - -(defn last-error-path - [storm-id component-id] - (str (error-storm-root storm-id) - "/" - (url-encode component-id) - "-" - last-error-path-seg)) - -(defn credentials-path - [storm-id] - (str CREDENTIALS-SUBTREE "/" storm-id)) - -(defn log-config-path - [storm-id] - (str LOGCONFIG-SUBTREE "/" storm-id)) - -(defn profiler-config-path - ([storm-id] - (str PROFILERCONFIG-SUBTREE "/" storm-id)) - ([storm-id host port request-type] - (str (profiler-config-path storm-id) "/" host "_" port "_" request-type))) - -(defn- issue-callback! - [cb-atom] - (let [cb @cb-atom] - (reset! cb-atom nil) - (when cb - (cb)))) - -(defn- issue-map-callback! - [cb-atom id] - (let [cb (@cb-atom id)] - (swap! 
cb-atom dissoc id) - (when cb - (cb id)))) - -(defn- maybe-deserialize - [ser clazz] - (when ser - (Utils/deserialize ser clazz))) - -(defrecord TaskError [error time-secs host port]) - -(defn- parse-error-path - [^String p] - (Long/parseLong (.substring p 1))) - -(defn convert-executor-beats - "Ensures that we only return heartbeats for executors assigned to - this worker." - [executors worker-hb] - (let [executor-stats (:executor-stats worker-hb)] - (->> executors - (map (fn [t] - (if (contains? executor-stats t) - {t {:time-secs (:time-secs worker-hb) - :uptime (:uptime worker-hb) - :stats (get executor-stats t)}}))) - (into {})))) - -;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called. -(defnk mk-storm-cluster-state - [cluster-state-spec :acls nil :context (ClusterStateContext.)] - (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec) - [false cluster-state-spec] - [true (mk-distributed-cluster-state cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)]) - assignment-info-callback (atom {}) - assignment-info-with-version-callback (atom {}) - assignment-version-callback (atom {}) - supervisors-callback (atom nil) - backpressure-callback (atom {}) ;; we want to reigister a topo directory getChildren callback for all workers of this dir - assignments-callback (atom nil) - storm-base-callback (atom {}) - blobstore-callback (atom nil) - credentials-callback (atom {}) - log-config-callback (atom {}) - state-id (.register - cluster-state - (fn [type path] - (let [[subtree & args] (tokenize-path path)] - (condp = subtree - ASSIGNMENTS-ROOT (if (empty? args) - (issue-callback! assignments-callback) - (do - (issue-map-callback! assignment-info-callback (first args)) - (issue-map-callback! assignment-version-callback (first args)) - (issue-map-callback! assignment-info-with-version-callback (first args)))) - SUPERVISORS-ROOT (issue-callback! 
supervisors-callback) - BLOBSTORE-ROOT (issue-callback! blobstore-callback) ;; callback register for blobstore - STORMS-ROOT (issue-map-callback! storm-base-callback (first args)) - CREDENTIALS-ROOT (issue-map-callback! credentials-callback (first args)) - LOGCONFIG-ROOT (issue-map-callback! log-config-callback (first args)) - BACKPRESSURE-ROOT (issue-map-callback! backpressure-callback (first args)) - ;; this should never happen - (exit-process! 30 "Unknown callback for subtree " subtree args)))))] - (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE - LOGCONFIG-SUBTREE]] - (.mkdirs cluster-state p acls)) - (reify - StormClusterState - - (assignments - [this callback] - (when callback - (reset! assignments-callback callback)) - (.get_children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback))) - - (assignment-info - [this storm-id callback] - (when callback - (swap! assignment-info-callback assoc storm-id callback)) - (clojurify-assignment (maybe-deserialize (.get_data cluster-state (assignment-path storm-id) (not-nil? callback)) Assignment))) - - (assignment-info-with-version - [this storm-id callback] - (when callback - (swap! assignment-info-with-version-callback assoc storm-id callback)) - (let [{data :data version :version} - (.get_data_with_version cluster-state (assignment-path storm-id) (not-nil? callback))] - {:data (clojurify-assignment (maybe-deserialize data Assignment)) - :version version})) - - (assignment-version - [this storm-id callback] - (when callback - (swap! assignment-version-callback assoc storm-id callback)) - (.get_version cluster-state (assignment-path storm-id) (not-nil? callback))) - - ;; blobstore state - (blobstore - [this callback] - (when callback - (reset! blobstore-callback callback)) - (.sync_path cluster-state BLOBSTORE-SUBTREE) - (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? 
callback))) - - (nimbuses - [this] - (map #(maybe-deserialize (.get_data cluster-state (nimbus-path %1) false) NimbusSummary) - (.get_children cluster-state NIMBUSES-SUBTREE false))) - - (add-nimbus-host! - [this nimbus-id nimbus-summary] - ;explicit delete for ephmeral node to ensure this session creates the entry. - (.delete_node cluster-state (nimbus-path nimbus-id)) - - (.add_listener cluster-state (reify ClusterStateListener - (^void stateChanged[this ^ConnectionState newState] - (log-message "Connection state listener invoked, zookeeper connection state has changed to " newState) - (if (.equals newState ConnectionState/RECONNECTED) - (do - (log-message "Connection state has changed to reconnected so setting nimbuses entry one more time") - (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls)))))) - - (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) (Utils/serialize nimbus-summary) acls)) - - (setup-blobstore! - [this key nimbusInfo versionInfo] - (let [path (str (blobstore-path key) "/" (.toHostPortString nimbusInfo) "-" versionInfo)] - (log-message "setup-path: " path) - (.mkdirs cluster-state (blobstore-path key) acls) - ;we delete the node first to ensure the node gets created as part of this session only. 
- (.delete_node_blobstore cluster-state (str (blobstore-path key)) (.toHostPortString nimbusInfo)) - (.set_ephemeral_node cluster-state path nil acls))) - - (blobstore-info - [this blob-key] - (let [path (blobstore-path blob-key)] - (.sync_path cluster-state path) - (.get_children cluster-state path false))) - - (active-storms - [this] - (.get_children cluster-state STORMS-SUBTREE false)) - - (active-keys - [this] - (.get_children cluster-state BLOBSTORE-SUBTREE false)) - - (heartbeat-storms - [this] - (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false)) - - (error-topologies - [this] - (.get_children cluster-state ERRORS-SUBTREE false)) - - (get-worker-heartbeat - [this storm-id node port] - (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path storm-id node port) false)] - (if worker-hb - (-> worker-hb - (maybe-deserialize ClusterWorkerHeartbeat) - clojurify-zk-worker-hb)))) - - (executor-beats - [this storm-id executor->node+port] - ;; need to take executor->node+port in explicitly so that we don't run into a situation where a - ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats - ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned, - ;; we avoid situations like that - (let [node+port->executors (reverse-map executor->node+port) - all-heartbeats (for [[[node port] executors] node+port->executors] - (->> (get-worker-heartbeat this storm-id node port) - (convert-executor-beats executors) - ))] - (apply merge all-heartbeats))) - - (supervisors - [this callback] - (when callback - (reset! supervisors-callback callback)) - (.get_children cluster-state SUPERVISORS-SUBTREE (not-nil? callback))) - - (supervisor-info - [this supervisor-id] - (clojurify-supervisor-info (maybe-deserialize (.get_data cluster-state (supervisor-path supervisor-id) false) SupervisorInfo))) - - (topology-log-config - [this storm-id cb] - (when cb - (swap! 
log-config-callback assoc storm-id cb)) - (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) (not-nil? cb)) LogConfig)) - - (set-topology-log-config! - [this storm-id log-config] - (.set_data cluster-state (log-config-path storm-id) (Utils/serialize log-config) acls)) - - (set-worker-profile-request - [this storm-id profile-request] - (let [request-type (.get_action profile-request) - host (.get_node (.get_nodeInfo profile-request)) - port (first (.get_port (.get_nodeInfo profile-request)))] - (.set_data cluster-state - (profiler-config-path storm-id host port request-type) - (Utils/serialize profile-request) - acls))) - - (get-topology-profile-requests - [this storm-id thrift?] - (let [path (profiler-config-path storm-id) - requests (if (.node_exists cluster-state path false) - (dofor [c (.get_children cluster-state path false)] - (let [raw (.get_data cluster-state (str path "/" c) false) - request (maybe-deserialize raw ProfileRequest)] - (if thrift? - request - (clojurify-profile-request request)))))] - requests)) - - (delete-topology-profile-requests - [this storm-id profile-request] - (let [profile-request-inst (thriftify-profile-request profile-request) - action (:action profile-request) - host (:host profile-request) - port (:port profile-request)] - (.delete_node cluster-state - (profiler-config-path storm-id host port action)))) - - (get-worker-profile-requests - [this storm-id node-info thrift?] - (let [host (:host node-info) - port (:port node-info) - profile-requests (get-topology-profile-requests this storm-id thrift?)] - (if thrift? - (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port (first (.get_port (.get_nodeInfo %))))) - profile-requests) - (filter #(and (= host (:host %)) (= port (:port %))) - profile-requests)))) - - (worker-heartbeat! 
- [this storm-id node port info] - (let [thrift-worker-hb (thriftify-zk-worker-hb info)] - (if thrift-worker-hb - (.set_worker_hb cluster-state (workerbeat-path storm-id node port) (Utils/serialize thrift-worker-hb) acls)))) - - (remove-worker-heartbeat! - [this storm-id node port] - (.delete_worker_hb cluster-state (workerbeat-path storm-id node port))) - - (setup-heartbeats! - [this storm-id] - (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls)) - - (teardown-heartbeats! - [this storm-id] - (try-cause - (.delete_worker_hb cluster-state (workerbeat-storm-root storm-id)) - (catch KeeperException e - (log-warn-error e "Could not teardown heartbeats for " storm-id)))) - - (worker-backpressure! - [this storm-id node port on?] - "if znode exists and to be not on?, delete; if exists and on?, do nothing; - if not exists and to be on?, create; if not exists and not on?, do nothing" - (let [path (backpressure-path storm-id node port) - existed (.node_exists cluster-state path false)] - (if existed - (if (not on?) - (.delete_node cluster-state path)) ;; delete the znode since the worker is not congested - (if on? - (.set_ephemeral_node cluster-state path nil acls))))) ;; create the znode since worker is congested - - (topology-backpressure - [this storm-id callback] - "if the backpresure/storm-id dir is empty, this topology has throttle-on, otherwise not." - (when callback - (swap! backpressure-callback assoc storm-id callback)) - (let [path (backpressure-storm-root storm-id) - children (.get_children cluster-state path (not-nil? callback))] - (> (count children) 0))) - - (setup-backpressure! - [this storm-id] - (.mkdirs cluster-state (backpressure-storm-root storm-id) acls)) - - (remove-worker-backpressure! - [this storm-id node port] - (.delete_node cluster-state (backpressure-path storm-id node port))) - - (teardown-topology-errors! 
- [this storm-id] - (try-cause - (.delete_node cluster-state (error-storm-root storm-id)) - (catch KeeperException e - (log-warn-error e "Could not teardown errors for " storm-id)))) - - (supervisor-heartbeat! - [this supervisor-id info] - (let [thrift-supervisor-info (thriftify-supervisor-info info)] - (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) (Utils/serialize thrift-supervisor-info) acls))) - - (activate-storm! - [this storm-id storm-base] - (let [thrift-storm-base (thriftify-storm-base storm-base)] - (.set_data cluster-state (storm-path storm-id) (Utils/serialize thrift-storm-base) acls))) - - (update-storm! - [this storm-id new-elems] - (let [base (storm-base this storm-id nil) - executors (:component->executors base) - component->debug (:component->debug base) - new-elems (update new-elems :component->executors (partial merge executors)) - new-elems (update new-elems :component->debug (partial merge-with merge component->debug))] - (.set_data cluster-state (storm-path storm-id) - (-> base - (merge new-elems) - thriftify-storm-base - Utils/serialize) - acls))) - - (storm-base - [this storm-id callback] - (when callback - (swap! storm-base-callback assoc storm-id callback)) - (clojurify-storm-base (maybe-deserialize (.get_data cluster-state (storm-path storm-id) (not-nil? callback)) StormBase))) - - (remove-storm-base! - [this storm-id] - (.delete_node cluster-state (storm-path storm-id))) - - (set-assignment! - [this storm-id info] - (let [thrift-assignment (thriftify-assignment info)] - (.set_data cluster-state (assignment-path storm-id) (Utils/serialize thrift-assignment) acls))) - - (remove-blobstore-key! - [this blob-key] - (log-debug "removing key" blob-key) - (.delete_node cluster-state (blobstore-path blob-key))) - - (remove-key-version! - [this blob-key] - (.delete_node cluster-state (blobstore-max-key-sequence-number-path blob-key))) - - (remove-storm! 
- [this storm-id] - (.delete_node cluster-state (assignment-path storm-id)) - (.delete_node cluster-state (credentials-path storm-id)) - (.delete_node cluster-state (log-config-path storm-id)) - (.delete_node cluster-state (profiler-config-path storm-id)) - (remove-storm-base! this storm-id)) - - (set-credentials! - [this storm-id creds topo-conf] - (let [topo-acls (mk-topo-only-acls topo-conf) - path (credentials-path storm-id) - thriftified-creds (thriftify-credentials creds)] - (.set_data cluster-state path (Utils/serialize thriftified-creds) topo-acls))) - - (credentials - [this storm-id callback] - (when callback - (swap! credentials-callback assoc storm-id callback)) - (clojurify-crdentials (maybe-deserialize (.get_data cluster-state (credentials-path storm-id) (not-nil? callback)) Credentials))) - - (report-error - [this storm-id component-id node port error] - (let [path (error-path storm-id component-id) - last-error-path (last-error-path storm-id component-id) - data (thriftify-error {:time-secs (current-time-secs) :error (stringify-error error) :host node :port port}) - _ (.mkdirs cluster-state path acls) - ser-data (Utils/serialize data) - _ (.mkdirs cluster-state path acls) - _ (.create_sequential cluster-state (str path "/e") ser-data acls) - _ (.set_data cluster-state last-error-path ser-data acls) - to-kill (->> (.get_children cluster-state path false) - (sort-by parse-error-path) - reverse - (drop 10))] - (doseq [k to-kill] - (.delete_node cluster-state (str path "/" k))))) - - (errors - [this storm-id component-id] - (let [path (error-path storm-id component-id) - errors (if (.node_exists cluster-state path false) - (dofor [c (.get_children cluster-state path false)] - (if-let [data (-> (.get_data cluster-state - (str path "/" c) - false) - (maybe-deserialize ErrorInfo) - clojurify-error)] - (map->TaskError data))) - ())] - (->> (filter not-nil? 
errors) - (sort-by (comp - :time-secs))))) - - (last-error - [this storm-id component-id] - (let [path (last-error-path storm-id component-id)] - (if (.node_exists cluster-state path false) - (if-let [data (-> (.get_data cluster-state path false) - (maybe-deserialize ErrorInfo) - clojurify-error)] - (map->TaskError data))))) - - (disconnect - [this] - (.unregister cluster-state state-id) - (when solo? - (.close cluster-state)))))) - -;; daemons have a single thread that will respond to events -;; start with initialize event -;; callbacks add events to the thread's queue - -;; keeps in memory cache of the state, only for what client subscribes to. Any subscription is automatically kept in sync, and when there are changes, client is notified. -;; master gives orders through state, and client records status in state (ephemerally) - -;; master tells nodes what workers to launch - -;; master writes this. supervisors and workers subscribe to this to understand complete topology. each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified -;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up -;; /assignments/{storm id} - -;; which tasks they talk to, etc. (immutable until shutdown) -;; everyone reads this in full to understand structure -;; /tasks/{storm id}/{task id} ; just contains bolt id - -;; supervisors send heartbeats here, master doesn't subscribe but checks asynchronously -;; /supervisors/status/{ephemeral node ids} ;; node metadata such as port ranges are kept here - -;; tasks send heartbeats here, master doesn't subscribe, just checks asynchronously -;; /taskbeats/{storm id}/{ephemeral task id} - -;; contains data about whether it's started or not, tasks and workers subscribe to specific storm here to know when to shutdown -;; master manipulates -;; /storms/{storm id} - -;; Zookeeper flows: - -;; Master: -;; job submit: -;; 1. 
read which nodes are available -;; 2. set up the worker/{storm}/{task} stuff (static) -;; 3. set assignments -;; 4. start storm - necessary in case master goes down, when goes back up can remember to take down the storm (2 states: on or off) - -;; Monitoring (or by checking when nodes go down or heartbeats aren't received): -;; 1. read assignment -;; 2. see which tasks/nodes are up -;; 3. make new assignment to fix any problems -;; 4. if a storm exists but is not taken down fully, ensure that storm takedown is launched (step by step remove tasks and finally remove assignments) - -;; masters only possible watches is on ephemeral nodes and tasks, and maybe not even - -;; Supervisor: -;; 1. monitor /storms/* and assignments -;; 2. local state about which workers are local -;; 3. when storm is on, check that workers are running locally & start/kill if different than assignments -;; 4. when storm is off, monitor tasks for workers - when they all die or don't hearbeat, kill the process and cleanup - -;; Worker: -;; 1. On startup, start the tasks if the storm is on - -;; Task: -;; 1. monitor assignments, reroute when assignments change -;; 2. monitor storm (when storm turns off, error if assignments change) - take down tasks as master turns them off - -;; locally on supervisor: workers write pids locally on startup, supervisor deletes it on shutdown (associates pid with worker name) -;; supervisor periodically checks to make sure processes are alive -;; {rootdir}/workers/{storm id}/{worker id} ;; contains pid inside - -;; all tasks in a worker share the same cluster state -;; workers, supervisors, and tasks subscribes to storm to know when it's started or stopped -;; on stopped, master removes records in order (tasks need to subscribe to themselves to see if they disappear) -;; when a master removes a worker, the supervisor should kill it (and escalate to kill -9) -;; on shutdown, tasks subscribe to tasks that send data to them to wait for them to die. 
when node disappears, they can die http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj deleted file mode 100644 index dcfa8d8..0000000 --- a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj +++ /dev/null @@ -1,163 +0,0 @@ -;; Licensed to the Apache Software Foundation (ASF) under one -;; or more contributor license agreements. See the NOTICE file -;; distributed with this work for additional information -;; regarding copyright ownership. The ASF licenses this file -;; to you under the Apache License, Version 2.0 (the -;; "License"); you may not use this file except in compliance -;; with the License. You may obtain a copy of the License at -;; -;; http://www.apache.org/licenses/LICENSE-2.0 -;; -;; Unless required by applicable law or agreed to in writing, software -;; distributed under the License is distributed on an "AS IS" BASIS, -;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -;; See the License for the specific language governing permissions and -;; limitations under the License. 
- -(ns org.apache.storm.cluster-state.zookeeper-state-factory - (:import [org.apache.curator.framework.state ConnectionStateListener] - [org.apache.storm.zookeeper Zookeeper]) - (:import [org.apache.zookeeper KeeperException$NoNodeException CreateMode - Watcher$Event$EventType Watcher$Event$KeeperState] - [org.apache.storm.cluster ClusterState DaemonType]) - (:use [org.apache.storm cluster config log util]) - (:require [org.apache.storm [zookeeper :as zk]]) - (:gen-class - :implements [org.apache.storm.cluster.ClusterStateFactory])) - -(defn -mkState [this conf auth-conf acls context] - (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)] - (Zookeeper/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls) - (.close zk)) - (let [callbacks (atom {}) - active (atom true) - zk-writer (zk/mk-client conf - (conf STORM-ZOOKEEPER-SERVERS) - (conf STORM-ZOOKEEPER-PORT) - :auth-conf auth-conf - :root (conf STORM-ZOOKEEPER-ROOT) - :watcher (fn [state type path] - (when @active - (when-not (= Watcher$Event$KeeperState/SyncConnected state) - (log-warn "Received event " state ":" type ":" path " with disconnected Writer Zookeeper.")) - (when-not (= Watcher$Event$EventType/None type) - (doseq [callback (vals @callbacks)] - (callback type path)))))) - is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS) - zk-reader (if is-nimbus? - (zk/mk-client conf - (conf STORM-ZOOKEEPER-SERVERS) - (conf STORM-ZOOKEEPER-PORT) - :auth-conf auth-conf - :root (conf STORM-ZOOKEEPER-ROOT) - :watcher (fn [state type path] - (when @active - (when-not (= Watcher$Event$KeeperState/SyncConnected state) - (log-warn "Received event " state ":" type ":" path " with disconnected Reader Zookeeper.")) - (when-not (= Watcher$Event$EventType/None type) - (doseq [callback (vals @callbacks)] - (callback type path)))))) - zk-writer)] - (reify - ClusterState - - (register - [this callback] - (let [id (uuid)] - (swap! 
callbacks assoc id callback) - id)) - - (unregister - [this id] - (swap! callbacks dissoc id)) - - (set-ephemeral-node - [this path data acls] - (Zookeeper/mkdirs zk-writer (parent-path path) acls) - (if (Zookeeper/exists zk-writer path false) - (try-cause - (Zookeeper/setData zk-writer path data) ; should verify that it's ephemeral - (catch KeeperException$NoNodeException e - (log-warn-error e "Ephemeral node disappeared between checking for existing and setting data") - (Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL acls))) - (Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL acls))) - - (create-sequential - [this path data acls] - (Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT_SEQUENTIAL acls)) - - (set-data - [this path data acls] - ;; note: this does not turn off any existing watches - (if (Zookeeper/exists zk-writer path false) - (Zookeeper/setData zk-writer path data) - (do - (Zookeeper/mkdirs zk-writer (parent-path path) acls) - (Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT acls)))) - - (set-worker-hb - [this path data acls] - (.set_data this path data acls)) - - (delete-node - [this path] - (Zookeeper/deleteNode zk-writer path)) - - (delete-worker-hb - [this path] - (.delete_node this path)) - - (get-data - [this path watch?] - (Zookeeper/getData zk-reader path watch?)) - - (get-data-with-version - [this path watch?] - (Zookeeper/getDataWithVersion zk-reader path watch?)) - - (get-version - [this path watch?] - (Zookeeper/getVersion zk-reader path watch?)) - - (get-worker-hb - [this path watch?] - (.get_data this path watch?)) - - (get-children - [this path watch?] - (Zookeeper/getChildren zk-reader path watch?)) - - (get-worker-hb-children - [this path watch?] - (.get_children this path watch?)) - - (mkdirs - [this path acls] - (Zookeeper/mkdirs zk-writer path acls)) - - (node-exists - [this path watch?] 
- (Zookeeper/existsNode zk-reader path watch?)) - - (add-listener - [this listener] - (let [curator-listener (reify ConnectionStateListener - (stateChanged - [this client newState] - (.stateChanged listener client newState)))] - (Zookeeper/addListener zk-reader curator-listener))) - - (sync-path - [this path] - (Zookeeper/syncPath zk-writer path)) - - (delete-node-blobstore - [this path nimbus-host-port-info] - (Zookeeper/deleteNodeBlobstore zk-writer path nimbus-host-port-info)) - - (close - [this] - (reset! active false) - (.close zk-writer) - (if is-nimbus? - (.close zk-reader)))))) http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj index ef9ecbb..7be526d 100644 --- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj +++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj @@ -14,7 +14,7 @@ ;; See the License for the specific language governing permissions and ;; limitations under the License. 
(ns org.apache.storm.command.dev-zookeeper - (:use [org.apache.storm zookeeper util config]) + (:use [org.apache.storm util config]) (:import [org.apache.storm.utils ConfigUtils]) (:import [org.apache.storm.zookeeper Zookeeper]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/command/heartbeats.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj index be8d030..954042f 100644 --- a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj +++ b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj @@ -18,16 +18,16 @@ [config :refer :all] [log :refer :all] [util :refer :all] - [cluster :refer :all] [converter :refer :all]] [clojure.string :as string]) (:import [org.apache.storm.generated ClusterWorkerHeartbeat] - [org.apache.storm.utils Utils ConfigUtils]) + [org.apache.storm.utils Utils ConfigUtils] + [org.apache.storm.cluster DistributedClusterState ClusterStateContext]) (:gen-class)) (defn -main [command path & args] (let [conf (clojurify-structure (ConfigUtils/readStormConfig)) - cluster (mk-distributed-cluster-state conf :auth-conf conf)] + cluster (DistributedClusterState. 
conf conf nil (ClusterStateContext.))] (println "Command: [" command "]") (condp = command "list" http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 8a5eb21..3978d2f 100644 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@ -16,7 +16,7 @@ (ns org.apache.storm.command.shell-submission (:import [org.apache.storm StormSubmitter] [org.apache.storm.zookeeper Zookeeper]) - (:use [org.apache.storm thrift util config log zookeeper]) + (:use [org.apache.storm thrift util config log]) (:require [clojure.string :as str]) (:import [org.apache.storm.utils ConfigUtils]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/converter.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/converter.clj b/storm-core/src/clj/org/apache/storm/converter.clj index bb2dc87..d169301 100644 --- a/storm-core/src/clj/org/apache/storm/converter.clj +++ b/storm-core/src/clj/org/apache/storm/converter.clj @@ -181,9 +181,9 @@ (defn thriftify-storm-base [storm-base] (doto (StormBase.) 
(.set_name (:storm-name storm-base)) - (.set_launch_time_secs (int (:launch-time-secs storm-base))) + (.set_launch_time_secs (if (:launch-time-secs storm-base) (int (:launch-time-secs storm-base)) 0)) (.set_status (convert-to-status-from-symbol (:status storm-base))) - (.set_num_workers (int (:num-workers storm-base))) + (.set_num_workers (if (:num-workers storm-base) (int (:num-workers storm-base)) 0)) (.set_component_executors (map-val int (:component->executors storm-base))) (.set_owner (:owner storm-base)) (.set_topology_action_options (thriftify-topology-action-options storm-base)) @@ -234,16 +234,6 @@ (.set_executor_stats (thriftify-stats (filter second (:executor-stats worker-hb)))) (.set_time_secs (:time-secs worker-hb))))) -(defn clojurify-error [^ErrorInfo error] - (if error - { - :error (.get_error error) - :time-secs (.get_error_time_secs error) - :host (.get_host error) - :port (.get_port error) - } - )) - (defn thriftify-error [error] (doto (ErrorInfo. (:error error) (:time-secs error)) (.set_host (:host error)) http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj b/storm-core/src/clj/org/apache/storm/daemon/common.clj index 6c184fd..c9534f4 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@ -13,14 +13,16 @@ ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ;; See the License for the specific language governing permissions and ;; limitations under the License. 
+;TopologyActionOptions TopologyStatus StormBase RebalanceOptions KillOptions (ns org.apache.storm.daemon.common (:use [org.apache.storm log config util]) - (:import [org.apache.storm.generated StormTopology + (:import [org.apache.storm.generated StormTopology NodeInfo InvalidTopologyException GlobalStreamId] [org.apache.storm.utils ThriftTopologyUtils]) (:import [org.apache.storm.utils Utils ConfigUtils]) (:import [org.apache.storm.task WorkerTopologyContext]) (:import [org.apache.storm Constants]) + (:import [org.apache.storm.cluster StormZkClusterState]) (:import [org.apache.storm.metric SystemBolt]) (:import [org.apache.storm.metric EventLoggerBolt]) (:import [org.apache.storm.security.auth IAuthorizer]) @@ -72,18 +74,19 @@ (defn new-executor-stats [] (ExecutorStats. 0 0 0 0 0)) + (defn get-storm-id [storm-cluster-state storm-name] - (let [active-storms (.active-storms storm-cluster-state)] + (let [active-storms (.activeStorms storm-cluster-state)] (find-first - #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil))) + #(= storm-name (.get_name (.stormBase storm-cluster-state % nil))) active-storms) )) (defn topology-bases [storm-cluster-state] - (let [active-topologies (.active-storms storm-cluster-state)] + (let [active-topologies (.activeStorms storm-cluster-state)] (into {} (dofor [id active-topologies] - [id (.storm-base storm-cluster-state id nil)] + [id (.stormBase storm-cluster-state id nil)] )) )) http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 82d56a9..e50e150 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -34,11 +34,10 @@ (:import [org.apache.storm.daemon Shutdownable]) (:import 
[org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric]) (:import [org.apache.storm Config Constants]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) + (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormZkClusterState Cluster]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) (:import [java.util.concurrent ConcurrentLinkedQueue]) - (:require [org.apache.storm [thrift :as thrift] - [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]]) + (:require [org.apache.storm [thrift :as thrift] [disruptor :as disruptor] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) @@ -207,7 +206,7 @@ (swap! interval-errors inc) (when (<= @interval-errors max-per-interval) - (cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) + (.reportError (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) (hostname storm-conf) (.getThisWorkerPort (:worker-context executor)) error) )))) @@ -252,9 +251,8 @@ :batch-transfer-queue batch-transfer->worker :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf) :suicide-fn (:suicide-fn worker) - :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state worker) - :acls (Utils/getWorkerACL storm-conf) - :context (ClusterStateContext. DaemonType/WORKER)) + :storm-cluster-state (StormZkClusterState. (:cluster-state worker) (Utils/getWorkerACL storm-conf) + (ClusterStateContext. 
DaemonType/WORKER)) :type executor-type ;; TODO: should refactor this to be part of the executor specific map (spout or bolt with :common field) :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf)) http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index de5a14e..9b00df3 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@ -40,7 +40,7 @@ (:import [org.apache.storm.nimbus NimbusInfo]) (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils BufferFileInputStream BufferInputStream]) - (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo + (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo ClusterWorkerHeartbeat ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta @@ -48,10 +48,9 @@ ProfileRequest ProfileAction NodeInfo]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.validation ConfigValidation]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) - (:use [org.apache.storm util config log timer zookeeper local-state]) - (:require [org.apache.storm [cluster :as cluster] - [converter :as converter] + (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormZkClusterState]) + (:use [org.apache.storm util config log timer local-state converter]) + 
(:require [org.apache.storm [converter :as converter] [stats :as stats]]) (:require [clojure.set :as set]) (:import [org.apache.storm.daemon.common StormBase Assignment]) @@ -174,11 +173,11 @@ :authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) conf) :impersonation-authorization-handler (mk-authorization-handler (conf NIMBUS-IMPERSONATION-AUTHORIZER) conf) :submitted-count (atom 0) - :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when + :storm-cluster-state (StormZkClusterState. conf (when (Utils/isZkAuthenticationConfiguredStormServer conf) NIMBUS-ZK-ACLS) - :context (ClusterStateContext. DaemonType/NIMBUS)) + (ClusterStateContext. DaemonType/NIMBUS)) :submit-lock (Object.) :cred-update-lock (Object.) :log-update-lock (Object.) @@ -275,11 +274,11 @@ (defn do-rebalance [nimbus storm-id status storm-base] (let [rebalance-options (:topology-action-options storm-base)] - (.update-storm! (:storm-cluster-state nimbus) + (.updateStorm (:storm-cluster-state nimbus) storm-id - (-> {:topology-action-options nil} + (thriftify-storm-base (-> {:topology-action-options nil} (assoc-non-nil :component->executors (:component->executors rebalance-options)) - (assoc-non-nil :num-workers (:num-workers rebalance-options))))) + (assoc-non-nil :num-workers (:num-workers rebalance-options)))))) (mk-assignments nimbus :scratch-topology-id storm-id)) (defn state-transitions [nimbus storm-id status storm-base] @@ -303,12 +302,12 @@ :kill (kill-transition nimbus storm-id) :remove (fn [] (log-message "Killing topology: " storm-id) - (.remove-storm! (:storm-cluster-state nimbus) + (.removeStorm (:storm-cluster-state nimbus) storm-id) (when (instance? LocalFsBlobStore (:blob-store nimbus)) (doseq [blob-key (get-key-list-from-id (:conf nimbus) storm-id)] - (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key) - (.remove-key-version! 
(:storm-cluster-state nimbus) blob-key))) + (.removeBlobstoreKey (:storm-cluster-state nimbus) blob-key) + (.removeKeyVersion (:storm-cluster-state nimbus) blob-key))) nil) } :rebalancing {:startup (fn [] (delay-event nimbus @@ -332,7 +331,7 @@ (locking (:submit-lock nimbus) (let [system-events #{:startup} [event & event-args] (if (keyword? event) [event] event) - storm-base (-> nimbus :storm-cluster-state (.storm-base storm-id nil)) + storm-base (clojurify-storm-base (-> nimbus :storm-cluster-state (.stormBase storm-id nil))) status (:status storm-base)] ;; handles the case where event was scheduled but topology has been removed (if-not status @@ -362,7 +361,7 @@ storm-base-updates)] (when storm-base-updates - (.update-storm! (:storm-cluster-state nimbus) storm-id storm-base-updates))))) + (.updateStorm (:storm-cluster-state nimbus) storm-id (thriftify-storm-base storm-base-updates)))))) ))) (defn transition-name! [nimbus storm-name event & args] @@ -411,7 +410,7 @@ (defaulted (apply merge-with set/union (for [a assignments - [_ [node port]] (-> (.assignment-info storm-cluster-state a nil) :executor->node+port)] + [_ [node port]] (-> (clojurify-assignment (.assignmentInfo storm-cluster-state a nil)) :executor->node+port)] {node #{port}} )) {}) @@ -424,7 +423,7 @@ (into {} (mapcat (fn [id] - (if-let [info (.supervisor-info storm-cluster-state id)] + (if-let [info (clojurify-supervisor-info (.supervisorInfo storm-cluster-state id))] [[id info]] )) supervisor-ids)) @@ -469,13 +468,13 @@ (when tmp-jar-location ;;in local mode there is no jar (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! 
storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf)))) + (.setupBlobstore storm-cluster-state jar-key nimbus-host-port-info (get-version-for-key jar-key nimbus-host-port-info conf)))) (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf))) + (.setupBlobstore storm-cluster-state conf-key nimbus-host-port-info (get-version-for-key conf-key nimbus-host-port-info conf))) (.createBlob blob-store code-key (Utils/serialize topology) (SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject) (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf))))) + (.setupBlobstore storm-cluster-state code-key nimbus-host-port-info (get-version-for-key code-key nimbus-host-port-info conf))))) (defn- read-storm-topology [storm-id blob-store] (Utils/deserialize @@ -540,7 +539,7 @@ (defn read-topology-details [nimbus storm-id] (let [blob-store (:blob-store nimbus) storm-base (or - (.storm-base (:storm-cluster-state nimbus) storm-id nil) + (clojurify-storm-base (.stormBase (:storm-cluster-state nimbus) storm-id nil)) (throw (NotAliveException. storm-id))) topology-conf (read-storm-conf-as-nimbus storm-id blob-store) topology (read-storm-topology-as-nimbus storm-id blob-store) @@ -587,7 +586,12 @@ (defn update-heartbeats! 
[nimbus storm-id all-executors existing-assignment] (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors)) (let [storm-cluster-state (:storm-cluster-state nimbus) - executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment)) + executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment existing-assignment)))] + (->> (clojurify-structure executor-stats-java-map) + (map (fn [^ExecutorInfo executor-info ^ClusterWorkerHeartbeat cluster-worker-heartbeat] + {[(.get_task_start executor-info) (.get_task_end executor-info)] (clojurify-zk-worker-hb cluster-worker-heartbeat)})) + (into {}))) + cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id) executor-beats all-executors @@ -637,7 +641,7 @@ (defn- compute-executors [nimbus storm-id] (let [conf (:conf nimbus) blob-store (:blob-store nimbus) - storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil) + storm-base (clojurify-storm-base (.stormBase (:storm-cluster-state nimbus) storm-id nil)) component->executors (:component->executors storm-base) storm-conf (read-storm-conf-as-nimbus storm-id blob-store) topology (read-storm-topology-as-nimbus storm-id blob-store) @@ -897,7 +901,7 @@ storm-cluster-state (:storm-cluster-state nimbus) ^INimbus inimbus (:inimbus nimbus) ;; read all the topologies - topology-ids (.active-storms storm-cluster-state) + topology-ids (.activeStorms storm-cluster-state) topologies (into {} (for [tid topology-ids] {tid (read-topology-details nimbus tid)})) topologies (Topologies. topologies) @@ -908,7 +912,7 @@ ;; we exclude its assignment, meaning that all the slots occupied by its assignment ;; will be treated as free slot in the scheduler code. (when (or (nil? 
scratch-topology-id) (not= tid scratch-topology-id)) - {tid (.assignment-info storm-cluster-state tid nil)}))) + {tid (clojurify-assignment (.assignmentInfo storm-cluster-state tid nil))}))) ;; make the new assignments for topologies new-scheduler-assignments (compute-new-scheduler-assignments nimbus @@ -957,7 +961,7 @@ (log-debug "Assignment for " topology-id " hasn't changed") (do (log-message "Setting new assignment for topology id " topology-id ": " (pr-str assignment)) - (.set-assignment! storm-cluster-state topology-id assignment) + (.setAssignment storm-cluster-state topology-id (thriftify-assignment assignment)) ))) (->> new-assignments (map (fn [[topology-id assignment]] @@ -984,9 +988,9 @@ topology (system-topology! storm-conf (read-storm-topology storm-id blob-store)) num-executors (->> (all-components topology) (map-val num-start-executors))] (log-message "Activating " storm-name ": " storm-id) - (.activate-storm! storm-cluster-state + (.activateStorm storm-cluster-state storm-id - (StormBase. storm-name + (thriftify-storm-base (StormBase. 
storm-name (current-time-secs) {:type topology-initial-status} (storm-conf TOPOLOGY-WORKERS) @@ -994,7 +998,7 @@ (storm-conf TOPOLOGY-SUBMITTER-USER) nil nil - {})) + {}))) (notify-topology-action-listener nimbus storm-name "activate"))) ;; Master: @@ -1046,10 +1050,10 @@ (set (.filterAndListKeys blob-store to-id)))) (defn cleanup-storm-ids [conf storm-cluster-state blob-store] - (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state)) - error-ids (set (.error-topologies storm-cluster-state)) + (let [heartbeat-ids (set (.heartbeatStorms storm-cluster-state)) + error-ids (set (.errorTopologies storm-cluster-state)) code-ids (code-ids blob-store) - assigned-ids (set (.active-storms storm-cluster-state))] + assigned-ids (set (.activeStorms storm-cluster-state))] (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids) )) @@ -1113,7 +1117,7 @@ (try (.deleteBlob blob-store key nimbus-subject) (if (instance? LocalFsBlobStore blob-store) - (.remove-blobstore-key! storm-cluster-state key)) + (.removeBlobstoreKey storm-cluster-state key)) (catch Exception e (log-message "Exception" e)))) @@ -1133,8 +1137,8 @@ (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) - (.teardown-heartbeats! storm-cluster-state id) - (.teardown-topology-errors! storm-cluster-state id) + (.teardownHeartbeats storm-cluster-state id) + (.teardownTopologyErrors storm-cluster-state id) (rmr (ConfigUtils/masterStormDistRoot conf id)) (blob-rm-topology-keys id blob-store storm-cluster-state) (swap! 
(:heartbeats-cache nimbus) dissoc id))))) @@ -1169,21 +1173,21 @@ (let [storm-cluster-state (:storm-cluster-state nimbus) blob-store (:blob-store nimbus) code-ids (set (code-ids blob-store)) - active-topologies (set (.active-storms storm-cluster-state)) + active-topologies (set (.activeStorms storm-cluster-state)) corrupt-topologies (set/difference active-topologies code-ids)] (doseq [corrupt corrupt-topologies] (log-message "Corrupt topology " corrupt " has state on zookeeper but doesn't have a local dir on Nimbus. Cleaning up...") - (.remove-storm! storm-cluster-state corrupt) + (.removeStorm storm-cluster-state corrupt) (if (instance? LocalFsBlobStore blob-store) (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)] - (.remove-blobstore-key! storm-cluster-state blob-key)))))) + (.removeBlobstoreKey storm-cluster-state blob-key)))))) (defn setup-blobstore [nimbus] "Sets up blobstore state for all current keys." (let [storm-cluster-state (:storm-cluster-state nimbus) blob-store (:blob-store nimbus) local-set-of-keys (set (get-key-seq-from-blob-store blob-store)) - all-keys (set (.active-keys storm-cluster-state)) + all-keys (set (.activeKeys storm-cluster-state)) locally-available-active-keys (set/intersection local-set-of-keys all-keys) keys-to-delete (set/difference local-set-of-keys all-keys) conf (:conf nimbus) @@ -1193,10 +1197,10 @@ (.deleteBlob blob-store key nimbus-subject)) (log-debug "Creating list of key entries for blobstore inside zookeeper" all-keys "local" locally-available-active-keys) (doseq [key locally-available-active-keys] - (.setup-blobstore! 
storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf))))) + (.setupBlobstore storm-cluster-state key (:nimbus-host-port-info nimbus) (get-version-for-key key nimbus-host-port-info conf))))) (defn- get-errors [storm-cluster-state storm-id component-id] - (->> (.errors storm-cluster-state storm-id component-id) + (->> (apply clojurify-error (.errors storm-cluster-state storm-id component-id)) (map #(doto (ErrorInfo. (:error %) (:time-secs %)) (.set_host (:host %)) (.set_port (:port %)))))) @@ -1293,11 +1297,11 @@ blob-store (:blob-store nimbus) renewers (:cred-renewers nimbus) update-lock (:cred-update-lock nimbus) - assigned-ids (set (.active-storms storm-cluster-state))] + assigned-ids (set (.activeStorms storm-cluster-state))] (when-not (empty? assigned-ids) (doseq [id assigned-ids] (locking update-lock - (let [orig-creds (.credentials storm-cluster-state id nil) + (let [orig-creds (clojurify-crdentials (.credentials storm-cluster-state id nil)) topology-conf (try-read-storm-conf (:conf nimbus) id blob-store)] (if orig-creds (let [new-creds (HashMap. orig-creds)] @@ -1305,7 +1309,7 @@ (log-message "Renewing Creds For " id " with " renewer) (.renew renewer new-creds (Collections/unmodifiableMap topology-conf))) (when-not (= orig-creds new-creds) - (.set-credentials! storm-cluster-state id new-creds topology-conf) + (.setCredentials storm-cluster-state id (thriftify-credentials new-creds) topology-conf) )))))))) (log-message "not a leader skipping , credential renweal."))) @@ -1370,11 +1374,11 @@ operation) topology (try-read-storm-topology storm-id blob-store) task->component (storm-task-info topology topology-conf) - base (.storm-base storm-cluster-state storm-id nil) + base (clojurify-storm-base (.stormBase storm-cluster-state storm-id nil)) launch-time-secs (if base (:launch-time-secs base) (throw (NotAliveException. 
(str storm-id)))) - assignment (.assignment-info storm-cluster-state storm-id nil) + assignment (clojurify-assignment (.assignmentInfo storm-cluster-state storm-id nil)) beats (map-val :heartbeat (get @(:heartbeats-cache nimbus) storm-id)) all-components (set (vals task->component))] @@ -1388,16 +1392,16 @@ :task->component task->component :base base})) get-last-error (fn [storm-cluster-state storm-id component-id] - (if-let [e (.last-error storm-cluster-state + (if-let [e (clojurify-error (.lastError storm-cluster-state storm-id - component-id)] + component-id))] (doto (ErrorInfo. (:error e) (:time-secs e)) (.set_host (:host e)) (.set_port (:port e)))))] (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) conf) ;add to nimbuses - (.add-nimbus-host! (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus)) + (.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString (:nimbus-host-port-info nimbus)) (NimbusSummary. (.getHost (:nimbus-host-port-info nimbus)) (.getPort (:nimbus-host-port-info nimbus)) @@ -1413,7 +1417,7 @@ (setup-blobstore nimbus)) (when (is-leader nimbus :throw-exception false) - (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))] + (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))] (transition! nimbus storm-id :startup))) (schedule-recurring (:timer nimbus) 0 @@ -1520,12 +1524,12 @@ (locking (:submit-lock nimbus) (check-storm-active! nimbus storm-name false) ;;cred-update-lock is not needed here because creds are being added for the first time. - (.set-credentials! storm-cluster-state storm-id credentials storm-conf) + (.setCredentials storm-cluster-state storm-id (thriftify-credentials credentials) storm-conf) (log-message "uploadedJar " uploadedJarLocation) (setup-storm-code nimbus conf storm-id uploadedJarLocation total-storm-conf topology) (wait-for-desired-code-replication nimbus total-storm-conf storm-id) - (.setup-heartbeats! 
storm-cluster-state storm-id) - (.setup-backpressure! storm-cluster-state storm-id) + (.setupHeatbeats storm-cluster-state storm-id) + (.setupBackpressure storm-cluster-state storm-id) (notify-topology-action-listener nimbus storm-name "submitTopology") (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE :inactive TopologyInitialStatus/ACTIVE :active}] @@ -1613,7 +1617,7 @@ (log-message "Nimbus setting debug to " enable? " for storm-name '" storm-name "' storm-id '" storm-id "' sampling pct '" spct "'" (if (not (clojure.string/blank? component-id)) (str " component-id '" component-id "'"))) (locking (:submit-lock nimbus) - (.update-storm! storm-cluster-state storm-id storm-base-updates)))) + (.updateStorm storm-cluster-state (thriftify-storm-base storm-id storm-base-updates))))) (^void setWorkerProfiler [this ^String id ^ProfileRequest profileRequest] @@ -1622,7 +1626,7 @@ storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! nimbus storm-name topology-conf "setWorkerProfiler") storm-cluster-state (:storm-cluster-state nimbus)] - (.set-worker-profile-request storm-cluster-state id profileRequest))) + (.setWorkerProfileRequest storm-cluster-state id profileRequest))) (^List getComponentPendingProfileActions [this ^String id ^String component_id ^ProfileAction action] @@ -1635,7 +1639,7 @@ [(node->host node) port]) executor->node+port) nodeinfos (stats/extract-nodeinfos-from-hb-for-comp executor->host+port task->component false component_id) - all-pending-actions-for-topology (.get-topology-profile-requests storm-cluster-state id true) + all-pending-actions-for-topology (clojurify-profile-request (.getTopologyProfileRequests storm-cluster-state id true)) latest-profile-actions (remove nil? (map (fn [nodeInfo] (->> all-pending-actions-for-topology (filter #(and (= (:host nodeInfo) (.get_node (.get_nodeInfo %))) @@ -1653,7 +1657,7 @@ storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! 
nimbus storm-name topology-conf "setLogConfig") storm-cluster-state (:storm-cluster-state nimbus) - merged-log-config (or (.topology-log-config storm-cluster-state id nil) (LogConfig.)) + merged-log-config (or (.topologyLogConfig storm-cluster-state id nil) (LogConfig.)) named-loggers (.get_named_logger_level merged-log-config)] (doseq [[_ level] named-loggers] (.set_action level LogLevelAction/UNCHANGED)) @@ -1671,7 +1675,7 @@ (.containsKey named-loggers logger-name)) (.remove named-loggers logger-name)))))) (log-message "Setting log config for " storm-name ":" merged-log-config) - (.set-topology-log-config! storm-cluster-state id merged-log-config))) + (.setTopologyLogConfig storm-cluster-state id merged-log-config))) (uploadNewCredentials [this storm-name credentials] (mark! nimbus:num-uploadNewCredentials-calls) @@ -1680,7 +1684,7 @@ topology-conf (try-read-storm-conf conf storm-id blob-store) creds (when credentials (.get_creds credentials))] (check-authorization! nimbus storm-name topology-conf "uploadNewCredentials") - (locking (:cred-update-lock nimbus) (.set-credentials! storm-cluster-state storm-id creds topology-conf)))) + (locking (:cred-update-lock nimbus) (.setCredentials storm-cluster-state storm-id (thriftify-credentials creds) topology-conf)))) (beginFileUpload [this] (mark! nimbus:num-beginFileUpload-calls) @@ -1755,7 +1759,7 @@ storm-name (topology-conf TOPOLOGY-NAME) _ (check-authorization! 
nimbus storm-name topology-conf "getLogConfig") storm-cluster-state (:storm-cluster-state nimbus) - log-config (.topology-log-config storm-cluster-state id nil)] + log-config (.topologyLogConfig storm-cluster-state id nil)] (if log-config log-config (LogConfig.)))) (^String getTopologyConf [this ^String id] @@ -1800,7 +1804,8 @@ (when-let [version (:version info)] (.set_version sup-sum version)) sup-sum)) nimbus-uptime ((:uptime nimbus)) - bases (topology-bases storm-cluster-state) + javabases (topology-bases storm-cluster-state) + bases (into {} (dofor [[id base] javabases][id (clojurify-storm-base base)])) nimbuses (.nimbuses storm-cluster-state) ;;update the isLeader field for each nimbus summary @@ -1812,7 +1817,7 @@ (.set_isLeader nimbus-summary (and (= leader-host (.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary)))))) topology-summaries (dofor [[id base] bases :when base] - (let [assignment (.assignment-info storm-cluster-state id nil) + (let [assignment (clojurify-assignment (.assignmentInfo storm-cluster-state id nil)) topo-summ (TopologySummary. id (:storm-name base) (->> (:executor->node+port assignment) @@ -1939,7 +1944,7 @@ nimbus-host-port-info (:nimbus-host-port-info nimbus) conf (:conf nimbus)] (if (instance? LocalFsBlobStore blob-store) - (.setup-blobstore! storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf))) + (.setupBlobstore storm-cluster-state blob-key nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info conf))) (log-debug "Created state in zookeeper" storm-cluster-state blob-store nimbus-host-port-info))) (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk] @@ -2019,8 +2024,8 @@ (.subject))] (.deleteBlob (:blob-store nimbus) blob-key subject) (when (instance? LocalFsBlobStore blob-store) - (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key) - (.remove-key-version! 
(:storm-cluster-state nimbus) blob-key)) + (.removeBlobstoreKey (:storm-cluster-state nimbus) blob-key) + (.removeKeyVersion (:storm-cluster-state nimbus) blob-key)) (log-message "Deleted blob for key " blob-key))) (^ListBlobsResult listBlobs [this ^String session] @@ -2157,7 +2162,8 @@ (^TopologyHistoryInfo getTopologyHistory [this ^String user] (let [storm-cluster-state (:storm-cluster-state nimbus) - bases (topology-bases storm-cluster-state) + javabases (topology-bases storm-cluster-state) + bases (into {} (dofor [[id base] javabases][id (clojurify-storm-base base)])) assigned-topology-ids (.assignments storm-cluster-state nil) user-group-match-fn (fn [topo-id user conf] (let [topology-conf (try-read-storm-conf conf topo-id (:blob-store nimbus)) http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 337a1b4..079b221 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@ -19,11 +19,11 @@ [org.apache.storm.utils LocalState Time Utils ConfigUtils] [org.apache.storm.daemon Shutdownable] [org.apache.storm Constants] - [org.apache.storm.cluster ClusterStateContext DaemonType] + [org.apache.storm.cluster ClusterStateContext DaemonType StormZkClusterState Cluster] [java.net JarURLConnection] [java.net URI] [org.apache.commons.io FileUtils]) - (:use [org.apache.storm config util log timer local-state]) + (:use [org.apache.storm config util log timer local-state converter]) (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) (:import [java.nio.file Files StandardCopyOption]) @@ -33,7 +33,7 @@ (:use 
[org.apache.storm.daemon common]) (:require [org.apache.storm.command [healthcheck :as healthcheck]]) (:require [org.apache.storm.daemon [worker :as worker]] - [org.apache.storm [process-simulator :as psim] [cluster :as cluster] [event :as event]] + [org.apache.storm [process-simulator :as psim] [event :as event]] [clojure.set :as set]) (:import [org.apache.thrift.transport TTransportException]) (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms]) @@ -63,21 +63,22 @@ (->> (dofor [sid storm-ids] (let [recorded-version (:version (get assignment-versions sid))] - (if-let [assignment-version (.assignment-version storm-cluster-state sid callback)] + (if-let [assignment-version (.assignmentVersion storm-cluster-state sid callback)] (if (= assignment-version recorded-version) {sid (get assignment-versions sid)} - {sid (.assignment-info-with-version storm-cluster-state sid callback)}) + {sid (.assignmentInfoWithVersion storm-cluster-state sid callback)}) {sid nil}))) (apply merge) (filter-val not-nil?)) new-profiler-actions (->> (dofor [sid (distinct storm-ids)] - (if-let [topo-profile-actions (.get-topology-profile-requests storm-cluster-state sid false)] + + (if-let [topo-profile-actions (into [] (for [request (.getTopologyProfileRequests storm-cluster-state sid false)] (clojurify-profile-request request)))] {sid topo-profile-actions})) (apply merge))] - - {:assignments (into {} (for [[k v] new-assignments] [k (:data v)])) + + {:assignments (into {} (for [[k v] new-assignments] [k (clojurify-assignment (:data v))])) :profiler-actions new-profiler-actions :versions new-assignments}))) @@ -316,11 +317,9 @@ :uptime (uptime-computer) :version STORM-VERSION :worker-thread-pids-atom (atom {}) - :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when - (Utils/isZkAuthenticationConfiguredStormServer - conf) - SUPERVISOR-ZK-ACLS) - :context (ClusterStateContext. 
DaemonType/SUPERVISOR)) + :storm-cluster-state (Cluster/mkStormClusterState conf (when (Utils/isZkAuthenticationConfiguredStormServer conf) + SUPERVISOR-ZK-ACLS) + (ClusterStateContext. DaemonType/SUPERVISOR)) :local-state (ConfigUtils/supervisorState conf) :supervisor-id (.getSupervisorId isupervisor) :assignment-id (.getAssignmentId isupervisor) @@ -675,7 +674,7 @@ (defn- delete-topology-profiler-action [storm-cluster-state storm-id profile-action] (log-message "Deleting profiler action.." profile-action) - (.delete-topology-profile-requests storm-cluster-state storm-id profile-action)) + (.deleteTopologyProfileRequests storm-cluster-state storm-id (thriftify-profile-request profile-action))) (defnk launch-profiler-action-for-worker "Launch profiler action for a worker" @@ -743,7 +742,7 @@ action-on-exit (fn [exit-code] (log-message log-prefix " profile-action exited for code: " exit-code) (if (and (= exit-code 0) stop?) - (delete-topology-profiler-action storm-cluster-state storm-id pro-action))) + (delete-topology-profiler-action storm-cluster-state storm-id (thriftify-profile-request pro-action)))) command (->> command (map str) (filter (complement empty?)))] (try @@ -776,10 +775,10 @@ synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor) downloaded-storm-ids (set (read-downloaded-storm-ids conf)) run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor) - heartbeat-fn (fn [] (.supervisor-heartbeat! 
+ heartbeat-fn (fn [] (.supervisorHeartbeat (:storm-cluster-state supervisor) (:supervisor-id supervisor) - (->SupervisorInfo (current-time-secs) + (thriftify-supervisor-info (->SupervisorInfo (current-time-secs) (:my-hostname supervisor) (:assignment-id supervisor) (keys @(:curr-assignment supervisor)) @@ -788,7 +787,7 @@ (conf SUPERVISOR-SCHEDULER-META) ((:uptime supervisor)) (:version supervisor) - (mk-supervisor-capacities conf))))] + (mk-supervisor-capacities conf)))))] (heartbeat-fn) ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
