Merge branch 'master' into ClusterUtils

Conflicts:
        storm-core/src/clj/org/apache/storm/cluster.clj
        storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj
        storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
        storm-core/src/clj/org/apache/storm/daemon/common.clj
        storm-core/src/clj/org/apache/storm/daemon/executor.clj
        storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
        storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
        storm-core/src/clj/org/apache/storm/stats.clj
        storm-core/src/clj/org/apache/storm/testing.clj
        storm-core/src/clj/org/apache/storm/thrift.clj
        storm-core/src/clj/org/apache/storm/util.clj
        storm-core/src/clj/org/apache/storm/zookeeper.clj
        storm-core/test/clj/integration/org/apache/storm/integration_test.clj
        storm-core/test/clj/org/apache/storm/cluster_test.clj
        storm-core/test/clj/org/apache/storm/nimbus_test.clj
        storm-core/test/clj/org/apache/storm/supervisor_test.clj


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a8962de
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a8962de
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a8962de

Branch: refs/heads/master
Commit: 9a8962de9c80fc6a5388fd6d63ab225268530adf
Parents: 2ee8bec 12ceb09
Author: xiaojian.fxj <xiaojian....@alibaba-inc.com>
Authored: Mon Feb 15 15:13:33 2016 +0800
Committer: xiaojian.fxj <xiaojian....@alibaba-inc.com>
Committed: Mon Feb 15 22:15:21 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  16 +
 README.markdown                                 |   1 +
 bin/storm-config.cmd                            |   6 +-
 bin/storm.cmd                                   |  47 +-
 bin/storm.py                                    |   8 +-
 conf/defaults.yaml                              |   4 +
 dev-tools/travis/travis-script.sh               |   4 +-
 .../starter/trident/TridentMapExample.java      | 123 +++
 external/sql/storm-sql-core/pom.xml             |   9 +
 external/storm-elasticsearch/pom.xml            |   2 +
 .../storm/hbase/security/HBaseSecurityUtil.java |  36 +-
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |   8 +-
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  |  44 +-
 .../apache/storm/kafka/PartitionManager.java    |  42 +-
 .../kafka/trident/TridentKafkaEmitter.java      |  23 +-
 external/storm-mqtt/core/pom.xml                |   4 +-
 log4j2/cluster.xml                              |   2 +-
 log4j2/worker.xml                               |   2 +-
 pom.xml                                         |   9 +-
 storm-core/pom.xml                              |  11 +-
 .../src/clj/org/apache/storm/LocalCluster.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |   8 +-
 .../clj/org/apache/storm/command/blobstore.clj  |  11 +-
 .../org/apache/storm/command/config_value.clj   |  25 -
 .../org/apache/storm/command/dev_zookeeper.clj  |   6 +-
 .../clj/org/apache/storm/command/get_errors.clj |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  18 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |  21 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  46 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  25 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 530 +++++-----
 .../clj/org/apache/storm/daemon/logviewer.clj   |  70 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 168 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  | 205 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |   2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  78 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  10 +-
 storm-core/src/clj/org/apache/storm/event.clj   |   2 +-
 .../src/clj/org/apache/storm/local_state.clj    |   9 +-
 .../clj/org/apache/storm/messaging/loader.clj   |  34 -
 .../clj/org/apache/storm/messaging/local.clj    |  23 -
 .../org/apache/storm/pacemaker/pacemaker.clj    |   7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj | 122 ---
 .../clj/org/apache/storm/process_simulator.clj  |   4 +-
 .../apache/storm/scheduler/DefaultScheduler.clj |   7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |  23 +-
 .../storm/scheduler/IsolationScheduler.clj      |  29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |  82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  89 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |  12 +-
 .../clj/org/apache/storm/trident/testing.clj    |   9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  99 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |  14 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 925 +----------------
 storm-core/src/jvm/org/apache/storm/Config.java |  39 +
 .../storm/cluster/PaceMakerStateStorage.java    |   4 +-
 .../org/apache/storm/command/ConfigValue.java   |  30 +
 .../storm/daemon/metrics/MetricsUtils.java      | 108 ++
 .../reporters/ConsolePreparableReporter.java    |  76 ++
 .../reporters/CsvPreparableReporter.java        |  80 ++
 .../reporters/JmxPreparableReporter.java        |  70 ++
 .../metrics/reporters/PreparableReporter.java   |  32 +
 .../storm/logging/ThriftAccessLogger.java       |  13 +-
 .../apache/storm/pacemaker/PacemakerClient.java |   5 +
 .../serialization/SerializationFactory.java     |  17 +-
 .../staticmocking/MockedConfigUtils.java        |  31 -
 .../jvm/org/apache/storm/trident/Stream.java    |  87 +-
 .../storm/trident/operation/Consumer.java       |  35 +
 .../trident/operation/FlatMapFunction.java      |  37 +
 .../storm/trident/operation/MapFunction.java    |  36 +
 .../operation/impl/ConsumerExecutor.java        |  38 +
 .../operation/impl/FlatMapFunctionExecutor.java |  43 +
 .../operation/impl/MapFunctionExecutor.java     |  41 +
 .../trident/planner/processor/MapProcessor.java |  87 ++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../jvm/org/apache/storm/utils/Container.java   |  11 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |  27 +
 .../org/apache/storm/utils/NimbusClient.java    |   2 +-
 .../utils/StormConnectionStateConverter.java    |  44 +
 .../jvm/org/apache/storm/utils/TestUtils.java   |  34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 989 +++++++++++++++++--
 .../storm/validation/ConfigValidation.java      |   2 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |   7 +
 .../org/apache/storm/integration_test.clj       |  98 +-
 .../org/apache/storm/testing4j_test.clj         |  37 +-
 .../apache/storm/trident/integration_test.clj   |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  27 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../clj/org/apache/storm/logviewer_test.clj     | 267 ++---
 .../storm/messaging/netty_integration_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 112 ++-
 .../storm/pacemaker_state_factory_test.clj      |  74 +-
 .../scheduler/resource_aware_scheduler_test.clj |  21 +-
 .../apache/storm/security/auth/auth_test.clj    |  11 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   2 +-
 .../BlowfishTupleSerializer_test.clj            |   1 -
 .../clj/org/apache/storm/serialization_test.clj |  23 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 649 ++++++------
 .../clj/org/apache/storm/transactional_test.clj |  18 +
 .../clj/org/apache/storm/trident/state_test.clj |   5 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |  15 +-
 .../test/clj/org/apache/storm/utils_test.clj    |  16 +-
 .../test/clj/org/apache/storm/worker_test.clj   |   1 -
 .../staticmocking/ConfigUtilsInstaller.java     |  38 +
 .../utils/staticmocking/UtilsInstaller.java     |  38 +
 .../storm/utils/staticmocking/package-info.java |  95 ++
 110 files changed, 4199 insertions(+), 2614 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
index 7be526d,657e242..ab5cbed
--- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
@@@ -14,7 -14,8 +14,8 @@@
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
  (ns org.apache.storm.command.dev-zookeeper
+   (:import [org.apache.storm.utils Utils])
 -  (:use [org.apache.storm zookeeper util config])
 +  (:use [org.apache.storm util config])
    (:import [org.apache.storm.utils ConfigUtils])
    (:import [org.apache.storm.zookeeper Zookeeper])
    (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 3978d2f,0253338..870e7f6
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@@ -15,8 -15,9 +15,9 @@@
  ;; limitations under the License.
  (ns org.apache.storm.command.shell-submission
    (:import [org.apache.storm StormSubmitter]
+            [org.apache.storm.utils Utils]
             [org.apache.storm.zookeeper Zookeeper])
 -  (:use [org.apache.storm thrift util config log zookeeper])
 +  (:use [org.apache.storm thrift util config log])
    (:require [clojure.string :as str])
    (:import [org.apache.storm.utils ConfigUtils])
    (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj
index b144f40,eb1ec1e..db342d2
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@@ -15,17 -15,19 +15,20 @@@
  ;; limitations under the License.
  (ns org.apache.storm.daemon.common
    (:use [org.apache.storm log config util])
 -  (:import [org.apache.storm.generated StormTopology
 +  (:import [org.apache.storm.generated StormTopology NodeInfo
              InvalidTopologyException GlobalStreamId]
-            [org.apache.storm.utils ThriftTopologyUtils])
-   (:import [org.apache.storm.utils Utils ConfigUtils])
+            [org.apache.storm.utils Utils ConfigUtils IPredicate 
ThriftTopologyUtils]
+            [org.apache.storm.daemon.metrics.reporters PreparableReporter]
+            [com.codahale.metrics MetricRegistry])
+   (:import [org.apache.storm.daemon.metrics MetricsUtils])
    (:import [org.apache.storm.task WorkerTopologyContext])
    (:import [org.apache.storm Constants])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl])
    (:import [org.apache.storm.metric SystemBolt])
    (:import [org.apache.storm.metric EventLoggerBolt])
-   (:import [org.apache.storm.security.auth IAuthorizer]) 
-   (:import [java.io InterruptedIOException])
+   (:import [org.apache.storm.security.auth IAuthorizer])
+   (:import [java.io InterruptedIOException]
+            [org.json.simple JSONValue])
    (:require [clojure.set :as set])  
    (:require [org.apache.storm.daemon.acker :as acker])
    (:require [org.apache.storm.thrift :as thrift])
@@@ -73,12 -83,10 +84,11 @@@
  (defn new-executor-stats []
    (ExecutorStats. 0 0 0 0 0))
  
 +
  (defn get-storm-id [storm-cluster-state storm-name]
-   (let [active-storms (.activeStorms storm-cluster-state)]
-     (find-first
-       #(= storm-name (.get_name (.stormBase storm-cluster-state % nil)))
-       active-storms)
 -  (let [active-storms (.active-storms storm-cluster-state)
 -        pred  (reify IPredicate (test [this x] (= storm-name (:storm-name 
(.storm-base storm-cluster-state x nil)))))]
++  (let [active-storms (.activeStorms storm-cluster-state)
++        pred  (reify IPredicate (test [this x] (= storm-name (.get_name 
(.stormBase storm-cluster-state x nil)))))]
+     (Utils/findOne pred active-storms)
      ))
  
  (defn topology-bases [storm-cluster-state]

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 49ae6cf,e2380b7..33b89ed
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -34,10 -34,13 +34,12 @@@
    (:import [org.apache.storm.daemon Shutdownable])
    (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo 
IMetricsConsumer$DataPoint StateMetric])
    (:import [org.apache.storm Config Constants])
 -  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
 +  (:import [org.apache.storm.cluster ClusterStateContext DaemonType 
StormClusterStateImpl ClusterUtils])
    (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-   (:import [java.util.concurrent ConcurrentLinkedQueue])
+   (:import [java.lang Thread Thread$UncaughtExceptionHandler]
+            [java.util.concurrent ConcurrentLinkedQueue]
+            [org.json.simple JSONValue])
 -  (:require [org.apache.storm [thrift :as thrift]
 -             [cluster :as cluster] [disruptor :as disruptor] [stats :as 
stats]])
 +  (:require [org.apache.storm [thrift :as thrift] [disruptor :as disruptor] 
[stats :as stats]])
    (:require [org.apache.storm.daemon [task :as task]])
    (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
    (:require [clojure.set :as set]))
@@@ -206,9 -211,9 +210,9 @@@
        (swap! interval-errors inc)
  
        (when (<= @interval-errors max-per-interval)
 -        (cluster/report-error (:storm-cluster-state executor) (:storm-id 
executor) (:component-id executor)
 +        (.reportError (:storm-cluster-state executor) (:storm-id executor) 
(:component-id executor)
-                               (hostname storm-conf)
+                               (Utils/hostname storm-conf)
 -                              (.getThisWorkerPort (:worker-context executor)) 
error)
 +          (long (.getThisWorkerPort (:worker-context executor))) error)
          ))))
  
  ;; in its own function so that it can be mocked out by tracked topologies

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index daf5e45,710cd83..6af1b81
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -38,9 -39,9 +39,9 @@@
    (:import [org.apache.storm.scheduler INimbus SupervisorDetails WorkerSlot 
TopologyDetails
              Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl 
DefaultScheduler ExecutorDetails])
    (:import [org.apache.storm.nimbus NimbusInfo])
-   (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback 
Utils ConfigUtils TupleUtils ThriftTopologyUtils
+   (:import [org.apache.storm.utils TimeCacheMap Time 
TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils
              BufferFileInputStream BufferInputStream])
 -  (:import [org.apache.storm.generated NotAliveException 
AlreadyAliveException StormTopology ErrorInfo
 +  (:import [org.apache.storm.generated NotAliveException 
AlreadyAliveException StormTopology ErrorInfo ClusterWorkerHeartbeat
              ExecutorInfo InvalidTopologyException Nimbus$Iface 
Nimbus$Processor SubmitOptions TopologyInitialStatus
              KillOptions RebalanceOptions ClusterSummary SupervisorSummary 
TopologySummary TopologyInfo TopologyHistoryInfo
              ExecutorSummary AuthorizationException GetInfoOptions 
NumErrorsChoice SettableBlobMeta ReadableBlobMeta
@@@ -407,10 -414,10 +409,10 @@@
    [storm-cluster-state]
  
    (let [assignments (.assignments storm-cluster-state nil)]
-     (defaulted
+     (or
        (apply merge-with set/union
               (for [a assignments
 -                   [_ [node port]] (-> (.assignment-info storm-cluster-state 
a nil) :executor->node+port)]
 +                   [_ [node port]] (-> (clojurify-assignment (.assignmentInfo 
storm-cluster-state a nil)) :executor->node+port)]
                 {node #{port}}
                 ))
        {})
@@@ -988,10 -1002,10 +1002,10 @@@
          topology (system-topology! storm-conf (read-storm-topology storm-id 
blob-store))
          num-executors (->> (all-components topology) (map-val 
num-start-executors))]
      (log-message "Activating " storm-name ": " storm-id)
 -    (.activate-storm! storm-cluster-state
 +    (.activateStorm storm-cluster-state
                        storm-id
 -                      (StormBase. storm-name
 +      (thriftify-storm-base (StormBase. storm-name
-                                   (current-time-secs)
+                                   (Time/currentTimeSecs)
                                    {:type topology-initial-status}
                                    (storm-conf TOPOLOGY-WORKERS)
                                    num-executors
@@@ -1137,9 -1152,9 +1152,9 @@@
          (when-not (empty? to-cleanup-ids)
            (doseq [id to-cleanup-ids]
              (log-message "Cleaning up " id)
 -            (.teardown-heartbeats! storm-cluster-state id)
 -            (.teardown-topology-errors! storm-cluster-state id)
 +            (.teardownHeartbeats storm-cluster-state id)
 +            (.teardownTopologyErrors storm-cluster-state id)
-             (rmr (ConfigUtils/masterStormDistRoot conf id))
+             (Utils/forceDelete (ConfigUtils/masterStormDistRoot conf id))
              (blob-rm-topology-keys id blob-store storm-cluster-state)
              (swap! (:heartbeats-cache nimbus) dissoc id)))))
      (log-message "not a leader, skipping cleanup")))
@@@ -1811,8 -1830,8 +1838,8 @@@
                                           (.set_used_cpu sup-sum used-cpu))
                                         (when-let [version (:version info)] 
(.set_version sup-sum version))
                                         sup-sum))
-               nimbus-uptime ((:uptime nimbus))
+               nimbus-uptime (. (:uptime nimbus) upTime)
 -              bases (topology-bases storm-cluster-state)
 +              bases (nimbus-topology-bases storm-cluster-state)
                nimbuses (.nimbuses storm-cluster-state)
  
                ;;update the isLeader field for each nimbus summary

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 3a83d03,ae9e92f..c1f058f
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -16,14 -16,15 +16,15 @@@
  (ns org.apache.storm.daemon.supervisor
    (:import [java.io File IOException FileOutputStream])
    (:import [org.apache.storm.scheduler ISupervisor]
-            [org.apache.storm.utils LocalState Time Utils ConfigUtils]
+            [org.apache.storm.utils LocalState Time Utils 
Utils$ExitCodeCallable
+                                    ConfigUtils]
             [org.apache.storm.daemon Shutdownable]
             [org.apache.storm Constants]
 -           [org.apache.storm.cluster ClusterStateContext DaemonType]
 +           [org.apache.storm.cluster ClusterStateContext DaemonType 
StormClusterStateImpl ClusterUtils]
             [java.net JarURLConnection]
-            [java.net URI]
+            [java.net URI URLDecoder]
             [org.apache.commons.io FileUtils])
 -  (:use [org.apache.storm config util log timer local-state])
 +  (:use [org.apache.storm config util log timer local-state converter])
    (:import [org.apache.storm.generated AuthorizationException 
KeyNotFoundException WorkerResources])
    (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
    (:import [java.nio.file Files StandardCopyOption])
@@@ -315,12 -319,14 +321,12 @@@
     :shared-context shared-context
     :isupervisor isupervisor
     :active (atom true)
-    :uptime (uptime-computer)
+    :uptime (Utils/makeUptimeComputer)
     :version STORM-VERSION
     :worker-thread-pids-atom (atom {})
 -   :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
 -                                                                     
(Utils/isZkAuthenticationConfiguredStormServer
 -                                                                       conf)
 -                                                                     
SUPERVISOR-ZK-ACLS)
 -                                                        :context 
(ClusterStateContext. DaemonType/SUPERVISOR))
 +   :storm-cluster-state (ClusterUtils/mkStormClusterState conf (when 
(Utils/isZkAuthenticationConfiguredStormServer conf)
 +                                                     SUPERVISOR-ZK-ACLS)
 +                                                        (ClusterStateContext. 
DaemonType/SUPERVISOR))
     :local-state (ConfigUtils/supervisorState conf)
     :supervisor-id (.getSupervisorId isupervisor)
     :assignment-id (.getAssignmentId isupervisor)
@@@ -777,19 -791,19 +792,19 @@@
          synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor)
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies 
supervisor)
 -        heartbeat-fn (fn [] (.supervisor-heartbeat!
 +        heartbeat-fn (fn [] (.supervisorHeartbeat
                                 (:storm-cluster-state supervisor)
                                 (:supervisor-id supervisor)
-                               (thriftify-supervisor-info (->SupervisorInfo 
(current-time-secs)
 -                               (->SupervisorInfo (Time/currentTimeSecs)
++                               (thriftify-supervisor-info (->SupervisorInfo 
(Time/currentTimeSecs)
                                                   (:my-hostname supervisor)
                                                   (:assignment-id supervisor)
                                                   (keys @(:curr-assignment 
supervisor))
                                                    ;; used ports
                                                   (.getMetadata isupervisor)
                                                   (conf 
SUPERVISOR-SCHEDULER-META)
-                                                  ((:uptime supervisor))
+                                                  (. (:uptime supervisor) 
upTime)
                                                   (:version supervisor)
 -                                                 (mk-supervisor-capacities 
conf))))]
 +                                                 (mk-supervisor-capacities 
conf)))))]
      (heartbeat-fn)
  
      ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)
@@@ -1162,7 -1183,7 +1184,8 @@@
        (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) 
(FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil)
        (finally
          (.shutdown blob-store)))
--    (FileUtils/moveDirectory (File. tmproot) (File. stormroot))
++    (try (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) (catch 
Exception e))
++
      (setup-storm-code-dir conf (clojurify-structure 
(ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot)
      (let [classloader (.getContextClassLoader (Thread/currentThread))
            resources-jar (resources-jar)

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index ae5be57,fe8cfae..9863427
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -19,16 -19,18 +19,18 @@@
    (:require [clj-time.core :as time])
    (:require [clj-time.coerce :as coerce])
    (:require [org.apache.storm.daemon [executor :as executor]])
 -  (:require [org.apache.storm [disruptor :as disruptor] [cluster :as 
cluster]])
 +  (:require [org.apache.storm [disruptor :as disruptor]])
    (:require [clojure.set :as set])
-   (:require [org.apache.storm.messaging.loader :as msg-loader])
    (:import [java.util.concurrent Executors]
-            [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
-   (:import [java.util ArrayList HashMap])
-   (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
+            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
+            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
+   (:import [java.util ArrayList HashMap]
+            [java.util.concurrent.locks ReentrantReadWriteLock])
+   (:import [org.apache.commons.io FileUtils])
+   (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time])
    (:import [org.apache.storm.grouping LoadMapping])
    (:import [org.apache.storm.messaging TransportFactory])
-   (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
+   (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status 
DeserializingConnectionCallback])
    (:import [org.apache.storm.daemon Shutdownable])
    (:import [org.apache.storm.serialization KryoTupleSerializer])
    (:import [org.apache.storm.generated StormTopology])

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
index a36da3a,be4361a..0000000
deleted file mode 100644,100644
--- a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
+++ /dev/null
@@@ -1,122 -1,141 +1,0 @@@
--;; Licensed to the Apache Software Foundation (ASF) under one
--;; or more contributor license agreements.  See the NOTICE file
--;; distributed with this work for additional information
--;; regarding copyright ownership.  The ASF licenses this file
--;; to you under the Apache License, Version 2.0 (the
--;; "License"); you may not use this file except in compliance
--;; with the License.  You may obtain a copy of the License at
--;;
--;; http://www.apache.org/licenses/LICENSE-2.0
--;;
--;; Unless required by applicable law or agreed to in writing, software
--;; distributed under the License is distributed on an "AS IS" BASIS,
--;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--;; See the License for the specific language governing permissions and
--;; limitations under the License.
--
--(ns org.apache.storm.pacemaker.pacemaker-state-factory
--  (:require [org.apache.storm.pacemaker pacemaker]
 -            [org.apache.storm.cluster-state [zookeeper-state-factory :as 
zk-factory]]
--            [org.apache.storm
--             [config :refer :all]
 -             [cluster :refer :all]
--             [log :refer :all]
--             [util :as util]])
--  (:import [org.apache.storm.generated
--            HBExecutionException HBServerMessageType HBMessage
--            HBMessageData HBPulse]
-            [org.apache.storm.cluster ZKStateStorage ClusterUtils 
IStateStorage]
 -           [org.apache.storm.cluster_state zookeeper_state_factory]
 -           [org.apache.storm.cluster ClusterState]
--           [org.apache.storm.pacemaker PacemakerClient])
--  (:gen-class
-     :implements [org.apache.storm.cluster.StateStorageFactory]))
 -   :implements [org.apache.storm.cluster.ClusterStateFactory]))
--
--;; So we can mock the client for testing
--(defn makeClient [conf]
--  (PacemakerClient. conf))
--
--(defn makeZKState [conf auth-conf acls context]
-   (ClusterUtils/mkStateStorage conf auth-conf acls context))
 -  (.mkState (zookeeper_state_factory.) conf auth-conf acls context))
--
--(def max-retries 10)
 -
 -(defn retry-on-exception
 -  "Retries specific function on exception based on retries count"
 -  [retries task-description f & args]
 -  (let [res (try {:value (apply f args)}
 -                 (catch Exception e
 -                   (if (<= 0 retries)
 -                     (throw e)
 -                     {:exception e})))]
 -    (if (:exception res)
 -      (do 
 -        (log-error (:exception res) (str "Failed to " task-description ". Will make [" retries "] more attempts."))
 -        (recur (dec retries) task-description f args))
 -      (do 
 -        (log-debug (str "Successful " task-description "."))
 -        (:value res)))))
--
--(defn -mkState [this conf auth-conf acls context]
--  (let [zk-state (makeZKState conf auth-conf acls context)
--        pacemaker-client (makeClient conf)]
--
--    (reify
-       IStateStorage
 -      ClusterState
--      ;; Let these pass through to the zk-state. We only want to handle heartbeats.
--      (register [this callback] (.register zk-state callback))
--      (unregister [this callback] (.unregister zk-state callback))
--      (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state 
path data acls))
--      (create_sequential [this path data acls] (.create_sequential zk-state 
path data acls))
--      (set_data [this path data acls] (.set_data zk-state path data acls))
--      (delete_node [this path] (.delete_node zk-state path))
--      (delete_node_blobstore [this path nimbus-host-port-info] 
(.delete_node_blobstore zk-state path nimbus-host-port-info))
--      (get_data [this path watch?] (.get_data zk-state path watch?))
--      (get_data_with_version [this path watch?] (.get_data_with_version 
zk-state path watch?))
--      (get_version [this path watch?] (.get_version zk-state path watch?))
--      (get_children [this path watch?] (.get_children zk-state path watch?))
--      (mkdirs [this path acls] (.mkdirs zk-state path acls))
--      (node_exists [this path watch?] (.node_exists zk-state path watch?))
--      (add_listener [this listener] (.add_listener zk-state listener))
--      (sync_path [this path] (.sync_path zk-state path))
--      
--      (set_worker_hb [this path data acls]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "set_worker_hb"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/SEND_PULSE
--                                   (HBMessageData/pulse
--                                    (doto (HBPulse.)
--                                      (.set_id path)
--                                      (.set_details data)))))]
--            (if (= (.get_type response) 
HBServerMessageType/SEND_PULSE_RESPONSE)
--              :ok
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--
--      (delete_worker_hb [this path]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "delete_worker_hb"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/DELETE_PATH
--                                   (HBMessageData/path path)))]
--            (if (= (.get_type response) 
HBServerMessageType/DELETE_PATH_RESPONSE)
--              :ok
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--      
--      (get_worker_hb [this path watch?]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "get_worker_hb"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/GET_PULSE
--                                   (HBMessageData/path path)))]
--            (if (= (.get_type response) 
HBServerMessageType/GET_PULSE_RESPONSE)
--              (try 
--                (.get_details (.get_pulse (.get_data response)))
--                (catch Exception e
--                  (throw (HBExecutionException. (.toString e)))))
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--      
--      (get_worker_hb_children [this path watch?]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "get_worker_hb_children"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH
--                                   (HBMessageData/path path)))]
--            (if (= (.get_type response) 
HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE)
--              (try
--                (into [] (.get_pulseIds (.get_nodes (.get_data response))))
--                (catch Exception e
--                  (throw (HBExecutionException. (.toString e)))))
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--      
--      (close [this]
--        (.close zk-state)
--        (.close pacemaker-client)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/stats.clj
index 0bf1757,8b37fc3..8632ed3
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@@ -24,8 -24,8 +24,9 @@@
              ExecutorAggregateStats SpecificAggregateStats
              SpoutAggregateStats TopologyPageInfo TopologyStats])
    (:import [org.apache.storm.utils Utils])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl])
-   (:import [org.apache.storm.metric.internal MultiCountStatAndMetric 
MultiLatencyStatAndMetric])
+   (:import [org.apache.storm.metric.internal MultiCountStatAndMetric 
MultiLatencyStatAndMetric]
+            [java.util Collection])
    (:use [org.apache.storm log util])
    (:use [clojure.math.numeric-tower :only [ceil]]))
  

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index 5a0bdf2,c872742..eef7754
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -44,11 -45,12 +45,12 @@@
    (:import [org.apache.storm.transactional.partitioned 
PartitionedTransactionalSpoutExecutor])
    (:import [org.apache.storm.tuple Tuple])
    (:import [org.apache.storm.generated StormTopology])
-   (:import [org.apache.storm.task TopologyContext])
+   (:import [org.apache.storm.task TopologyContext]
+            (org.apache.storm.messaging IContext)
+            [org.json.simple JSONValue])
 -  (:require [org.apache.storm [zookeeper :as zk]])
 +  (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext 
StormClusterStateImpl ClusterUtils])
-   (:require [org.apache.storm.messaging.loader :as msg-loader])
    (:require [org.apache.storm.daemon.acker :as acker])
 -  (:use [org.apache.storm cluster util thrift config log local-state]))
 +  (:use [org.apache.storm util thrift config log local-state converter]))
  
  (defn feeder-spout
    [fields]

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/thrift.clj
index 4dc21f9,779c1d1..7aab729
--- a/storm-core/src/clj/org/apache/storm/thrift.clj
+++ b/storm-core/src/clj/org/apache/storm/thrift.clj
@@@ -29,8 -29,9 +29,9 @@@
    (:import [org.apache.storm.grouping CustomStreamGrouping])
    (:import [org.apache.storm.topology TopologyBuilder])
    (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
-   (:import [org.apache.thrift.transport TTransport])
+   (:import [org.apache.thrift.transport TTransport]
+            (org.json.simple JSONValue))
 -  (:use [org.apache.storm util config log zookeeper]))
 +  (:use [org.apache.storm util config log]))
  
  (defn instantiate-java-object
    [^JavaObject obj]

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/util.clj
index 165d8ee,f685d12..72778bb
--- a/storm-core/src/clj/org/apache/storm/util.clj
+++ b/storm-core/src/clj/org/apache/storm/util.clj
@@@ -20,9 -20,8 +20,9 @@@
    (:import [java.io FileReader FileNotFoundException])
    (:import [java.nio.file Paths])
    (:import [org.apache.storm Config])
-   (:import [org.apache.storm.generated ErrorInfo])
-   (:import [org.apache.storm.utils Time Container ClojureTimerTask Utils
-             MutableObject MutableInt])
++ (:import [org.apache.storm.generated ErrorInfo])
+   (:import [org.apache.storm.utils Time ClojureTimerTask Utils
+             MutableObject])
    (:import [org.apache.storm.security.auth NimbusPrincipal])
    (:import [javax.security.auth Subject])
    (:import [java.util UUID Random ArrayList List Collections])
@@@ -262,58 -163,9 +164,19 @@@
                     (instance? Boolean x) (boolean x)
                     true x))
             s))
 +; moved this func from convert.clj due to cyclic load dependency
 +(defn clojurify-error [^ErrorInfo error]
 +  (if error
 +    {
 +      :error (.get_error error)
 +      :time-secs (.get_error_time_secs error)
 +      :host (.get_host error)
 +      :port (.get_port error)
 +      }
 +    ))
  
- (defmacro with-file-lock
-   [path & body]
-   `(let [f# (File. ~path)
-          _# (.createNewFile f#)
-          rf# (RandomAccessFile. f# "rw")
-          lock# (.. rf# (getChannel) (lock))]
-      (try
-        ~@body
-        (finally
-          (.release lock#)
-          (.close rf#)))))
- 
- (defn tokenize-path
-   [^String path]
-   (let [toks (.split path "/")]
-     (vec (filter (complement empty?) toks))))
- 
- (defn assoc-conj
-   [m k v]
-   (merge-with concat m {k [v]}))
- 
- ;; returns [ones in first set not in second, ones in second set not in first]
- (defn set-delta
-   [old curr]
-   (let [s1 (set old)
-         s2 (set curr)]
-     [(set/difference s1 s2) (set/difference s2 s1)]))
- 
- (defn parent-path
-   [path]
-   (let [toks (tokenize-path path)]
-     (str "/" (str/join "/" (butlast toks)))))
- 
- (defn toks->path
-   [toks]
-   (str "/" (str/join "/" toks)))
- 
- (defn normalize-path
-   [^String path]
-   (toks->path (tokenize-path path)))
- 
+ ;TODO: We're keeping this function around until all the code using it is properly translated to java
+ ;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function.
  (defn map-val
    [afn amap]
    (into {}

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index 1226c55,0000000..a9c4d89
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@@ -1,212 -1,0 +1,212 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.cluster;
 +
 +import clojure.lang.APersistentMap;
 +import org.apache.curator.framework.state.ConnectionStateListener;
 +import org.apache.storm.callback.ZKStateChangedCallback;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.pacemaker.PacemakerClient;
 +import org.apache.storm.utils.Utils;
 +import org.apache.zookeeper.data.ACL;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.List;
 +
 +public class PaceMakerStateStorage implements IStateStorage {
 +
 +    private static Logger LOG = 
LoggerFactory.getLogger(PaceMakerStateStorage.class);
 +
 +    private PacemakerClient pacemakerClient;
 +    private IStateStorage stateStorage;
 +    private static final int maxRetries = 10;
 +
 +    public PaceMakerStateStorage(PacemakerClient pacemakerClient, 
IStateStorage stateStorage) throws Exception {
 +        this.pacemakerClient = pacemakerClient;
 +        this.stateStorage = stateStorage;
 +    }
 +
 +    @Override
 +    public String register(ZKStateChangedCallback callback) {
 +        return stateStorage.register(callback);
 +    }
 +
 +    @Override
 +    public void unregister(String id) {
 +        stateStorage.unregister(id);
 +    }
 +
 +    @Override
 +    public String create_sequential(String path, byte[] data, List<ACL> acls) 
{
 +        return stateStorage.create_sequential(path, data, acls);
 +    }
 +
 +    @Override
 +    public void mkdirs(String path, List<ACL> acls) {
 +        stateStorage.mkdirs(path, acls);
 +    }
 +
 +    @Override
 +    public void delete_node(String path) {
 +        stateStorage.delete_node(path);
 +    }
 +
 +    @Override
 +    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
 +        stateStorage.set_ephemeral_node(path, data, acls);
 +    }
 +
 +    @Override
 +    public Integer get_version(String path, boolean watch) throws Exception {
 +        return stateStorage.get_version(path, watch);
 +    }
 +
 +    @Override
 +    public boolean node_exists(String path, boolean watch) {
 +        return stateStorage.node_exists(path, watch);
 +    }
 +
 +    @Override
 +    public List<String> get_children(String path, boolean watch) {
 +        return stateStorage.get_children(path, watch);
 +    }
 +
 +    @Override
 +    public void close() {
 +        stateStorage.close();
 +        pacemakerClient.close();
 +    }
 +
 +    @Override
 +    public void set_data(String path, byte[] data, List<ACL> acls) {
 +        stateStorage.set_data(path, data, acls);
 +    }
 +
 +    @Override
 +    public byte[] get_data(String path, boolean watch) {
 +        return stateStorage.get_data(path, watch);
 +    }
 +
 +    @Override
 +    public APersistentMap get_data_with_version(String path, boolean watch) {
 +        return stateStorage.get_data_with_version(path, watch);
 +    }
 +
 +    @Override
 +    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
 +                HBPulse hbPulse = new HBPulse();
 +                hbPulse.set_id(path);
 +                hbPulse.set_details(data);
 +                HBMessage message = new 
HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.SEND_PULSE_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                LOG.debug("Successful set_worker_hb");
 +                break;
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to set_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public byte[] get_worker_hb(String path, boolean watch) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
 +                HBMessage message = new 
HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.GET_PULSE_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                LOG.debug("Successful get_worker_hb");
 +                return response.get_data().get_pulse().get_details();
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to get_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Asks Pacemaker for the pulse ids stored under {@code path}.
 +     * Sends GET_ALL_NODES_FOR_PATH through pacemakerClient and retries on any
 +     * exception until the maxRetries budget is used up, then rethrows the
 +     * last failure via Utils.wrapInRuntime.
 +     *
 +     * @param path  heartbeat path to list
 +     * @param watch not read by this implementation
 +     * @return the pulse ids carried in the response
 +     */
 +    @Override
 +    public List<String> get_worker_hb_children(String path, boolean watch) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
-                 HBMessage message = new 
HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
++                HBMessage message = new 
HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                // Name the operation that actually succeeded (was "get_worker_hb").
 +                LOG.debug("Successful get_worker_hb_children");
 +                return response.get_data().get_nodes().get_pulseIds();
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to get_worker_hb_children. Will make {} 
more attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    /**
 +     * Asks Pacemaker to delete the heartbeat entry at {@code path}.
 +     * Sends DELETE_PATH through pacemakerClient and retries on any exception
 +     * until the maxRetries budget is used up, then rethrows the last failure
 +     * via Utils.wrapInRuntime.
 +     *
 +     * @param path heartbeat path to delete
 +     */
 +    @Override
 +    public void delete_worker_hb(String path) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
-                 HBMessage message = new 
HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
++                HBMessage message = new 
HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.DELETE_PATH_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                // Name the operation that actually succeeded (was "get_worker_hb").
 +                LOG.debug("Successful delete_worker_hb");
 +                break;
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to delete_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void add_listener(ConnectionStateListener listener) {
 +        stateStorage.add_listener(listener);
 +    }
 +
 +    @Override
 +    public void sync_path(String path) {
 +        stateStorage.sync_path(path);
 +    }
 +
 +    @Override
 +    public void delete_node_blobstore(String path, String nimbusHostPortInfo) 
{
 +        stateStorage.delete_node_blobstore(path, nimbusHostPortInfo);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index af0e8f3,34f3665..20d6deb
--- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@@ -60,6 -60,6 +60,10 @@@ public class PacemakerClient implement
      private StormBoundedExponentialBackoffRetry backoff = new 
StormBoundedExponentialBackoffRetry(100, 5000, 20);
      private int retryTimes = 0;
  
++    //the constructor is invoked by pacemaker-state-factory-test
++    public PacemakerClient() {
++        bootstrap = new ClientBootstrap();
++    }
      public PacemakerClient(Map config) {
  
          String host = (String)config.get(Config.PACEMAKER_HOST);
@@@ -157,6 -157,7 +161,7 @@@
      public String secretKey() {
          return secret;
      }
 -
++    public HBMessage  checkCaptured() {return null;}
      public HBMessage send(HBMessage m) {
          waitUntilReady();
          LOG.debug("Sending message: {}", m.toString());

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/integration/org/apache/storm/integration_test.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/cluster_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/cluster_test.clj
index 39adb9e,b146cb0..6c32d54
--- a/storm-core/test/clj/org/apache/storm/cluster_test.clj
+++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj
@@@ -22,11 -22,11 +22,11 @@@
    (:import [org.mockito Mockito])
    (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [org.apache.curator.framework CuratorFramework 
CuratorFrameworkFactory CuratorFrameworkFactory$Builder])
-   (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo 
ConfigUtils])
+   (:import [org.apache.storm.utils Time Utils ZookeeperAuthInfo ConfigUtils])
 -  (:import [org.apache.storm.cluster ClusterState])
 +  (:import [org.apache.storm.cluster IStateStorage ZKStateStorage 
ClusterStateContext StormClusterStateImpl ClusterUtils])
    (:import [org.apache.storm.zookeeper Zookeeper])
 -  (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
 -  (:require [org.apache.storm [zookeeper :as zk]])
 +  (:import [org.apache.storm.callback ZKStateChangedCallback])
 +  (:import [org.apache.storm.testing.staticmocking MockedZookeeper 
MockedCluster])
    (:require [conjure.core])
    (:use [conjure core])
    (:use [clojure test])
@@@ -39,14 -39,18 +39,18 @@@
  
  (defn mk-state
    ([zk-port] (let [conf (mk-config zk-port)]
 -               (mk-distributed-cluster-state conf :auth-conf conf)))
 +               (ClusterUtils/mkStateStorage conf conf nil 
(ClusterStateContext.))))
    ([zk-port cb]
 -     (let [ret (mk-state zk-port)]
 -       (.register ret cb)
 -       ret )))
 +    (let [ret (mk-state zk-port)]
 +      (.register ret cb)
 +      ret)))
  
 -(defn mk-storm-state [zk-port] (mk-storm-cluster-state (mk-config zk-port)))
 +(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config 
zk-port) nil (ClusterStateContext.)))
  
+ (defn barr
+   [& vals]
+   (byte-array (map byte vals)))
+ 
  (deftest test-basics
    (with-inprocess-zookeeper zk-port
      (let [state (mk-state zk-port)]
@@@ -242,27 -244,26 +246,32 @@@
        (is (.contains (:error error) target))
        )))
  
++(defn- stringify-error [error]
++  (let [result (java.io.StringWriter.)
++        printer (java.io.PrintWriter. result)]
++    (.printStackTrace error printer)
++    (.toString result)))
  
  (deftest test-storm-cluster-state-errors
    (with-inprocess-zookeeper zk-port
      (with-simulated-time
        (let [state (mk-storm-state zk-port)]
-         (.reportError state "a" "1" (local-hostname) 6700 (stringify-error 
(RuntimeException.)))
 -        (.report-error state "a" "1" (Utils/localHostname) 6700 
(RuntimeException.))
++        (.reportError state "a" "1" (Utils/localHostname) 6700 
(stringify-error (RuntimeException.)))
          (validate-errors! state "a" "1" ["RuntimeException"])
          (advance-time-secs! 1)
-         (.reportError state "a" "1" (local-hostname) 6700 (stringify-error 
(IllegalArgumentException.)))
 -        (.report-error state "a" "1" (Utils/localHostname) 6700 
(IllegalArgumentException.))
++        (.reportError state "a" "1" (Utils/localHostname) 6700 
(stringify-error (IllegalArgumentException.)))
          (validate-errors! state "a" "1" ["IllegalArgumentException" 
"RuntimeException"])
          (doseq [i (range 10)]
-           (.reportError state "a" "2" (local-hostname) 6700 (stringify-error 
(RuntimeException.)))
 -          (.report-error state "a" "2" (Utils/localHostname) 6700 
(RuntimeException.))
++          (.reportError state "a" "2" (Utils/localHostname) 6700 
(stringify-error (RuntimeException.)))
            (advance-time-secs! 2))
          (validate-errors! state "a" "2" (repeat 10 "RuntimeException"))
          (doseq [i (range 5)]
-           (.reportError state "a" "2" (local-hostname) 6700 (stringify-error 
(IllegalArgumentException.)))
 -          (.report-error state "a" "2" (Utils/localHostname) 6700 
(IllegalArgumentException.))
++          (.reportError state "a" "2" (Utils/localHostname) 6700 
(stringify-error (IllegalArgumentException.)))
            (advance-time-secs! 2))
          (validate-errors! state "a" "2" (concat (repeat 5 
"IllegalArgumentException")
 -                                                (repeat 5 "RuntimeException")
 -                                                ))
 +                                          (repeat 5 "RuntimeException")
 +                                          ))
 +
          (.disconnect state)
          ))))
  
@@@ -300,12 -301,12 +309,12 @@@
        (. (Mockito/when (.connectString builder (Mockito/anyString))) 
(thenReturn builder))
        (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) 
(thenReturn builder))
        (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) 
(thenReturn builder))
-       (TestUtils/testSetupBuilder builder (str zk-port "/") conf 
(ZookeeperAuthInfo. conf))
+       (Utils/testSetupBuilder builder (str zk-port "/") conf 
(ZookeeperAuthInfo. conf))
        (is (nil?
 -           (try
 -             (. (Mockito/verify builder) (authorization "digest" (.getBytes 
(conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
 -             (catch MockitoAssertionError e
 -               e)))))))
 +            (try
 +              (. (Mockito/verify builder) (authorization "digest" (.getBytes 
(conf STORM-ZOOKEEPER-AUTH-PAYLOAD))))
 +              (catch MockitoAssertionError e
 +                e)))))))
  
  (deftest test-storm-state-callbacks
    ;; TODO finish
@@@ -313,17 -314,15 +322,17 @@@
  
  (deftest test-cluster-state-default-acls
    (testing "The default ACLs are empty."
 -    (let [zk-mock (Mockito/mock Zookeeper)]
 +    (let [zk-mock (Mockito/mock Zookeeper)
 +          curator-frameworke (reify CuratorFramework (^void close [this] 
nil))]
        ;; No need for when clauses because we just want to return nil
        (with-open [_ (MockedZookeeper. zk-mock)]
 -        (stubbing [zk/mk-client (reify CuratorFramework (^void close [this] 
nil))]
 -          (mk-distributed-cluster-state {})
 -          (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) 
(Mockito/any) (Mockito/anyString) (Mockito/eq nil)))))
 -    (stubbing [mk-distributed-cluster-state (reify ClusterState
 -                                              (register [this callback] nil)
 -                                              (mkdirs [this path acls] nil))]
 -     (mk-storm-cluster-state {})
 -     (verify-call-times-for mk-distributed-cluster-state 1)
 -     (verify-first-call-args-for-indices mk-distributed-cluster-state [4] 
nil))))
 +        (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) 
(Mockito/anyList) (Mockito/any) (Mockito/anyString) (Mockito/any) 
(Mockito/anyMap))) (thenReturn curator-frameworke))
 +        (ClusterUtils/mkStateStorage {} nil nil (ClusterStateContext.))
 +        (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) 
(Mockito/anyString) (Mockito/eq nil))))
 +    (let [distributed-state-storage (reify IStateStorage
 +                                      (register [this callback] nil)
 +                                      (mkdirs [this path acls] nil))
 +          cluster-utils (Mockito/mock ClusterUtils)]
 +      (with-open [mocked-cluster (MockedCluster. cluster-utils)]
-         (. (Mockito/when (mkStateStorageImpl cluster-utils (Mockito/any) 
(Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn 
distributed-state-storage))
++        (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) 
(Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn 
distributed-state-storage))
 +        (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.))))))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/nimbus_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj
index 772a232,70cb885..2a65efc
--- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj
+++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj
@@@ -23,27 -23,33 +23,36 @@@
             [org.apache.storm.nimbus InMemoryTopologyActionNotifier])
    (:import [org.apache.storm.testing.staticmocking MockedZookeeper])
    (:import [org.apache.storm.scheduler INimbus])
 +  (:import [org.mockito Mockito])
 +  (:import [org.mockito.exceptions.base MockitoAssertionError])
    (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo])
-   (:import [org.apache.storm.testing.staticmocking MockedConfigUtils 
MockedCluster])
++  (:import [org.apache.storm.testing.staticmocking MockedCluster])
    (:import [org.apache.storm.generated Credentials NotAliveException 
SubmitOptions
              TopologyInitialStatus TopologyStatus AlreadyAliveException 
KillOptions RebalanceOptions
              InvalidTopologyException AuthorizationException
              LogConfig LogLevel LogLevelAction])
    (:import [java.util HashMap])
    (:import [java.io File])
-   (:import [org.apache.storm.utils Time Utils ConfigUtils])
+   (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer 
ConfigUtils IPredicate]
+            [org.apache.storm.utils.staticmocking ConfigUtilsInstaller 
UtilsInstaller])
    (:import [org.apache.storm.zookeeper Zookeeper])
-   (:import [org.apache.commons.io FileUtils])
+   (:import [org.apache.commons.io FileUtils]
+            [org.json.simple JSONValue])
 -  (:use [org.apache.storm testing MockAutoCred util config log timer 
zookeeper])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl 
ClusterStateContext ClusterUtils])
 +  (:use [org.apache.storm testing MockAutoCred util config log timer 
converter])
    (:use [org.apache.storm.daemon common])
    (:require [conjure.core])
    (:require [org.apache.storm
 -             [thrift :as thrift]
 -             [cluster :as cluster]])
 +             [thrift :as thrift]])
    (:use [conjure core]))
  
+ (defn- from-json
+        [^String str]
+        (if str
+          (clojurify-structure
+            (JSONValue/parse str))
+          nil))
+ 
  (defn storm-component->task-info [cluster storm-name]
    (let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name)
          nimbus (:nimbus cluster)]
@@@ -72,8 -80,8 +83,8 @@@
  
  (defn storm-num-workers [state storm-name]
    (let [storm-id (get-storm-id state storm-name)
 -        assignment (.assignment-info state storm-id nil)]
 +        assignment (clojurify-assignment (.assignmentInfo state storm-id 
nil))]
-     (count (reverse-map (:executor->node+port assignment)))
+     (count (clojurify-structure (Utils/reverseMap (:executor->node+port 
assignment))))
      ))
  
  (defn topology-nodes [state storm-name]
@@@ -95,9 -103,11 +106,11 @@@
           set         
           )))
  
+ ;TODO: when translating this function, don't call map-val, but instead use an 
inline for loop.
+ ; map-val is a temporary kluge for clojure.
  (defn topology-node-distribution [state storm-name]
    (let [storm-id (get-storm-id state storm-name)
 -        assignment (.assignment-info state storm-id nil)]
 +        assignment (clojurify-assignment (.assignmentInfo state storm-id 
nil))]
      (->> assignment
           :executor->node+port
           vals
@@@ -124,19 -134,18 +137,18 @@@
  
  (defn do-executor-heartbeat [cluster storm-id executor]
    (let [state (:storm-cluster-state cluster)
 -        executor->node+port (:executor->node+port (.assignment-info state 
storm-id nil))
 +        executor->node+port (:executor->node+port (clojurify-assignment 
(.assignmentInfo state storm-id nil)))
          [node port] (get executor->node+port executor)
 -        curr-beat (.get-worker-heartbeat state storm-id node port)
 +        curr-beat (clojurify-zk-worker-hb (.getWorkerHeartbeat state storm-id 
node port))
          stats (:executor-stats curr-beat)]
 -    (.worker-heartbeat! state storm-id node port
 -      {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 
:executor-stats (merge stats {executor (stats/render-stats! 
(stats/mk-bolt-stats 20))})}
 +    (.workerHeartbeat state storm-id node port
-       (thriftify-zk-worker-hb {:storm-id storm-id :time-secs 
(current-time-secs) :uptime 10 :executor-stats (merge stats {executor 
(stats/render-stats! (stats/mk-bolt-stats 20))})})
++      (thriftify-zk-worker-hb {:storm-id storm-id :time-secs 
(Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor 
(stats/render-stats! (stats/mk-bolt-stats 20))})})
        )))
  
  (defn slot-assignments [cluster storm-id]
    (let [state (:storm-cluster-state cluster)
 -        assignment (.assignment-info state storm-id nil)]
 +        assignment (clojurify-assignment (.assignmentInfo state storm-id 
nil))]
-     (reverse-map (:executor->node+port assignment))
-     ))
+         (clojurify-structure (Utils/reverseMap (:executor->node+port 
assignment)))))
  
  (defn task-ids [cluster storm-id]
    (let [nimbus (:nimbus cluster)]
@@@ -146,8 -155,10 +158,10 @@@
  
  (defn topology-executors [cluster storm-id]
    (let [state (:storm-cluster-state cluster)
-         assignment (clojurify-assignment (.assignmentInfo state storm-id 
nil))]
-     (keys (:executor->node+port assignment))
 -        assignment (.assignment-info state storm-id nil)
 -        ret-keys (keys (:executor->node+port assignment))
++        assignment (clojurify-assignment (.assignmentInfo state storm-id nil))
++    ret-keys (keys (:executor->node+port assignment))
+         _ (log-message "ret-keys: " (pr-str ret-keys)) ]
+     ret-keys
      ))
  
  (defn check-distribution [items distribution]
@@@ -1350,23 -1399,27 +1402,29 @@@
                       NIMBUS-THRIFT-PORT 6666})
            expected-acls nimbus/NIMBUS-ZK-ACLS
            fake-inimbus (reify INimbus (getForcedScheduler [this] nil))
+           fake-cu (proxy [ConfigUtils] []
 -                      (nimbusTopoHistoryStateImpl [conf] nil))
++                    (nimbusTopoHistoryStateImpl [conf] nil))
+           fake-utils (proxy [Utils] []
+                        (newInstanceImpl [_])
+                        (makeUptimeComputer [] (proxy [Utils$UptimeComputer] []
 -                                                (upTime [] 0))))]
++                                                (upTime [] 0))))
 +          cluster-utils (Mockito/mock ClusterUtils)]
-       (with-open [_ (proxy [MockedConfigUtils] []
+       (with-open [_ (ConfigUtilsInstaller. fake-cu)
+                   _ (UtilsInstaller. fake-utils)
++                  _ (proxy [ConfigUtils] []
 +                      (nimbusTopoHistoryStateImpl [conf] nil))
                    zk-le (MockedZookeeper. (proxy [Zookeeper] []
 -                          (zkLeaderElectorImpl [conf] nil)))]
 +                          (zkLeaderElectorImpl [conf] nil)))
 +                  mocked-cluster (MockedCluster. cluster-utils)]
          (stubbing [mk-authorization-handler nil
 -                   cluster/mk-storm-cluster-state nil
 -                   nimbus/file-cache-map nil
 -                   nimbus/mk-blob-cache-map nil
 -                   nimbus/mk-bloblist-cache-map nil
 -                   mk-timer nil
 -                   nimbus/mk-scheduler nil]
 -                  (nimbus/nimbus-data auth-conf fake-inimbus)
 -                  (verify-call-times-for cluster/mk-storm-cluster-state 1)
 -                  (verify-first-call-args-for-indices 
cluster/mk-storm-cluster-state [2]
 -                                                      expected-acls))))))
 +                 nimbus/file-cache-map nil
 +                 nimbus/mk-blob-cache-map nil
 +                 nimbus/mk-bloblist-cache-map nil
-                  uptime-computer nil
-                  new-instance nil
 +                 mk-timer nil
 +                 nimbus/mk-scheduler nil]
 +          (nimbus/nimbus-data auth-conf fake-inimbus)
 +          (.mkStormClusterStateImpl (Mockito/verify cluster-utils 
(Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any))
 +          )))))
  
  (deftest test-file-bogus-download
    (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false 
TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}]
@@@ -1397,9 -1450,9 +1455,9 @@@
                        STORM-CLUSTER-MODE "local"
                        STORM-ZOOKEEPER-PORT zk-port
                        STORM-LOCAL-DIR nimbus-dir}))
 -        (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +        (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
          (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
-         (sleep-secs 1)
+         (Time/sleepSecs 1)
          (bind topology (thrift/mk-topology
                           {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) 
:parallelism-hint 3)}
                           {}))
@@@ -1429,10 -1482,10 +1487,10 @@@
                          STORM-ZOOKEEPER-PORT zk-port
                          STORM-LOCAL-DIR nimbus-dir
                          NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)}))
 -          (bind cluster-state (cluster/mk-storm-cluster-state conf))
 +          (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.)))
            (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus)))
            (bind notifier (InMemoryTopologyActionNotifier.))
-           (sleep-secs 1)
+           (Time/sleepSecs 1)
            (bind topology (thrift/mk-topology
                             {"1" (thrift/mk-spout-spec (TestPlannerSpout. 
true) :parallelism-hint 3)}
                             {}))

http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
----------------------------------------------------------------------
diff --cc storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
index 1a7bd2c,0925237..1c45266
--- a/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
+++ b/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj
@@@ -19,10 -20,8 +19,11 @@@
    (:import [org.apache.storm.generated
              HBExecutionException HBNodes HBRecords
              HBServerMessageType HBMessage HBMessageData HBPulse]
-            [org.apache.storm.cluster ClusterStateContext  PaceMakerStateStorageFactory]
 -           [org.apache.storm.cluster ClusterStateContext]
 -           [org.mockito Mockito Matchers]))
++           [org.apache.storm.cluster ClusterStateContext  PaceMakerStateStorageFactory PaceMakerStateStorage]
 +           [org.mockito Mockito Matchers])
 +(:import [org.mockito.exceptions.base MockitoAssertionError])
++(:import [org.apache.storm.pacemaker PacemakerClient])
 +(:import [org.apache.storm.testing.staticmocking MockedPaceMakerStateStorageFactory]))
  
  (defn- string-to-bytes [string]
    (byte-array (map int string)))
@@@ -30,26 -29,24 +31,23 @@@
  (defn- bytes-to-string [bytez]
    (apply str (map char bytez)))
  
--(defprotocol send-capture
--  (send [this something])
--  (check-captured [this]))
--
  (defn- make-send-capture [response]
    (let [captured (atom nil)]
--    (reify send-capture
--      (send [this something] (reset! captured something) response)
--      (check-captured [this] @captured))))
 -
 -(defmacro with-mock-pacemaker-client-and-state [client state response & body]
 -  `(let [~client (make-send-capture ~response)]
 -     (stubbing [psf/makeZKState nil
 -                psf/makeClient ~client]
 -               (let [~state (psf/-mkState nil nil nil nil (ClusterStateContext.))]
++    (proxy [PacemakerClient] []
++      (send [m] (reset! captured m) response)
++      (checkCaptured [] @captured))))
 +
 +(defmacro with-mock-pacemaker-client-and-state [client state pacefactory mock response & body]
 +  `(let [~client (make-send-capture ~response)
 +         ~pacefactory (Mockito/mock PaceMakerStateStorageFactory)]
 +
 +     (with-open [~mock (MockedPaceMakerStateStorageFactory. ~pacefactory)]
 +       (. (Mockito/when (.initZKstateImpl ~pacefactory (Mockito/any) (Mockito/any) (Mockito/anyList) (Mockito/any))) (thenReturn nil))
 +       (. (Mockito/when (.initMakeClientImpl ~pacefactory (Mockito/any))) (thenReturn ~client))
-                (let [~state (.mkStore ~pacefactory nil nil nil (ClusterStateContext.))]
++               (let [~state (PaceMakerStateStorage. (PaceMakerStateStorageFactory/initMakeClient nil)
++                   (PaceMakerStateStorageFactory/initZKstate nil  nil nil nil))]
                   ~@body))))
  
 -
  (deftest pacemaker_state_set_worker_hb
    (testing "set_worker_hb"
      (with-mock-pacemaker-client-and-state
@@@ -57,7 -54,7 +55,7 @@@
        (HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil)
  
        (.set_worker_hb state "/foo" (string-to-bytes "data") nil)
--      (let [sent (.check-captured client)
++      (let [sent (.checkCaptured client)
              pulse (.get_pulse (.get_data sent))]
          (is (= (.get_type sent) HBServerMessageType/SEND_PULSE))
          (is (= (.get_id pulse) "/foo"))
@@@ -65,13 -62,13 +63,12 @@@
  
    (testing "set_worker_hb"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/SEND_PULSE nil)
  
-       (is (thrown? RuntimeException      
 -      (is (thrown? HBExecutionException      
--                   (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
++      (is (thrown? RuntimeException
++            (.set_worker_hb state "/foo" (string-to-bytes "data") nil))))))
  
--      
  
  (deftest pacemaker_state_delete_worker_hb
    (testing "delete_worker_hb"
@@@ -80,74 -77,74 +77,75 @@@
        (HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil)
  
        (.delete_worker_hb state "/foo/bar")
--      (let [sent (.check-captured client)]
++      (let [sent (.checkCaptured client)]
          (is (= (.get_type sent) HBServerMessageType/DELETE_PATH))
          (is (= (.get_path (.get_data sent)) "/foo/bar")))))
  
--    (testing "delete_worker_hb"
--      (with-mock-pacemaker-client-and-state
-         client state pacefactory mock
 -        client state
--        (HBMessage. HBServerMessageType/DELETE_PATH nil)
--        
-         (is (thrown? RuntimeException
 -        (is (thrown? HBExecutionException
--                     (.delete_worker_hb state "/foo/bar"))))))
++  (testing "delete_worker_hb"
++    (with-mock-pacemaker-client-and-state
++      client state pacefactory mock
++      (HBMessage. HBServerMessageType/DELETE_PATH nil)
++
++      (is (thrown? RuntimeException
++            (.delete_worker_hb state "/foo/bar"))))))
  
  (deftest pacemaker_state_get_worker_hb
    (testing "get_worker_hb"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE
--                (HBMessageData/pulse
--                 (doto (HBPulse.)
--                   (.set_id "/foo")
--                   (.set_details (string-to-bytes "some data")))))
++        (HBMessageData/pulse
++          (doto (HBPulse.)
++            (.set_id "/foo")
++            (.set_details (string-to-bytes "some data")))))
  
        (.get_worker_hb state "/foo" false)
--      (let [sent (.check-captured client)]
++      (let [sent (.checkCaptured client)]
          (is (= (.get_type sent) HBServerMessageType/GET_PULSE))
          (is (= (.get_path (.get_data sent)) "/foo")))))
  
    (testing "get_worker_hb - fail (bad response)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_PULSE nil)
--      
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb state "/foo" false)))))
 -  
++
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb state "/foo" false)))))
-   
++            (.get_worker_hb state "/foo" false)))))
++
    (testing "get_worker_hb - fail (bad data)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE nil)
--      
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb state "/foo" false))))))
++
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb state "/foo" false))))))
++            (.get_worker_hb state "/foo" false))))))
  
  (deftest pacemaker_state_get_worker_hb_children
    (testing "get_worker_hb_children"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE
--                (HBMessageData/nodes
--                 (HBNodes. [])))
++        (HBMessageData/nodes
++          (HBNodes. [])))
  
        (.get_worker_hb_children state "/foo" false)
--      (let [sent (.check-captured client)]
++      (let [sent (.checkCaptured client)]
          (is (= (.get_type sent) HBServerMessageType/GET_ALL_NODES_FOR_PATH))
          (is (= (.get_path (.get_data sent)) "/foo")))))
  
    (testing "get_worker_hb_children - fail (bad response)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/DELETE_PATH nil)
  
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb_children state "/foo" false)))))
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb_children state "/foo" false)))))
++            (.get_worker_hb_children state "/foo" false)))))
  
--    (testing "get_worker_hb_children - fail (bad data)"
++  (testing "get_worker_hb_children - fail (bad data)"
      (with-mock-pacemaker-client-and-state
 -      client state
 +      client state pacefactory mock
        (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE nil)
-       ;need been update due to HBExecutionException
 -      
 -      (is (thrown? HBExecutionException
 -                   (.get_worker_hb_children state "/foo" false))))))
++
 +      (is (thrown? RuntimeException
-                    (.get_worker_hb_children state "/foo" false))))))
++            (.get_worker_hb_children state "/foo" false))))))
++

Reply via email to