Delete zookeeper.clj, zookeeper_state_factory.clj, and cluster.clj; some tests
still fail


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a79fb7d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a79fb7d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a79fb7d

Branch: refs/heads/master
Commit: 9a79fb7de0e824e73c294738521e892f1d81fbb0
Parents: 682d31c
Author: xiaojian.fxj <[email protected]>
Authored: Wed Feb 3 20:28:05 2016 +0800
Committer: xiaojian.fxj <[email protected]>
Committed: Wed Feb 3 20:28:05 2016 +0800

----------------------------------------------------------------------
 conf/defaults.yaml                              |   2 +-
 storm-core/src/clj/org/apache/storm/cluster.clj | 691 -------------------
 .../cluster_state/zookeeper_state_factory.clj   | 163 -----
 .../org/apache/storm/command/dev_zookeeper.clj  |   2 +-
 .../clj/org/apache/storm/command/heartbeats.clj |   6 +-
 .../apache/storm/command/shell_submission.clj   |   2 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  13 +-
 .../clj/org/apache/storm/daemon/executor.clj    |  12 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 138 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  |  35 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  43 +-
 .../storm/pacemaker/pacemaker_state_factory.clj |  12 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |   3 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  14 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   2 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |   2 +-
 storm-core/src/clj/org/apache/storm/util.clj    |  11 +
 .../src/clj/org/apache/storm/zookeeper.clj      |  75 --
 .../jvm/org/apache/storm/callback/Callback.java |   3 +
 .../jvm/org/apache/storm/cluster/Cluster.java   |  38 +-
 .../org/apache/storm/cluster/ClusterState.java  |   2 +-
 .../storm/cluster/DistributedClusterState.java  |   7 +-
 .../apache/storm/cluster/StormClusterState.java |  34 +-
 .../storm/cluster/StormZkClusterState.java      | 109 +--
 .../testing/staticmocking/MockedCluster.java    |  31 +
 .../org/apache/storm/integration_test.clj       |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  | 103 ++-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 148 ++--
 .../storm/security/auth/nimbus_auth_test.clj    |   3 +-
 .../clj/org/apache/storm/supervisor_test.clj    |  29 +-
 .../test/jvm/org/apache/storm/ClusterTest.java  |  22 +
 32 files changed, 488 insertions(+), 1296 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/conf/defaults.yaml
----------------------------------------------------------------------
diff --git a/conf/defaults.yaml b/conf/defaults.yaml
index 8873d12..74605bb 100644
--- a/conf/defaults.yaml
+++ b/conf/defaults.yaml
@@ -51,7 +51,7 @@ storm.auth.simple-white-list.users: []
 storm.auth.simple-acl.users: []
 storm.auth.simple-acl.users.commands: []
 storm.auth.simple-acl.admins: []
-storm.cluster.state.store: "org.apache.storm.cluster_state.zookeeper_state_factory"
+storm.cluster.state.store: "org.apache.storm.cluster.StormZkClusterState"
 storm.meta.serialization.delegate: "org.apache.storm.serialization.GzipThriftSerializationDelegate"
 storm.codedistributor.class: "org.apache.storm.codedistributor.LocalFileSystemCodeDistributor"
 storm.workers.artifacts.dir: "workers-artifacts"

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/cluster.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster.clj b/storm-core/src/clj/org/apache/storm/cluster.clj
deleted file mode 100644
index 152423a..0000000
--- a/storm-core/src/clj/org/apache/storm/cluster.clj
+++ /dev/null
@@ -1,691 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.cluster
-  (:import [org.apache.zookeeper.data Stat ACL Id]
-           [org.apache.storm.generated SupervisorInfo Assignment StormBase 
ClusterWorkerHeartbeat ErrorInfo Credentials NimbusSummary
-            LogConfig ProfileAction ProfileRequest NodeInfo]
-           [java.io Serializable])
-  (:import [org.apache.zookeeper KeeperException 
KeeperException$NoNodeException ZooDefs ZooDefs$Ids ZooDefs$Perms])
-  (:import [org.apache.curator.framework CuratorFramework])
-  (:import [org.apache.storm.utils Utils])
-  (:import [org.apache.storm.cluster ClusterState ClusterStateContext 
ClusterStateListener ConnectionState])
-  (:import [java.security MessageDigest])
-  (:import [org.apache.zookeeper.server.auth DigestAuthenticationProvider])
-  (:import [org.apache.storm.nimbus NimbusInfo])
-  (:use [org.apache.storm util log config converter])
-  (:require [org.apache.storm [zookeeper :as zk]])
-  (:require [org.apache.storm.daemon [common :as common]]))
-
-(defn mk-topo-only-acls
-  [topo-conf]
-  (let [payload (.get topo-conf STORM-ZOOKEEPER-TOPOLOGY-AUTH-PAYLOAD)]
-    (when (Utils/isZkAuthenticationConfiguredTopology topo-conf)
-      [(first ZooDefs$Ids/CREATOR_ALL_ACL)
-       (ACL. ZooDefs$Perms/READ (Id. "digest" 
(DigestAuthenticationProvider/generateDigest payload)))])))
- 
-(defnk mk-distributed-cluster-state
-  [conf :auth-conf nil :acls nil :context (ClusterStateContext.)]
-  (let [clazz (Class/forName (or (conf STORM-CLUSTER-STATE-STORE)
-                                 
"org.apache.storm.cluster_state.zookeeper_state_factory"))
-        state-instance (.newInstance clazz)]
-    (log-debug "Creating cluster state: " (.toString clazz))
-    (or (.mkState state-instance conf auth-conf acls context)
-        nil)))
-
-(defprotocol StormClusterState
-  (assignments [this callback])
-  (assignment-info [this storm-id callback])
-  (assignment-info-with-version [this storm-id callback])
-  (assignment-version [this storm-id callback])
-  ;returns key information under /storm/blobstore/key
-  (blobstore-info [this blob-key])
-  ;returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data>
-  (nimbuses [this])
-  ;adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
-  (add-nimbus-host! [this nimbus-id nimbus-summary])
-
-  (active-storms [this])
-  (storm-base [this storm-id callback])
-  (get-worker-heartbeat [this storm-id node port])
-  (get-worker-profile-requests [this storm-id nodeinfo thrift?])
-  (get-topology-profile-requests [this storm-id thrift?])
-  (set-worker-profile-request [this storm-id profile-request])
-  (delete-topology-profile-requests [this storm-id profile-request])
-  (executor-beats [this storm-id executor->node+port])
-  (supervisors [this callback])
-  (supervisor-info [this supervisor-id]) ;; returns nil if doesn't exist
-  (setup-heartbeats! [this storm-id])
-  (teardown-heartbeats! [this storm-id])
-  (teardown-topology-errors! [this storm-id])
-  (heartbeat-storms [this])
-  (error-topologies [this])
-  (set-topology-log-config! [this storm-id log-config])
-  (topology-log-config [this storm-id cb])
-  (worker-heartbeat! [this storm-id node port info])
-  (remove-worker-heartbeat! [this storm-id node port])
-  (supervisor-heartbeat! [this supervisor-id info])
-  (worker-backpressure! [this storm-id node port info])
-  (topology-backpressure [this storm-id callback])
-  (setup-backpressure! [this storm-id])
-  (remove-worker-backpressure! [this storm-id node port])
-  (activate-storm! [this storm-id storm-base])
-  (update-storm! [this storm-id new-elems])
-  (remove-storm-base! [this storm-id])
-  (set-assignment! [this storm-id info])
-  ;; sets up information related to key consisting of nimbus
-  ;; host:port and version info of the blob
-  (setup-blobstore! [this key nimbusInfo versionInfo])
-  (active-keys [this])
-  (blobstore [this callback])
-  (remove-storm! [this storm-id])
-  (remove-blobstore-key! [this blob-key])
-  (remove-key-version! [this blob-key])
-  (report-error [this storm-id component-id node port error])
-  (errors [this storm-id component-id])
-  (last-error [this storm-id component-id])
-  (set-credentials! [this storm-id creds topo-conf])
-  (credentials [this storm-id callback])
-  (disconnect [this]))
-
-(def ASSIGNMENTS-ROOT "assignments")
-(def CODE-ROOT "code")
-(def STORMS-ROOT "storms")
-(def SUPERVISORS-ROOT "supervisors")
-(def WORKERBEATS-ROOT "workerbeats")
-(def BACKPRESSURE-ROOT "backpressure")
-(def ERRORS-ROOT "errors")
-(def BLOBSTORE-ROOT "blobstore")
-; Stores the latest update sequence for a blob
-(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT "blobstoremaxkeysequencenumber")
-(def NIMBUSES-ROOT "nimbuses")
-(def CREDENTIALS-ROOT "credentials")
-(def LOGCONFIG-ROOT "logconfigs")
-(def PROFILERCONFIG-ROOT "profilerconfigs")
-
-(def ASSIGNMENTS-SUBTREE (str "/" ASSIGNMENTS-ROOT))
-(def STORMS-SUBTREE (str "/" STORMS-ROOT))
-(def SUPERVISORS-SUBTREE (str "/" SUPERVISORS-ROOT))
-(def WORKERBEATS-SUBTREE (str "/" WORKERBEATS-ROOT))
-(def BACKPRESSURE-SUBTREE (str "/" BACKPRESSURE-ROOT))
-(def ERRORS-SUBTREE (str "/" ERRORS-ROOT))
-;; Blobstore subtree /storm/blobstore
-(def BLOBSTORE-SUBTREE (str "/" BLOBSTORE-ROOT))
-(def BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE (str "/" 
BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-ROOT))
-(def NIMBUSES-SUBTREE (str "/" NIMBUSES-ROOT))
-(def CREDENTIALS-SUBTREE (str "/" CREDENTIALS-ROOT))
-(def LOGCONFIG-SUBTREE (str "/" LOGCONFIG-ROOT))
-(def PROFILERCONFIG-SUBTREE (str "/" PROFILERCONFIG-ROOT))
-
-(defn supervisor-path
-  [id]
-  (str SUPERVISORS-SUBTREE "/" id))
-
-(defn assignment-path
-  [id]
-  (str ASSIGNMENTS-SUBTREE "/" id))
-
-(defn blobstore-path
-  [key]
-  (str BLOBSTORE-SUBTREE "/" key))
-
-(defn blobstore-max-key-sequence-number-path
-  [key]
-  (str BLOBSTORE-MAX-KEY-SEQUENCE-NUMBER-SUBTREE "/" key))
-
-(defn nimbus-path
-  [id]
-  (str NIMBUSES-SUBTREE "/" id))
-
-(defn storm-path
-  [id]
-  (str STORMS-SUBTREE "/" id))
-
-(defn workerbeat-storm-root
-  [storm-id]
-  (str WORKERBEATS-SUBTREE "/" storm-id))
-
-(defn workerbeat-path
-  [storm-id node port]
-  (str (workerbeat-storm-root storm-id) "/" node "-" port))
-
-(defn backpressure-storm-root
-  [storm-id]
-  (str BACKPRESSURE-SUBTREE "/" storm-id))
-
-(defn backpressure-path
-  [storm-id node port]
-  (str (backpressure-storm-root storm-id) "/" node "-" port))
-
-(defn error-storm-root
-  [storm-id]
-  (str ERRORS-SUBTREE "/" storm-id))
-
-(defn error-path
-  [storm-id component-id]
-  (str (error-storm-root storm-id) "/" (url-encode component-id)))
-
-(def last-error-path-seg "last-error")
-
-(defn last-error-path
-  [storm-id component-id]
-  (str (error-storm-root storm-id)
-       "/"
-       (url-encode component-id)
-       "-"
-       last-error-path-seg))
-
-(defn credentials-path
-  [storm-id]
-  (str CREDENTIALS-SUBTREE "/" storm-id))
-
-(defn log-config-path
-  [storm-id]
-  (str LOGCONFIG-SUBTREE "/" storm-id))
-
-(defn profiler-config-path
-  ([storm-id]
-   (str PROFILERCONFIG-SUBTREE "/" storm-id))
-  ([storm-id host port request-type]
-   (str (profiler-config-path storm-id) "/" host "_" port "_" request-type)))
-
-(defn- issue-callback!
-  [cb-atom]
-  (let [cb @cb-atom]
-    (reset! cb-atom nil)
-    (when cb
-      (cb))))
-
-(defn- issue-map-callback!
-  [cb-atom id]
-  (let [cb (@cb-atom id)]
-    (swap! cb-atom dissoc id)
-    (when cb
-      (cb id))))
-
-(defn- maybe-deserialize
-  [ser clazz]
-  (when ser
-    (Utils/deserialize ser clazz)))
-
-(defrecord TaskError [error time-secs host port])
-
-(defn- parse-error-path
-  [^String p]
-  (Long/parseLong (.substring p 1)))
-
-(defn convert-executor-beats
-  "Ensures that we only return heartbeats for executors assigned to
-  this worker."
-  [executors worker-hb]
-  (let [executor-stats (:executor-stats worker-hb)]
-    (->> executors
-         (map (fn [t]
-                (if (contains? executor-stats t)
-                  {t {:time-secs (:time-secs worker-hb)
-                      :uptime (:uptime worker-hb)
-                      :stats (get executor-stats t)}})))
-         (into {}))))
-
-;; Watches should be used for optimization. When ZK is reconnecting, they're not guaranteed to be called.
-(defnk mk-storm-cluster-state
-  [cluster-state-spec :acls nil :context (ClusterStateContext.)]
-  (let [[solo? cluster-state] (if (instance? ClusterState cluster-state-spec)
-                                [false cluster-state-spec]
-                                [true (mk-distributed-cluster-state 
cluster-state-spec :auth-conf cluster-state-spec :acls acls :context context)])
-        assignment-info-callback (atom {})
-        assignment-info-with-version-callback (atom {})
-        assignment-version-callback (atom {})
-        supervisors-callback (atom nil)
-        backpressure-callback (atom {})   ;; we want to reigister a topo directory getChildren callback for all workers of this dir
-        assignments-callback (atom nil)
-        storm-base-callback (atom {})
-        blobstore-callback (atom nil)
-        credentials-callback (atom {})
-        log-config-callback (atom {})
-        state-id (.register
-                  cluster-state
-                  (fn [type path]
-                    (let [[subtree & args] (tokenize-path path)]
-                      (condp = subtree
-                         ASSIGNMENTS-ROOT (if (empty? args)
-                                             (issue-callback! 
assignments-callback)
-                                             (do
-                                               (issue-map-callback! 
assignment-info-callback (first args))
-                                               (issue-map-callback! 
assignment-version-callback (first args))
-                                               (issue-map-callback! 
assignment-info-with-version-callback (first args))))
-                         SUPERVISORS-ROOT (issue-callback! 
supervisors-callback)
-                         BLOBSTORE-ROOT (issue-callback! blobstore-callback) 
;; callback register for blobstore
-                         STORMS-ROOT (issue-map-callback! storm-base-callback 
(first args))
-                         CREDENTIALS-ROOT (issue-map-callback! 
credentials-callback (first args))
-                         LOGCONFIG-ROOT (issue-map-callback! 
log-config-callback (first args))
-                         BACKPRESSURE-ROOT (issue-map-callback! 
backpressure-callback (first args))
-                         ;; this should never happen
-                         (exit-process! 30 "Unknown callback for subtree " 
subtree args)))))]
-    (doseq [p [ASSIGNMENTS-SUBTREE STORMS-SUBTREE SUPERVISORS-SUBTREE 
WORKERBEATS-SUBTREE ERRORS-SUBTREE BLOBSTORE-SUBTREE NIMBUSES-SUBTREE
-               LOGCONFIG-SUBTREE]]
-      (.mkdirs cluster-state p acls))
-    (reify
-      StormClusterState
-
-      (assignments
-        [this callback]
-        (when callback
-          (reset! assignments-callback callback))
-        (.get_children cluster-state ASSIGNMENTS-SUBTREE (not-nil? callback)))
-
-      (assignment-info
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-info-callback assoc storm-id callback))
-        (clojurify-assignment (maybe-deserialize (.get_data cluster-state 
(assignment-path storm-id) (not-nil? callback)) Assignment)))
-
-      (assignment-info-with-version 
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-info-with-version-callback assoc storm-id 
callback))
-        (let [{data :data version :version} 
-              (.get_data_with_version cluster-state (assignment-path storm-id) 
(not-nil? callback))]
-        {:data (clojurify-assignment (maybe-deserialize data Assignment))
-         :version version}))
-
-      (assignment-version 
-        [this storm-id callback]
-        (when callback
-          (swap! assignment-version-callback assoc storm-id callback))
-        (.get_version cluster-state (assignment-path storm-id) (not-nil? 
callback)))
-
-      ;; blobstore state
-      (blobstore
-        [this callback]
-        (when callback
-          (reset! blobstore-callback callback))
-        (.sync_path cluster-state BLOBSTORE-SUBTREE)
-        (.get_children cluster-state BLOBSTORE-SUBTREE (not-nil? callback)))
-
-      (nimbuses
-        [this]
-        (map #(maybe-deserialize (.get_data cluster-state (nimbus-path %1) 
false) NimbusSummary)
-          (.get_children cluster-state NIMBUSES-SUBTREE false)))
-
-      (add-nimbus-host!
-        [this nimbus-id nimbus-summary]
-        ;explicit delete for ephmeral node to ensure this session creates the 
entry.
-        (.delete_node cluster-state (nimbus-path nimbus-id))
-
-        (.add_listener cluster-state (reify ClusterStateListener
-                        (^void stateChanged[this ^ConnectionState newState]
-                          (log-message "Connection state listener invoked, 
zookeeper connection state has changed to " newState)
-                          (if (.equals newState ConnectionState/RECONNECTED)
-                            (do
-                              (log-message "Connection state has changed to 
reconnected so setting nimbuses entry one more time")
-                              (.set_ephemeral_node cluster-state (nimbus-path 
nimbus-id) (Utils/serialize nimbus-summary) acls))))))
-        
-        (.set_ephemeral_node cluster-state (nimbus-path nimbus-id) 
(Utils/serialize nimbus-summary) acls))
-
-      (setup-blobstore!
-        [this key nimbusInfo versionInfo]
-        (let [path (str (blobstore-path key) "/" (.toHostPortString 
nimbusInfo) "-" versionInfo)]
-          (log-message "setup-path: " path)
-          (.mkdirs cluster-state (blobstore-path key) acls)
-          ;we delete the node first to ensure the node gets created as part of 
this session only.
-          (.delete_node_blobstore cluster-state (str (blobstore-path key)) 
(.toHostPortString nimbusInfo))
-          (.set_ephemeral_node cluster-state path nil acls)))
-
-      (blobstore-info
-        [this blob-key]
-        (let [path (blobstore-path blob-key)]
-          (.sync_path cluster-state path)
-          (.get_children cluster-state path false)))
-
-      (active-storms
-        [this]
-        (.get_children cluster-state STORMS-SUBTREE false))
-
-      (active-keys
-        [this]
-        (.get_children cluster-state BLOBSTORE-SUBTREE false))
-
-      (heartbeat-storms
-        [this]
-        (.get_worker_hb_children cluster-state WORKERBEATS-SUBTREE false))
-
-      (error-topologies
-        [this]
-        (.get_children cluster-state ERRORS-SUBTREE false))
-
-      (get-worker-heartbeat
-        [this storm-id node port]
-        (let [worker-hb (.get_worker_hb cluster-state (workerbeat-path 
storm-id node port) false)]
-          (if worker-hb
-            (-> worker-hb
-              (maybe-deserialize ClusterWorkerHeartbeat)
-              clojurify-zk-worker-hb))))
-
-      (executor-beats
-        [this storm-id executor->node+port]
-        ;; need to take executor->node+port in explicitly so that we don't run into a situation where a
-        ;; long dead worker with a skewed clock overrides all the timestamps. By only checking heartbeats
-        ;; with an assigned node+port, and only reading executors from that heartbeat that are actually assigned,
-        ;; we avoid situations like that
-        (let [node+port->executors (reverse-map executor->node+port)
-              all-heartbeats (for [[[node port] executors] 
node+port->executors]
-                               (->> (get-worker-heartbeat this storm-id node 
port)
-                                    (convert-executor-beats executors)
-                                    ))]
-          (apply merge all-heartbeats)))
-
-      (supervisors
-        [this callback]
-        (when callback
-          (reset! supervisors-callback callback))
-        (.get_children cluster-state SUPERVISORS-SUBTREE (not-nil? callback)))
-
-      (supervisor-info
-        [this supervisor-id]
-        (clojurify-supervisor-info (maybe-deserialize (.get_data cluster-state 
(supervisor-path supervisor-id) false) SupervisorInfo)))
-
-      (topology-log-config
-        [this storm-id cb]
-        (when cb
-          (swap! log-config-callback assoc storm-id cb))
-        (maybe-deserialize (.get_data cluster-state (log-config-path storm-id) 
(not-nil? cb)) LogConfig))
-
-      (set-topology-log-config!
-        [this storm-id log-config]
-        (.set_data cluster-state (log-config-path storm-id) (Utils/serialize 
log-config) acls))
-
-      (set-worker-profile-request
-        [this storm-id profile-request]
-        (let [request-type (.get_action profile-request)
-              host (.get_node (.get_nodeInfo profile-request))
-              port (first (.get_port (.get_nodeInfo profile-request)))]
-          (.set_data cluster-state
-                     (profiler-config-path storm-id host port request-type)
-                     (Utils/serialize profile-request)
-                     acls)))
-
-      (get-topology-profile-requests
-        [this storm-id thrift?]
-        (let [path (profiler-config-path storm-id)
-              requests (if (.node_exists cluster-state path false)
-                         (dofor [c (.get_children cluster-state path false)]
-                                (let [raw (.get_data cluster-state (str path 
"/" c) false)
-                                      request (maybe-deserialize raw 
ProfileRequest)]
-                                      (if thrift?
-                                        request
-                                        (clojurify-profile-request 
request)))))]
-          requests))
-
-      (delete-topology-profile-requests
-        [this storm-id profile-request]
-        (let [profile-request-inst (thriftify-profile-request profile-request)
-              action (:action profile-request)
-              host (:host profile-request)
-              port (:port profile-request)]
-          (.delete_node cluster-state
-           (profiler-config-path storm-id host port action))))
-          
-      (get-worker-profile-requests
-        [this storm-id node-info thrift?]
-        (let [host (:host node-info)
-              port (:port node-info)
-              profile-requests (get-topology-profile-requests this storm-id 
thrift?)]
-          (if thrift?
-            (filter #(and (= host (.get_node (.get_nodeInfo %))) (= port 
(first (.get_port (.get_nodeInfo  %)))))
-                    profile-requests)
-            (filter #(and (= host (:host %)) (= port (:port %)))
-                    profile-requests))))
-      
-      (worker-heartbeat!
-        [this storm-id node port info]
-        (let [thrift-worker-hb (thriftify-zk-worker-hb info)]
-          (if thrift-worker-hb
-            (.set_worker_hb cluster-state (workerbeat-path storm-id node port) 
(Utils/serialize thrift-worker-hb) acls))))
-
-      (remove-worker-heartbeat!
-        [this storm-id node port]
-        (.delete_worker_hb cluster-state (workerbeat-path storm-id node port)))
-
-      (setup-heartbeats!
-        [this storm-id]
-        (.mkdirs cluster-state (workerbeat-storm-root storm-id) acls))
-
-      (teardown-heartbeats!
-        [this storm-id]
-        (try-cause
-          (.delete_worker_hb cluster-state (workerbeat-storm-root storm-id))
-          (catch KeeperException e
-            (log-warn-error e "Could not teardown heartbeats for " storm-id))))
-
-      (worker-backpressure!
-        [this storm-id node port on?]
-        "if znode exists and to be not on?, delete; if exists and on?, do 
nothing;
-        if not exists and to be on?, create; if not exists and not on?, do 
nothing"
-        (let [path (backpressure-path storm-id node port)
-              existed (.node_exists cluster-state path false)]
-          (if existed
-            (if (not on?)
-              (.delete_node cluster-state path))   ;; delete the znode since 
the worker is not congested
-            (if on?
-              (.set_ephemeral_node cluster-state path nil acls))))) ;; create 
the znode since worker is congested
-    
-      (topology-backpressure
-        [this storm-id callback]
-        "if the backpresure/storm-id dir is empty, this topology has 
throttle-on, otherwise not."
-        (when callback
-          (swap! backpressure-callback assoc storm-id callback))
-        (let [path (backpressure-storm-root storm-id)
-              children (.get_children cluster-state path (not-nil? callback))]
-              (> (count children) 0)))
-      
-      (setup-backpressure!
-        [this storm-id]
-        (.mkdirs cluster-state (backpressure-storm-root storm-id) acls))
-
-      (remove-worker-backpressure!
-        [this storm-id node port]
-        (.delete_node cluster-state (backpressure-path storm-id node port)))
-
-      (teardown-topology-errors!
-        [this storm-id]
-        (try-cause
-          (.delete_node cluster-state (error-storm-root storm-id))
-          (catch KeeperException e
-            (log-warn-error e "Could not teardown errors for " storm-id))))
-
-      (supervisor-heartbeat!
-        [this supervisor-id info]
-        (let [thrift-supervisor-info (thriftify-supervisor-info info)]
-          (.set_ephemeral_node cluster-state (supervisor-path supervisor-id) 
(Utils/serialize thrift-supervisor-info) acls)))
-
-      (activate-storm!
-        [this storm-id storm-base]
-        (let [thrift-storm-base (thriftify-storm-base storm-base)]
-          (.set_data cluster-state (storm-path storm-id) (Utils/serialize 
thrift-storm-base) acls)))
-
-      (update-storm!
-        [this storm-id new-elems]
-        (let [base (storm-base this storm-id nil)
-              executors (:component->executors base)
-              component->debug (:component->debug base)
-              new-elems (update new-elems :component->executors (partial merge 
executors))
-              new-elems (update new-elems :component->debug (partial 
merge-with merge component->debug))]
-          (.set_data cluster-state (storm-path storm-id)
-                    (-> base
-                        (merge new-elems)
-                        thriftify-storm-base
-                        Utils/serialize)
-                    acls)))
-
-      (storm-base
-        [this storm-id callback]
-        (when callback
-          (swap! storm-base-callback assoc storm-id callback))
-        (clojurify-storm-base (maybe-deserialize (.get_data cluster-state 
(storm-path storm-id) (not-nil? callback)) StormBase)))
-
-      (remove-storm-base!
-        [this storm-id]
-        (.delete_node cluster-state (storm-path storm-id)))
-
-      (set-assignment!
-        [this storm-id info]
-        (let [thrift-assignment (thriftify-assignment info)]
-          (.set_data cluster-state (assignment-path storm-id) (Utils/serialize 
thrift-assignment) acls)))
-
-      (remove-blobstore-key!
-        [this blob-key]
-        (log-debug "removing key" blob-key)
-        (.delete_node cluster-state (blobstore-path blob-key)))
-
-      (remove-key-version!
-        [this blob-key]
-        (.delete_node cluster-state (blobstore-max-key-sequence-number-path 
blob-key)))
-
-      (remove-storm!
-        [this storm-id]
-        (.delete_node cluster-state (assignment-path storm-id))
-        (.delete_node cluster-state (credentials-path storm-id))
-        (.delete_node cluster-state (log-config-path storm-id))
-        (.delete_node cluster-state (profiler-config-path storm-id))
-        (remove-storm-base! this storm-id))
-
-      (set-credentials!
-         [this storm-id creds topo-conf]
-         (let [topo-acls (mk-topo-only-acls topo-conf)
-               path (credentials-path storm-id)
-               thriftified-creds (thriftify-credentials creds)]
-           (.set_data cluster-state path (Utils/serialize thriftified-creds) 
topo-acls)))
-
-      (credentials
-        [this storm-id callback]
-        (when callback
-          (swap! credentials-callback assoc storm-id callback))
-        (clojurify-crdentials (maybe-deserialize (.get_data cluster-state 
(credentials-path storm-id) (not-nil? callback)) Credentials)))
-
-      (report-error
-         [this storm-id component-id node port error]
-         (let [path (error-path storm-id component-id)
-               last-error-path (last-error-path storm-id component-id)
-               data (thriftify-error {:time-secs (current-time-secs) :error 
(stringify-error error) :host node :port port})
-               _ (.mkdirs cluster-state path acls)
-               ser-data (Utils/serialize data)
-               _ (.mkdirs cluster-state path acls)
-               _ (.create_sequential cluster-state (str path "/e") ser-data 
acls)
-               _ (.set_data cluster-state last-error-path ser-data acls)
-               to-kill (->> (.get_children cluster-state path false)
-                            (sort-by parse-error-path)
-                            reverse
-                            (drop 10))]
-           (doseq [k to-kill]
-             (.delete_node cluster-state (str path "/" k)))))
-
-      (errors
-         [this storm-id component-id]
-         (let [path (error-path storm-id component-id)
-               errors (if (.node_exists cluster-state path false)
-                        (dofor [c (.get_children cluster-state path false)]
-                          (if-let [data (-> (.get_data cluster-state
-                                                      (str path "/" c)
-                                                      false)
-                                          (maybe-deserialize ErrorInfo)
-                                          clojurify-error)]
-                            (map->TaskError data)))
-                        ())]
-           (->> (filter not-nil? errors)
-                (sort-by (comp - :time-secs)))))
-
-      (last-error
-        [this storm-id component-id]
-        (let [path (last-error-path storm-id component-id)]
-          (if (.node_exists cluster-state path false)
-            (if-let [data (-> (.get_data cluster-state path false)
-                              (maybe-deserialize ErrorInfo)
-                              clojurify-error)]
-              (map->TaskError data)))))
-      
-      (disconnect
-         [this]
-        (.unregister cluster-state state-id)
-        (when solo?
-          (.close cluster-state))))))
-
-;; daemons have a single thread that will respond to events
-;; start with initialize event
-;; callbacks add events to the thread's queue
-
-;; keeps in memory cache of the state, only for what client subscribes to. Any subscription is automatically kept in sync, and when there are changes, client is notified.
-;; master gives orders through state, and client records status in state (ephemerally)
-
-;; master tells nodes what workers to launch
-
-;; master writes this. supervisors and workers subscribe to this to understand complete topology. each storm is a map from nodes to workers to tasks to ports whenever topology changes everyone will be notified
-;; master includes timestamp of each assignment so that appropriate time can be given to each worker to start up
-;; /assignments/{storm id}
-
-;; which tasks they talk to, etc. (immutable until shutdown)
-;; everyone reads this in full to understand structure
-;; /tasks/{storm id}/{task id} ; just contains bolt id
-
-;; supervisors send heartbeats here, master doesn't subscribe but checks 
asynchronously
-;; /supervisors/status/{ephemeral node ids}  ;; node metadata such as port 
ranges are kept here
-
-;; tasks send heartbeats here, master doesn't subscribe, just checks 
asynchronously
-;; /taskbeats/{storm id}/{ephemeral task id}
-
-;; contains data about whether it's started or not, tasks and workers 
subscribe to specific storm here to know when to shutdown
-;; master manipulates
-;; /storms/{storm id}
-
-;; Zookeeper flows:
-
-;; Master:
-;; job submit:
-;; 1. read which nodes are available
-;; 2. set up the worker/{storm}/{task} stuff (static)
-;; 3. set assignments
-;; 4. start storm - necessary in case master goes down, when goes back up can 
remember to take down the storm (2 states: on or off)
-
-;; Monitoring (or by checking when nodes go down or heartbeats aren't 
received):
-;; 1. read assignment
-;; 2. see which tasks/nodes are up
-;; 3. make new assignment to fix any problems
-;; 4. if a storm exists but is not taken down fully, ensure that storm 
takedown is launched (step by step remove tasks and finally remove assignments)
-
-;; masters only possible watches is on ephemeral nodes and tasks, and maybe 
not even
-
-;; Supervisor:
-;; 1. monitor /storms/* and assignments
-;; 2. local state about which workers are local
-;; 3. when storm is on, check that workers are running locally & start/kill if 
different than assignments
-;; 4. when storm is off, monitor tasks for workers - when they all die or 
don't hearbeat, kill the process and cleanup
-
-;; Worker:
-;; 1. On startup, start the tasks if the storm is on
-
-;; Task:
-;; 1. monitor assignments, reroute when assignments change
-;; 2. monitor storm (when storm turns off, error if assignments change) - take 
down tasks as master turns them off
-
-;; locally on supervisor: workers write pids locally on startup, supervisor 
deletes it on shutdown (associates pid with worker name)
-;; supervisor periodically checks to make sure processes are alive
-;; {rootdir}/workers/{storm id}/{worker id}   ;; contains pid inside
-
-;; all tasks in a worker share the same cluster state
-;; workers, supervisors, and tasks subscribes to storm to know when it's 
started or stopped
-;; on stopped, master removes records in order (tasks need to subscribe to 
themselves to see if they disappear)
-;; when a master removes a worker, the supervisor should kill it (and escalate 
to kill -9)
-;; on shutdown, tasks subscribe to tasks that send data to them to wait for 
them to die. when node disappears, they can die

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj b/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
deleted file mode 100644
index dcfa8d8..0000000
--- a/storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
+++ /dev/null
@@ -1,163 +0,0 @@
-;; Licensed to the Apache Software Foundation (ASF) under one
-;; or more contributor license agreements.  See the NOTICE file
-;; distributed with this work for additional information
-;; regarding copyright ownership.  The ASF licenses this file
-;; to you under the Apache License, Version 2.0 (the
-;; "License"); you may not use this file except in compliance
-;; with the License.  You may obtain a copy of the License at
-;;
-;; http://www.apache.org/licenses/LICENSE-2.0
-;;
-;; Unless required by applicable law or agreed to in writing, software
-;; distributed under the License is distributed on an "AS IS" BASIS,
-;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-;; See the License for the specific language governing permissions and
-;; limitations under the License.
-
-(ns org.apache.storm.cluster-state.zookeeper-state-factory
-  (:import [org.apache.curator.framework.state ConnectionStateListener]
-           [org.apache.storm.zookeeper Zookeeper])
-  (:import [org.apache.zookeeper KeeperException$NoNodeException CreateMode
-             Watcher$Event$EventType Watcher$Event$KeeperState]
-           [org.apache.storm.cluster ClusterState DaemonType])
-  (:use [org.apache.storm cluster config log util])
-  (:require [org.apache.storm [zookeeper :as zk]])
-  (:gen-class
-   :implements [org.apache.storm.cluster.ClusterStateFactory]))
-
-(defn -mkState [this conf auth-conf acls context]
-  (let [zk (zk/mk-client conf (conf STORM-ZOOKEEPER-SERVERS) (conf 
STORM-ZOOKEEPER-PORT) :auth-conf auth-conf)]
-    (Zookeeper/mkdirs zk (conf STORM-ZOOKEEPER-ROOT) acls)
-    (.close zk))
-  (let [callbacks (atom {})
-        active (atom true)
-        zk-writer (zk/mk-client conf
-                         (conf STORM-ZOOKEEPER-SERVERS)
-                         (conf STORM-ZOOKEEPER-PORT)
-                         :auth-conf auth-conf
-                         :root (conf STORM-ZOOKEEPER-ROOT)
-                         :watcher (fn [state type path]
-                                    (when @active
-                                      (when-not (= 
Watcher$Event$KeeperState/SyncConnected state)
-                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Writer Zookeeper."))
-                                      (when-not (= 
Watcher$Event$EventType/None type)
-                                        (doseq [callback (vals @callbacks)]
-                                          (callback type path))))))
-        is-nimbus? (= (.getDaemonType context) DaemonType/NIMBUS)
-        zk-reader (if is-nimbus?
-                    (zk/mk-client conf
-                         (conf STORM-ZOOKEEPER-SERVERS)
-                         (conf STORM-ZOOKEEPER-PORT)
-                         :auth-conf auth-conf
-                         :root (conf STORM-ZOOKEEPER-ROOT)
-                         :watcher (fn [state type path]
-                                    (when @active
-                                      (when-not (= 
Watcher$Event$KeeperState/SyncConnected state)
-                                        (log-warn "Received event " state ":" 
type ":" path " with disconnected Reader Zookeeper."))
-                                      (when-not (= 
Watcher$Event$EventType/None type)
-                                        (doseq [callback (vals @callbacks)]
-                                          (callback type path))))))
-                    zk-writer)]
-    (reify
-     ClusterState
-
-     (register
-       [this callback]
-       (let [id (uuid)]
-         (swap! callbacks assoc id callback)
-         id))
-
-     (unregister
-       [this id]
-       (swap! callbacks dissoc id))
-
-     (set-ephemeral-node
-       [this path data acls]
-       (Zookeeper/mkdirs zk-writer (parent-path path) acls)
-       (if (Zookeeper/exists zk-writer path false)
-         (try-cause
-           (Zookeeper/setData zk-writer path data) ; should verify that it's 
ephemeral
-           (catch KeeperException$NoNodeException e
-             (log-warn-error e "Ephemeral node disappeared between checking 
for existing and setting data")
-             (Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL 
acls)))
-         (Zookeeper/createNode zk-writer path data CreateMode/EPHEMERAL acls)))
-
-     (create-sequential
-       [this path data acls]
-       (Zookeeper/createNode zk-writer path data 
CreateMode/PERSISTENT_SEQUENTIAL acls))
-
-     (set-data
-       [this path data acls]
-       ;; note: this does not turn off any existing watches
-       (if (Zookeeper/exists zk-writer path false)
-         (Zookeeper/setData zk-writer path data)
-         (do
-           (Zookeeper/mkdirs zk-writer (parent-path path) acls)
-           (Zookeeper/createNode zk-writer path data CreateMode/PERSISTENT 
acls))))
-
-     (set-worker-hb
-       [this path data acls]
-       (.set_data this path data acls))
-
-     (delete-node
-       [this path]
-       (Zookeeper/deleteNode zk-writer path))
-
-     (delete-worker-hb
-       [this path]
-       (.delete_node this path))
-
-     (get-data
-       [this path watch?]
-       (Zookeeper/getData zk-reader path watch?))
-
-     (get-data-with-version
-       [this path watch?]
-       (Zookeeper/getDataWithVersion zk-reader path watch?))
-
-     (get-version
-       [this path watch?]
-       (Zookeeper/getVersion zk-reader path watch?))
-
-     (get-worker-hb
-       [this path watch?]
-       (.get_data this path watch?))
-
-     (get-children
-       [this path watch?]
-       (Zookeeper/getChildren zk-reader path watch?))
-
-     (get-worker-hb-children
-       [this path watch?]
-       (.get_children this path watch?))
-
-     (mkdirs
-       [this path acls]
-       (Zookeeper/mkdirs zk-writer path acls))
-
-     (node-exists
-       [this path watch?]
-       (Zookeeper/existsNode zk-reader path watch?))
-
-     (add-listener
-       [this listener]
-       (let [curator-listener (reify ConnectionStateListener
-                                (stateChanged
-                                  [this client newState]
-                                  (.stateChanged listener client newState)))]
-         (Zookeeper/addListener zk-reader curator-listener)))
-
-     (sync-path
-       [this path]
-       (Zookeeper/syncPath zk-writer path))
-
-      (delete-node-blobstore
-        [this path nimbus-host-port-info]
-        (Zookeeper/deleteNodeBlobstore zk-writer path nimbus-host-port-info))
-
-     (close
-       [this]
-       (reset! active false)
-       (.close zk-writer)
-       (if is-nimbus?
-         (.close zk-reader))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj 
b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
index ef9ecbb..7be526d 100644
--- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
@@ -14,7 +14,7 @@
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
 (ns org.apache.storm.command.dev-zookeeper
-  (:use [org.apache.storm zookeeper util config])
+  (:use [org.apache.storm util config])
   (:import [org.apache.storm.utils ConfigUtils])
   (:import [org.apache.storm.zookeeper Zookeeper])
   (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj 
b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
index be8d030..954042f 100644
--- a/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
+++ b/storm-core/src/clj/org/apache/storm/command/heartbeats.clj
@@ -18,16 +18,16 @@
              [config :refer :all]
              [log :refer :all]
              [util :refer :all]
-             [cluster :refer :all]
              [converter :refer :all]]
             [clojure.string :as string])
   (:import [org.apache.storm.generated ClusterWorkerHeartbeat]
-           [org.apache.storm.utils Utils ConfigUtils])
+           [org.apache.storm.utils Utils ConfigUtils]
+           [org.apache.storm.cluster DistributedClusterState 
ClusterStateContext])
   (:gen-class))
 
 (defn -main [command path & args]
   (let [conf (clojurify-structure (ConfigUtils/readStormConfig))
-        cluster (mk-distributed-cluster-state conf :auth-conf conf)]
+        cluster (DistributedClusterState. conf conf nil 
(ClusterStateContext.))]
     (println "Command: [" command "]")
     (condp = command
       "list"

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj 
b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 8a5eb21..3978d2f 100644
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@ -16,7 +16,7 @@
 (ns org.apache.storm.command.shell-submission
   (:import [org.apache.storm StormSubmitter]
            [org.apache.storm.zookeeper Zookeeper])
-  (:use [org.apache.storm thrift util config log zookeeper])
+  (:use [org.apache.storm thrift util config log])
   (:require [clojure.string :as str])
   (:import [org.apache.storm.utils ConfigUtils])
   (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/converter.clj 
b/storm-core/src/clj/org/apache/storm/converter.clj
index bb2dc87..d169301 100644
--- a/storm-core/src/clj/org/apache/storm/converter.clj
+++ b/storm-core/src/clj/org/apache/storm/converter.clj
@@ -181,9 +181,9 @@
 (defn thriftify-storm-base [storm-base]
   (doto (StormBase.)
     (.set_name (:storm-name storm-base))
-    (.set_launch_time_secs (int (:launch-time-secs storm-base)))
+    (.set_launch_time_secs (if (:launch-time-secs storm-base) (int 
(:launch-time-secs storm-base)) 0))
     (.set_status (convert-to-status-from-symbol (:status storm-base)))
-    (.set_num_workers (int (:num-workers storm-base)))
+    (.set_num_workers (if (:num-workers storm-base) (int (:num-workers 
storm-base)) 0))
     (.set_component_executors (map-val int (:component->executors storm-base)))
     (.set_owner (:owner storm-base))
     (.set_topology_action_options (thriftify-topology-action-options 
storm-base))
@@ -234,16 +234,6 @@
       (.set_executor_stats (thriftify-stats (filter second (:executor-stats 
worker-hb))))
       (.set_time_secs (:time-secs worker-hb)))))
 
-(defn clojurify-error [^ErrorInfo error]
-  (if error
-    {
-      :error (.get_error error)
-      :time-secs (.get_error_time_secs error)
-      :host (.get_host error)
-      :port (.get_port error)
-      }
-    ))
-
 (defn thriftify-error [error]
   (doto (ErrorInfo. (:error error) (:time-secs error))
     (.set_host (:host error))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/common.clj 
b/storm-core/src/clj/org/apache/storm/daemon/common.clj
index 6c184fd..c9534f4 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@ -13,14 +13,16 @@
 ;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 ;; See the License for the specific language governing permissions and
 ;; limitations under the License.
+;TopologyActionOptions TopologyStatus StormBase RebalanceOptions KillOptions
 (ns org.apache.storm.daemon.common
   (:use [org.apache.storm log config util])
-  (:import [org.apache.storm.generated StormTopology
+  (:import [org.apache.storm.generated StormTopology NodeInfo
             InvalidTopologyException GlobalStreamId]
            [org.apache.storm.utils ThriftTopologyUtils])
   (:import [org.apache.storm.utils Utils ConfigUtils])
   (:import [org.apache.storm.task WorkerTopologyContext])
   (:import [org.apache.storm Constants])
+  (:import [org.apache.storm.cluster StormZkClusterState])
   (:import [org.apache.storm.metric SystemBolt])
   (:import [org.apache.storm.metric EventLoggerBolt])
   (:import [org.apache.storm.security.auth IAuthorizer]) 
@@ -72,18 +74,19 @@
 (defn new-executor-stats []
   (ExecutorStats. 0 0 0 0 0))
 
+
 (defn get-storm-id [storm-cluster-state storm-name]
-  (let [active-storms (.active-storms storm-cluster-state)]
+  (let [active-storms (.activeStorms storm-cluster-state)]
     (find-first
-      #(= storm-name (:storm-name (.storm-base storm-cluster-state % nil)))
+      #(= storm-name (.get_name (.stormBase storm-cluster-state % nil)))
       active-storms)
     ))
 
 (defn topology-bases [storm-cluster-state]
-  (let [active-topologies (.active-storms storm-cluster-state)]
+  (let [active-topologies (.activeStorms storm-cluster-state)]
     (into {} 
           (dofor [id active-topologies]
-                 [id (.storm-base storm-cluster-state id nil)]
+                 [id  (.stormBase storm-cluster-state id nil)]
                  ))
     ))
 

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj 
b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 82d56a9..e50e150 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@ -34,11 +34,10 @@
   (:import [org.apache.storm.daemon Shutdownable])
   (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo 
IMetricsConsumer$DataPoint StateMetric])
   (:import [org.apache.storm Config Constants])
-  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
+  (:import [org.apache.storm.cluster ClusterStateContext DaemonType 
StormZkClusterState Cluster])
   (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
   (:import [java.util.concurrent ConcurrentLinkedQueue])
-  (:require [org.apache.storm [thrift :as thrift]
-             [cluster :as cluster] [disruptor :as disruptor] [stats :as 
stats]])
+  (:require [org.apache.storm [thrift :as thrift] [disruptor :as disruptor] 
[stats :as stats]])
   (:require [org.apache.storm.daemon [task :as task]])
   (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
   (:require [clojure.set :as set]))
@@ -207,7 +206,7 @@
       (swap! interval-errors inc)
 
       (when (<= @interval-errors max-per-interval)
-        (cluster/report-error (:storm-cluster-state executor) (:storm-id 
executor) (:component-id executor)
+        (.reportError (:storm-cluster-state executor) (:storm-id executor) 
(:component-id executor)
                               (hostname storm-conf)
                               (.getThisWorkerPort (:worker-context executor)) 
error)
         ))))
@@ -252,9 +251,8 @@
      :batch-transfer-queue batch-transfer->worker
      :transfer-fn (mk-executor-transfer-fn batch-transfer->worker storm-conf)
      :suicide-fn (:suicide-fn worker)
-     :storm-cluster-state (cluster/mk-storm-cluster-state (:cluster-state 
worker) 
-                                                          :acls 
(Utils/getWorkerACL storm-conf)
-                                                          :context 
(ClusterStateContext. DaemonType/WORKER))
+     :storm-cluster-state (StormZkClusterState. (:cluster-state worker) 
(Utils/getWorkerACL storm-conf)
+                            (ClusterStateContext. DaemonType/WORKER))
      :type executor-type
      ;; TODO: should refactor this to be part of the executor specific map 
(spout or bolt with :common field)
      :stats (mk-executor-stats <> (ConfigUtils/samplingRate storm-conf))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj 
b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index de5a14e..9b00df3 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@ -40,7 +40,7 @@
   (:import [org.apache.storm.nimbus NimbusInfo])
   (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback 
Utils ConfigUtils TupleUtils ThriftTopologyUtils
             BufferFileInputStream BufferInputStream])
-  (:import [org.apache.storm.generated NotAliveException AlreadyAliveException 
StormTopology ErrorInfo
+  (:import [org.apache.storm.generated NotAliveException AlreadyAliveException 
StormTopology ErrorInfo ClusterWorkerHeartbeat
             ExecutorInfo InvalidTopologyException Nimbus$Iface 
Nimbus$Processor SubmitOptions TopologyInitialStatus
             KillOptions RebalanceOptions ClusterSummary SupervisorSummary 
TopologySummary TopologyInfo TopologyHistoryInfo
             ExecutorSummary AuthorizationException GetInfoOptions 
NumErrorsChoice SettableBlobMeta ReadableBlobMeta
@@ -48,10 +48,9 @@
             ProfileRequest ProfileAction NodeInfo])
   (:import [org.apache.storm.daemon Shutdownable])
   (:import [org.apache.storm.validation ConfigValidation])
-  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
-  (:use [org.apache.storm util config log timer zookeeper local-state])
-  (:require [org.apache.storm [cluster :as cluster]
-                            [converter :as converter]
+  (:import [org.apache.storm.cluster ClusterStateContext DaemonType 
StormZkClusterState])
+  (:use [org.apache.storm util config log timer local-state converter])
+  (:require [org.apache.storm [converter :as converter]
                             [stats :as stats]])
   (:require [clojure.set :as set])
   (:import [org.apache.storm.daemon.common StormBase Assignment])
@@ -174,11 +173,11 @@
      :authorization-handler (mk-authorization-handler (conf NIMBUS-AUTHORIZER) 
conf)
      :impersonation-authorization-handler (mk-authorization-handler (conf 
NIMBUS-IMPERSONATION-AUTHORIZER) conf)
      :submitted-count (atom 0)
-     :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
+     :storm-cluster-state (StormZkClusterState. conf  (when
                                                                        
(Utils/isZkAuthenticationConfiguredStormServer
                                                                          conf)
                                                                        
NIMBUS-ZK-ACLS)
-                                                          :context 
(ClusterStateContext. DaemonType/NIMBUS))
+                                                          
(ClusterStateContext. DaemonType/NIMBUS))
      :submit-lock (Object.)
      :cred-update-lock (Object.)
      :log-update-lock (Object.)
@@ -275,11 +274,11 @@
 
 (defn do-rebalance [nimbus storm-id status storm-base]
   (let [rebalance-options (:topology-action-options storm-base)]
-    (.update-storm! (:storm-cluster-state nimbus)
+    (.updateStorm (:storm-cluster-state nimbus)
       storm-id
-        (-> {:topology-action-options nil}
+      (thriftify-storm-base (-> {:topology-action-options nil}
           (assoc-non-nil :component->executors (:component->executors 
rebalance-options))
-          (assoc-non-nil :num-workers (:num-workers rebalance-options)))))
+          (assoc-non-nil :num-workers (:num-workers rebalance-options))))))
   (mk-assignments nimbus :scratch-topology-id storm-id))
 
 (defn state-transitions [nimbus storm-id status storm-base]
@@ -303,12 +302,12 @@
             :kill (kill-transition nimbus storm-id)
             :remove (fn []
                       (log-message "Killing topology: " storm-id)
-                      (.remove-storm! (:storm-cluster-state nimbus)
+                      (.removeStorm (:storm-cluster-state nimbus)
                                       storm-id)
                       (when (instance? LocalFsBlobStore (:blob-store nimbus))
                         (doseq [blob-key (get-key-list-from-id (:conf nimbus) 
storm-id)]
-                          (.remove-blobstore-key! (:storm-cluster-state 
nimbus) blob-key)
-                          (.remove-key-version! (:storm-cluster-state nimbus) 
blob-key)))
+                          (.removeBlobstoreKey (:storm-cluster-state nimbus) 
blob-key)
+                          (.removeKeyVersion (:storm-cluster-state nimbus) 
blob-key)))
                       nil)
             }
    :rebalancing {:startup (fn [] (delay-event nimbus
@@ -332,7 +331,7 @@
     (locking (:submit-lock nimbus)
        (let [system-events #{:startup}
              [event & event-args] (if (keyword? event) [event] event)
-             storm-base (-> nimbus :storm-cluster-state  (.storm-base storm-id 
nil))
+             storm-base (clojurify-storm-base (-> nimbus :storm-cluster-state  
(.stormBase storm-id nil)))
              status (:status storm-base)]
          ;; handles the case where event was scheduled but topology has been 
removed
          (if-not status
@@ -362,7 +361,7 @@
                                       storm-base-updates)]
 
              (when storm-base-updates
-               (.update-storm! (:storm-cluster-state nimbus) storm-id 
storm-base-updates)))))
+               (.updateStorm (:storm-cluster-state nimbus) storm-id 
(thriftify-storm-base storm-base-updates))))))
        )))
 
 (defn transition-name! [nimbus storm-name event & args]
@@ -411,7 +410,7 @@
     (defaulted
       (apply merge-with set/union
              (for [a assignments
-                   [_ [node port]] (-> (.assignment-info storm-cluster-state a 
nil) :executor->node+port)]
+                   [_ [node port]] (-> (clojurify-assignment (.assignmentInfo 
storm-cluster-state a nil)) :executor->node+port)]
                {node #{port}}
                ))
       {})
@@ -424,7 +423,7 @@
        (into {}
              (mapcat
               (fn [id]
-                (if-let [info (.supervisor-info storm-cluster-state id)]
+                (if-let [info (clojurify-supervisor-info (.supervisorInfo 
storm-cluster-state id))]
                   [[id info]]
                   ))
               supervisor-ids))
@@ -469,13 +468,13 @@
     (when tmp-jar-location ;;in local mode there is no jar
       (.createBlob blob-store jar-key (FileInputStream. tmp-jar-location) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
       (if (instance? LocalFsBlobStore blob-store)
-        (.setup-blobstore! storm-cluster-state jar-key nimbus-host-port-info 
(get-version-for-key jar-key nimbus-host-port-info conf))))
+        (.setupBlobstore storm-cluster-state jar-key nimbus-host-port-info 
(get-version-for-key jar-key nimbus-host-port-info conf))))
     (.createBlob blob-store conf-key (Utils/toCompressedJsonConf storm-conf) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
     (if (instance? LocalFsBlobStore blob-store)
-      (.setup-blobstore! storm-cluster-state conf-key nimbus-host-port-info 
(get-version-for-key conf-key nimbus-host-port-info conf)))
+      (.setupBlobstore storm-cluster-state conf-key nimbus-host-port-info 
(get-version-for-key conf-key nimbus-host-port-info conf)))
     (.createBlob blob-store code-key (Utils/serialize topology) 
(SettableBlobMeta. BlobStoreAclHandler/DEFAULT) subject)
     (if (instance? LocalFsBlobStore blob-store)
-      (.setup-blobstore! storm-cluster-state code-key nimbus-host-port-info 
(get-version-for-key code-key nimbus-host-port-info conf)))))
+      (.setupBlobstore storm-cluster-state code-key nimbus-host-port-info 
(get-version-for-key code-key nimbus-host-port-info conf)))))
 
 (defn- read-storm-topology [storm-id blob-store]
   (Utils/deserialize
@@ -540,7 +539,7 @@
 (defn read-topology-details [nimbus storm-id]
   (let [blob-store (:blob-store nimbus)
         storm-base (or
-                     (.storm-base (:storm-cluster-state nimbus) storm-id nil)
+                     (clojurify-storm-base (.stormBase (:storm-cluster-state 
nimbus) storm-id nil))
                      (throw (NotAliveException. storm-id)))
         topology-conf (read-storm-conf-as-nimbus storm-id blob-store)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
@@ -587,7 +586,12 @@
 (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
   (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors))
   (let [storm-cluster-state (:storm-cluster-state nimbus)
-        executor-beats (.executor-beats storm-cluster-state storm-id 
(:executor->node+port existing-assignment))
+        executor-beats (let [executor-stats-java-map (.executorBeats 
storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment 
existing-assignment)))]
+                         (->> (clojurify-structure executor-stats-java-map)
+                           (map (fn [^ExecutorInfo executor-info 
^ClusterWorkerHeartbeat cluster-worker-heartbeat]
+                                  {[(.get_task_start executor-info) 
(.get_task_end executor-info)] (clojurify-zk-worker-hb 
cluster-worker-heartbeat)}))
+                         (into {})))
+
         cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
                                       executor-beats
                                       all-executors
@@ -637,7 +641,7 @@
 (defn- compute-executors [nimbus storm-id]
   (let [conf (:conf nimbus)
         blob-store (:blob-store nimbus)
-        storm-base (.storm-base (:storm-cluster-state nimbus) storm-id nil)
+        storm-base (clojurify-storm-base (.stormBase (:storm-cluster-state 
nimbus) storm-id nil))
         component->executors (:component->executors storm-base)
         storm-conf (read-storm-conf-as-nimbus storm-id blob-store)
         topology (read-storm-topology-as-nimbus storm-id blob-store)
@@ -897,7 +901,7 @@
         storm-cluster-state (:storm-cluster-state nimbus)
         ^INimbus inimbus (:inimbus nimbus)
         ;; read all the topologies
-        topology-ids (.active-storms storm-cluster-state)
+        topology-ids (.activeStorms storm-cluster-state)
         topologies (into {} (for [tid topology-ids]
                               {tid (read-topology-details nimbus tid)}))
         topologies (Topologies. topologies)
@@ -908,7 +912,7 @@
                                         ;; we exclude its assignment, meaning 
that all the slots occupied by its assignment
                                         ;; will be treated as free slot in the 
scheduler code.
                                         (when (or (nil? scratch-topology-id) 
(not= tid scratch-topology-id))
-                                          {tid (.assignment-info 
storm-cluster-state tid nil)})))
+                                          {tid (clojurify-assignment 
(.assignmentInfo storm-cluster-state tid nil))})))
         ;; make the new assignments for topologies
         new-scheduler-assignments (compute-new-scheduler-assignments
                                        nimbus
@@ -957,7 +961,7 @@
         (log-debug "Assignment for " topology-id " hasn't changed")
         (do
           (log-message "Setting new assignment for topology id " topology-id 
": " (pr-str assignment))
-          (.set-assignment! storm-cluster-state topology-id assignment)
+          (.setAssignment storm-cluster-state topology-id 
(thriftify-assignment assignment))
           )))
     (->> new-assignments
           (map (fn [[topology-id assignment]]
@@ -984,9 +988,9 @@
         topology (system-topology! storm-conf (read-storm-topology storm-id 
blob-store))
         num-executors (->> (all-components topology) (map-val 
num-start-executors))]
     (log-message "Activating " storm-name ": " storm-id)
-    (.activate-storm! storm-cluster-state
+    (.activateStorm storm-cluster-state
                       storm-id
-                      (StormBase. storm-name
+      (thriftify-storm-base (StormBase. storm-name
                                   (current-time-secs)
                                   {:type topology-initial-status}
                                   (storm-conf TOPOLOGY-WORKERS)
@@ -994,7 +998,7 @@
                                   (storm-conf TOPOLOGY-SUBMITTER-USER)
                                   nil
                                   nil
-                                  {}))
+                                  {})))
     (notify-topology-action-listener nimbus storm-name "activate")))
 
 ;; Master:
@@ -1046,10 +1050,10 @@
     (set (.filterAndListKeys blob-store to-id))))
 
 (defn cleanup-storm-ids [conf storm-cluster-state blob-store]
-  (let [heartbeat-ids (set (.heartbeat-storms storm-cluster-state))
-        error-ids (set (.error-topologies storm-cluster-state))
+  (let [heartbeat-ids (set (.heartbeatStorms storm-cluster-state))
+        error-ids (set (.errorTopologies storm-cluster-state))
         code-ids (code-ids blob-store)
-        assigned-ids (set (.active-storms storm-cluster-state))]
+        assigned-ids (set (.activeStorms storm-cluster-state))]
     (set/difference (set/union heartbeat-ids error-ids code-ids) assigned-ids)
     ))
 
@@ -1113,7 +1117,7 @@
   (try
     (.deleteBlob blob-store key nimbus-subject)
     (if (instance? LocalFsBlobStore blob-store)
-      (.remove-blobstore-key! storm-cluster-state key))
+      (.removeBlobstoreKey storm-cluster-state key))
     (catch Exception e
       (log-message "Exception" e))))
 
@@ -1133,8 +1137,8 @@
         (when-not (empty? to-cleanup-ids)
           (doseq [id to-cleanup-ids]
             (log-message "Cleaning up " id)
-            (.teardown-heartbeats! storm-cluster-state id)
-            (.teardown-topology-errors! storm-cluster-state id)
+            (.teardownHeartbeats storm-cluster-state id)
+            (.teardownTopologyErrors storm-cluster-state id)
             (rmr (ConfigUtils/masterStormDistRoot conf id))
             (blob-rm-topology-keys id blob-store storm-cluster-state)
             (swap! (:heartbeats-cache nimbus) dissoc id)))))
@@ -1169,21 +1173,21 @@
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         blob-store (:blob-store nimbus)
         code-ids (set (code-ids blob-store))
-        active-topologies (set (.active-storms storm-cluster-state))
+        active-topologies (set (.activeStorms storm-cluster-state))
         corrupt-topologies (set/difference active-topologies code-ids)]
     (doseq [corrupt corrupt-topologies]
       (log-message "Corrupt topology " corrupt " has state on zookeeper but 
doesn't have a local dir on Nimbus. Cleaning up...")
-      (.remove-storm! storm-cluster-state corrupt)
+      (.removeStorm storm-cluster-state corrupt)
       (if (instance? LocalFsBlobStore blob-store)
         (doseq [blob-key (get-key-list-from-id (:conf nimbus) corrupt)]
-          (.remove-blobstore-key! storm-cluster-state blob-key))))))
+          (.removeBlobstoreKey storm-cluster-state blob-key))))))
 
 (defn setup-blobstore [nimbus]
   "Sets up blobstore state for all current keys."
   (let [storm-cluster-state (:storm-cluster-state nimbus)
         blob-store (:blob-store nimbus)
         local-set-of-keys (set (get-key-seq-from-blob-store blob-store))
-        all-keys (set (.active-keys storm-cluster-state))
+        all-keys (set (.activeKeys storm-cluster-state))
         locally-available-active-keys (set/intersection local-set-of-keys 
all-keys)
         keys-to-delete (set/difference local-set-of-keys all-keys)
         conf (:conf nimbus)
@@ -1193,10 +1197,10 @@
       (.deleteBlob blob-store key nimbus-subject))
     (log-debug "Creating list of key entries for blobstore inside zookeeper" 
all-keys "local" locally-available-active-keys)
     (doseq [key locally-available-active-keys]
-      (.setup-blobstore! storm-cluster-state key (:nimbus-host-port-info 
nimbus) (get-version-for-key key nimbus-host-port-info conf)))))
+      (.setupBlobstore storm-cluster-state key (:nimbus-host-port-info nimbus) 
(get-version-for-key key nimbus-host-port-info conf)))))
 
 (defn- get-errors [storm-cluster-state storm-id component-id]
-  (->> (.errors storm-cluster-state storm-id component-id)
+  (->> (apply clojurify-error (.errors storm-cluster-state storm-id 
component-id))
        (map #(doto (ErrorInfo. (:error %) (:time-secs %))
                    (.set_host (:host %))
                    (.set_port (:port %))))))
@@ -1293,11 +1297,11 @@
           blob-store (:blob-store nimbus)
           renewers (:cred-renewers nimbus)
           update-lock (:cred-update-lock nimbus)
-          assigned-ids (set (.active-storms storm-cluster-state))]
+          assigned-ids (set (.activeStorms storm-cluster-state))]
       (when-not (empty? assigned-ids)
         (doseq [id assigned-ids]
           (locking update-lock
-            (let [orig-creds (.credentials storm-cluster-state id nil)
+            (let [orig-creds (clojurify-crdentials (.credentials 
storm-cluster-state id nil))
                   topology-conf (try-read-storm-conf (:conf nimbus) id 
blob-store)]
               (if orig-creds
                 (let [new-creds (HashMap. orig-creds)]
@@ -1305,7 +1309,7 @@
                     (log-message "Renewing Creds For " id " with " renewer)
                     (.renew renewer new-creds (Collections/unmodifiableMap 
topology-conf)))
                   (when-not (= orig-creds new-creds)
-                    (.set-credentials! storm-cluster-state id new-creds 
topology-conf)
+                    (.setCredentials storm-cluster-state id 
(thriftify-credentials new-creds) topology-conf)
                     ))))))))
     (log-message "not a leader skipping , credential renweal.")))
 
@@ -1370,11 +1374,11 @@
                                           operation)
                   topology (try-read-storm-topology storm-id blob-store)
                   task->component (storm-task-info topology topology-conf)
-                  base (.storm-base storm-cluster-state storm-id nil)
+                  base (clojurify-storm-base (.stormBase storm-cluster-state 
storm-id nil))
                   launch-time-secs (if base (:launch-time-secs base)
                                      (throw
                                        (NotAliveException. (str storm-id))))
-                  assignment (.assignment-info storm-cluster-state storm-id 
nil)
+                  assignment (clojurify-assignment (.assignmentInfo 
storm-cluster-state storm-id nil))
                   beats (map-val :heartbeat (get @(:heartbeats-cache nimbus)
                                                  storm-id))
                   all-components (set (vals task->component))]
@@ -1388,16 +1392,16 @@
                :task->component task->component
                :base base}))
         get-last-error (fn [storm-cluster-state storm-id component-id]
-                         (if-let [e (.last-error storm-cluster-state
+                         (if-let [e (clojurify-error  (.lastError 
storm-cluster-state
                                                  storm-id
-                                                 component-id)]
+                                                 component-id))]
                            (doto (ErrorInfo. (:error e) (:time-secs e))
                              (.set_host (:host e))
                              (.set_port (:port e)))))]
     (.prepare ^org.apache.storm.nimbus.ITopologyValidator (:validator nimbus) 
conf)
 
     ;add to nimbuses
-    (.add-nimbus-host! (:storm-cluster-state nimbus) (.toHostPortString 
(:nimbus-host-port-info nimbus))
+    (.addNimbusHost (:storm-cluster-state nimbus) (.toHostPortString 
(:nimbus-host-port-info nimbus))
       (NimbusSummary.
         (.getHost (:nimbus-host-port-info nimbus))
         (.getPort (:nimbus-host-port-info nimbus))
@@ -1413,7 +1417,7 @@
       (setup-blobstore nimbus))
 
     (when (is-leader nimbus :throw-exception false)
-      (doseq [storm-id (.active-storms (:storm-cluster-state nimbus))]
+      (doseq [storm-id (.activeStorms (:storm-cluster-state nimbus))]
         (transition! nimbus storm-id :startup)))
     (schedule-recurring (:timer nimbus)
                         0
@@ -1520,12 +1524,12 @@
             (locking (:submit-lock nimbus)
               (check-storm-active! nimbus storm-name false)
               ;;cred-update-lock is not needed here because creds are being 
added for the first time.
-              (.set-credentials! storm-cluster-state storm-id credentials 
storm-conf)
+              (.setCredentials storm-cluster-state storm-id 
(thriftify-credentials credentials) storm-conf)
               (log-message "uploadedJar " uploadedJarLocation)
               (setup-storm-code nimbus conf storm-id uploadedJarLocation 
total-storm-conf topology)
               (wait-for-desired-code-replication nimbus total-storm-conf 
storm-id)
-              (.setup-heartbeats! storm-cluster-state storm-id)
-              (.setup-backpressure! storm-cluster-state storm-id)
+              (.setupHeatbeats storm-cluster-state storm-id)
+              (.setupBackpressure storm-cluster-state storm-id)
               (notify-topology-action-listener nimbus storm-name 
"submitTopology")
               (let [thrift-status->kw-status {TopologyInitialStatus/INACTIVE 
:inactive
                                               TopologyInitialStatus/ACTIVE 
:active}]
@@ -1613,7 +1617,7 @@
           (log-message "Nimbus setting debug to " enable? " for storm-name '" 
storm-name "' storm-id '" storm-id "' sampling pct '" spct "'"
             (if (not (clojure.string/blank? component-id)) (str " component-id 
'" component-id "'")))
           (locking (:submit-lock nimbus)
-            (.update-storm! storm-cluster-state storm-id storm-base-updates))))
+            (.updateStorm storm-cluster-state (thriftify-storm-base storm-id 
storm-base-updates)))))
 
       (^void setWorkerProfiler
         [this ^String id ^ProfileRequest profileRequest]
@@ -1622,7 +1626,7 @@
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf 
"setWorkerProfiler")
               storm-cluster-state (:storm-cluster-state nimbus)]
-          (.set-worker-profile-request storm-cluster-state id profileRequest)))
+          (.setWorkerProfileRequest storm-cluster-state id profileRequest)))
 
       (^List getComponentPendingProfileActions
         [this ^String id ^String component_id ^ProfileAction action]
@@ -1635,7 +1639,7 @@
                                              [(node->host node) port])
                                     executor->node+port)
               nodeinfos (stats/extract-nodeinfos-from-hb-for-comp 
executor->host+port task->component false component_id)
-              all-pending-actions-for-topology (.get-topology-profile-requests 
storm-cluster-state id true)
+              all-pending-actions-for-topology (clojurify-profile-request 
(.getTopologyProfileRequests storm-cluster-state id true))
               latest-profile-actions (remove nil? (map (fn [nodeInfo]
                                                          (->> 
all-pending-actions-for-topology
                                                               (filter #(and (= 
(:host nodeInfo) (.get_node (.get_nodeInfo %)))
@@ -1653,7 +1657,7 @@
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf 
"setLogConfig")
               storm-cluster-state (:storm-cluster-state nimbus)
-              merged-log-config (or (.topology-log-config storm-cluster-state 
id nil) (LogConfig.))
+              merged-log-config (or (.topologyLogConfig storm-cluster-state id 
nil) (LogConfig.))
               named-loggers (.get_named_logger_level merged-log-config)]
             (doseq [[_ level] named-loggers]
               (.set_action level LogLevelAction/UNCHANGED))
@@ -1671,7 +1675,7 @@
                                (.containsKey named-loggers logger-name))
                         (.remove named-loggers logger-name))))))
             (log-message "Setting log config for " storm-name ":" 
merged-log-config)
-            (.set-topology-log-config! storm-cluster-state id 
merged-log-config)))
+            (.setTopologyLogConfig storm-cluster-state id merged-log-config)))
 
       (uploadNewCredentials [this storm-name credentials]
         (mark! nimbus:num-uploadNewCredentials-calls)
@@ -1680,7 +1684,7 @@
               topology-conf (try-read-storm-conf conf storm-id blob-store)
               creds (when credentials (.get_creds credentials))]
           (check-authorization! nimbus storm-name topology-conf 
"uploadNewCredentials")
-          (locking (:cred-update-lock nimbus) (.set-credentials! 
storm-cluster-state storm-id creds topology-conf))))
+          (locking (:cred-update-lock nimbus) (.setCredentials 
storm-cluster-state storm-id (thriftify-credentials creds) topology-conf))))
 
       (beginFileUpload [this]
         (mark! nimbus:num-beginFileUpload-calls)
@@ -1755,7 +1759,7 @@
               storm-name (topology-conf TOPOLOGY-NAME)
               _ (check-authorization! nimbus storm-name topology-conf 
"getLogConfig")
              storm-cluster-state (:storm-cluster-state nimbus)
-             log-config (.topology-log-config storm-cluster-state id nil)]
+             log-config (.topologyLogConfig storm-cluster-state id nil)]
            (if log-config log-config (LogConfig.))))
 
       (^String getTopologyConf [this ^String id]
@@ -1800,7 +1804,8 @@
                                        (when-let [version (:version info)] 
(.set_version sup-sum version))
                                        sup-sum))
               nimbus-uptime ((:uptime nimbus))
-              bases (topology-bases storm-cluster-state)
+              javabases (topology-bases storm-cluster-state)
+              bases (into {} (dofor [[id base] javabases][id 
(clojurify-storm-base base)]))
               nimbuses (.nimbuses storm-cluster-state)
 
               ;;update the isLeader field for each nimbus summary
@@ -1812,7 +1817,7 @@
                     (.set_isLeader nimbus-summary (and (= leader-host 
(.get_host nimbus-summary)) (= leader-port (.get_port nimbus-summary))))))
 
               topology-summaries (dofor [[id base] bases :when base]
-                                   (let [assignment (.assignment-info 
storm-cluster-state id nil)
+                                   (let [assignment (clojurify-assignment 
(.assignmentInfo storm-cluster-state id nil))
                                          topo-summ (TopologySummary. id
                                                      (:storm-name base)
                                                      (->> 
(:executor->node+port assignment)
@@ -1939,7 +1944,7 @@
               nimbus-host-port-info (:nimbus-host-port-info nimbus)
               conf (:conf nimbus)]
           (if (instance? LocalFsBlobStore blob-store)
-              (.setup-blobstore! storm-cluster-state blob-key 
nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info 
conf)))
+              (.setupBlobstore storm-cluster-state blob-key 
nimbus-host-port-info (get-version-for-key blob-key nimbus-host-port-info 
conf)))
           (log-debug "Created state in zookeeper" storm-cluster-state 
blob-store nimbus-host-port-info)))
 
       (^void uploadBlobChunk [this ^String session ^ByteBuffer blob-chunk]
@@ -2019,8 +2024,8 @@
                            (.subject))]
           (.deleteBlob (:blob-store nimbus) blob-key subject)
           (when (instance? LocalFsBlobStore blob-store)
-            (.remove-blobstore-key! (:storm-cluster-state nimbus) blob-key)
-            (.remove-key-version! (:storm-cluster-state nimbus) blob-key))
+            (.removeBlobstoreKey (:storm-cluster-state nimbus) blob-key)
+            (.removeKeyVersion (:storm-cluster-state nimbus) blob-key))
           (log-message "Deleted blob for key " blob-key)))
 
       (^ListBlobsResult listBlobs [this ^String session]
@@ -2157,7 +2162,8 @@
 
       (^TopologyHistoryInfo getTopologyHistory [this ^String user]
         (let [storm-cluster-state (:storm-cluster-state nimbus)
-              bases (topology-bases storm-cluster-state)
+              javabases (topology-bases storm-cluster-state)
+              bases (into {} (dofor [[id  base] javabases][id 
(clojurify-storm-base base)]))
               assigned-topology-ids (.assignments storm-cluster-state nil)
               user-group-match-fn (fn [topo-id user conf]
                                     (let [topology-conf (try-read-storm-conf 
conf topo-id (:blob-store nimbus))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a79fb7d/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --git a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 337a1b4..079b221 100644
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@ -19,11 +19,11 @@
            [org.apache.storm.utils LocalState Time Utils ConfigUtils]
            [org.apache.storm.daemon Shutdownable]
            [org.apache.storm Constants]
-           [org.apache.storm.cluster ClusterStateContext DaemonType]
+           [org.apache.storm.cluster ClusterStateContext DaemonType 
StormZkClusterState Cluster]
            [java.net JarURLConnection]
            [java.net URI]
            [org.apache.commons.io FileUtils])
-  (:use [org.apache.storm config util log timer local-state])
+  (:use [org.apache.storm config util log timer local-state converter])
   (:import [org.apache.storm.generated AuthorizationException 
KeyNotFoundException WorkerResources])
   (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
   (:import [java.nio.file Files StandardCopyOption])
@@ -33,7 +33,7 @@
   (:use [org.apache.storm.daemon common])
   (:require [org.apache.storm.command [healthcheck :as healthcheck]])
   (:require [org.apache.storm.daemon [worker :as worker]]
-            [org.apache.storm [process-simulator :as psim] [cluster :as 
cluster] [event :as event]]
+            [org.apache.storm [process-simulator :as psim] [event :as event]]
             [clojure.set :as set])
   (:import [org.apache.thrift.transport TTransportException])
   (:import [org.apache.zookeeper data.ACL ZooDefs$Ids ZooDefs$Perms])
@@ -63,21 +63,22 @@
           (->>
            (dofor [sid storm-ids]
                   (let [recorded-version (:version (get assignment-versions 
sid))]
-                    (if-let [assignment-version (.assignment-version 
storm-cluster-state sid callback)]
+                    (if-let [assignment-version (.assignmentVersion 
storm-cluster-state sid callback)]
                       (if (= assignment-version recorded-version)
                         {sid (get assignment-versions sid)}
-                        {sid (.assignment-info-with-version 
storm-cluster-state sid callback)})
+                        {sid (.assignmentInfoWithVersion storm-cluster-state 
sid callback)})
                       {sid nil})))
            (apply merge)
            (filter-val not-nil?))
           new-profiler-actions
           (->>
             (dofor [sid (distinct storm-ids)]
-                   (if-let [topo-profile-actions 
(.get-topology-profile-requests storm-cluster-state sid false)]
+
+                   (if-let [topo-profile-actions (into [] (for [request 
(.getTopologyProfileRequests storm-cluster-state sid false)] 
(clojurify-profile-request request)))]
                       {sid topo-profile-actions}))
            (apply merge))]
-         
-      {:assignments (into {} (for [[k v] new-assignments] [k (:data v)]))
+
+      {:assignments (into {} (for [[k v] new-assignments] [k 
(clojurify-assignment (:data v))]))
        :profiler-actions new-profiler-actions
        :versions new-assignments})))
 
@@ -316,11 +317,9 @@
    :uptime (uptime-computer)
    :version STORM-VERSION
    :worker-thread-pids-atom (atom {})
-   :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
-                                                                     
(Utils/isZkAuthenticationConfiguredStormServer
-                                                                       conf)
-                                                                     
SUPERVISOR-ZK-ACLS)
-                                                        :context 
(ClusterStateContext. DaemonType/SUPERVISOR))
+   :storm-cluster-state (Cluster/mkStormClusterState conf (when 
(Utils/isZkAuthenticationConfiguredStormServer conf)
+                                                     SUPERVISOR-ZK-ACLS)
+                                                        (ClusterStateContext. 
DaemonType/SUPERVISOR))
    :local-state (ConfigUtils/supervisorState conf)
    :supervisor-id (.getSupervisorId isupervisor)
    :assignment-id (.getAssignmentId isupervisor)
@@ -675,7 +674,7 @@
 
 (defn- delete-topology-profiler-action [storm-cluster-state storm-id 
profile-action]
   (log-message "Deleting profiler action.." profile-action)
-  (.delete-topology-profile-requests storm-cluster-state storm-id 
profile-action))
+  (.deleteTopologyProfileRequests storm-cluster-state storm-id 
(thriftify-profile-request profile-action)))
 
 (defnk launch-profiler-action-for-worker
   "Launch profiler action for a worker"
@@ -743,7 +742,7 @@
                       action-on-exit (fn [exit-code]
                                        (log-message log-prefix " 
profile-action exited for code: " exit-code)
                                        (if (and (= exit-code 0) stop?)
-                                         (delete-topology-profiler-action 
storm-cluster-state storm-id pro-action)))
+                                         (delete-topology-profiler-action 
storm-cluster-state storm-id (thriftify-profile-request pro-action))))
                       command (->> command (map str) (filter (complement 
empty?)))]
 
                   (try
@@ -776,10 +775,10 @@
         synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor)
         downloaded-storm-ids (set (read-downloaded-storm-ids conf))
         run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies 
supervisor)
-        heartbeat-fn (fn [] (.supervisor-heartbeat!
+        heartbeat-fn (fn [] (.supervisorHeartbeat
                                (:storm-cluster-state supervisor)
                                (:supervisor-id supervisor)
-                               (->SupervisorInfo (current-time-secs)
+                              (thriftify-supervisor-info (->SupervisorInfo 
(current-time-secs)
                                                  (:my-hostname supervisor)
                                                  (:assignment-id supervisor)
                                                  (keys @(:curr-assignment 
supervisor))
@@ -788,7 +787,7 @@
                                                  (conf 
SUPERVISOR-SCHEDULER-META)
                                                  ((:uptime supervisor))
                                                  (:version supervisor)
-                                                 (mk-supervisor-capacities 
conf))))]
+                                                 (mk-supervisor-capacities 
conf)))))]
     (heartbeat-fn)
 
     ;; should synchronize supervisor so it doesn't launch anything after being 
down (optimization)

Reply via email to