Merge branch 'master' into ClusterUtils

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/924f9a29
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/924f9a29
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/924f9a29

Branch: refs/heads/master
Commit: 924f9a29dc1713921178e61ce2f5b6a14df97ed7
Parents: 2ee8bec 12ceb09
Author: xiaojian.fxj <xiaojian....@alibaba-inc.com>
Authored: Mon Feb 15 15:13:33 2016 +0800
Committer: xiaojian.fxj <xiaojian....@alibaba-inc.com>
Committed: Tue Feb 16 11:14:18 2016 +0800

----------------------------------------------------------------------
 CHANGELOG.md                                    |  16 +
 README.markdown                                 |   1 +
 bin/storm-config.cmd                            |   6 +-
 bin/storm.cmd                                   |  47 +-
 bin/storm.py                                    |   8 +-
 conf/defaults.yaml                              |   4 +
 dev-tools/travis/travis-script.sh               |   4 +-
 .../starter/trident/TridentMapExample.java      | 123 +++
 external/sql/storm-sql-core/pom.xml             |   9 +
 external/storm-elasticsearch/pom.xml            |   2 +
 .../storm/hbase/security/HBaseSecurityUtil.java |  36 +-
 .../jvm/org/apache/storm/kafka/KafkaSpout.java  |   8 +-
 .../jvm/org/apache/storm/kafka/KafkaUtils.java  |  44 +-
 .../apache/storm/kafka/PartitionManager.java    |  42 +-
 .../kafka/trident/TridentKafkaEmitter.java      |  23 +-
 external/storm-mqtt/core/pom.xml                |   4 +-
 log4j2/cluster.xml                              |   2 +-
 log4j2/worker.xml                               |   2 +-
 pom.xml                                         |   9 +-
 storm-core/pom.xml                              |  11 +-
 .../src/clj/org/apache/storm/LocalCluster.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/clojure.clj |   8 +-
 .../clj/org/apache/storm/command/blobstore.clj  |  11 +-
 .../org/apache/storm/command/config_value.clj   |  25 -
 .../org/apache/storm/command/dev_zookeeper.clj  |   6 +-
 .../clj/org/apache/storm/command/get_errors.clj |  12 +-
 .../apache/storm/command/shell_submission.clj   |   4 +-
 storm-core/src/clj/org/apache/storm/config.clj  |  18 +-
 .../src/clj/org/apache/storm/converter.clj      |  14 +-
 .../src/clj/org/apache/storm/daemon/acker.clj   |  21 +-
 .../src/clj/org/apache/storm/daemon/common.clj  |  46 +-
 .../src/clj/org/apache/storm/daemon/drpc.clj    |  25 +-
 .../clj/org/apache/storm/daemon/executor.clj    | 530 +++++-----
 .../clj/org/apache/storm/daemon/logviewer.clj   |  70 +-
 .../src/clj/org/apache/storm/daemon/nimbus.clj  | 172 ++--
 .../clj/org/apache/storm/daemon/supervisor.clj  | 202 ++--
 .../src/clj/org/apache/storm/daemon/task.clj    |   2 +-
 .../src/clj/org/apache/storm/daemon/worker.clj  |  78 +-
 .../src/clj/org/apache/storm/disruptor.clj      |  10 +-
 storm-core/src/clj/org/apache/storm/event.clj   |   2 +-
 .../src/clj/org/apache/storm/local_state.clj    |   9 +-
 .../clj/org/apache/storm/messaging/loader.clj   |  34 -
 .../clj/org/apache/storm/messaging/local.clj    |  23 -
 .../org/apache/storm/pacemaker/pacemaker.clj    |   7 +-
 .../storm/pacemaker/pacemaker_state_factory.clj | 122 ---
 .../clj/org/apache/storm/process_simulator.clj  |   4 +-
 .../apache/storm/scheduler/DefaultScheduler.clj |   7 +-
 .../apache/storm/scheduler/EvenScheduler.clj    |  23 +-
 .../storm/scheduler/IsolationScheduler.clj      |  29 +-
 storm-core/src/clj/org/apache/storm/stats.clj   |  82 +-
 storm-core/src/clj/org/apache/storm/testing.clj |  89 +-
 storm-core/src/clj/org/apache/storm/thrift.clj  |   6 +-
 storm-core/src/clj/org/apache/storm/timer.clj   |  12 +-
 .../clj/org/apache/storm/trident/testing.clj    |   9 +-
 storm-core/src/clj/org/apache/storm/ui/core.clj |  99 +-
 .../src/clj/org/apache/storm/ui/helpers.clj     |  14 +-
 storm-core/src/clj/org/apache/storm/util.clj    | 925 +----------------
 storm-core/src/jvm/org/apache/storm/Config.java |  39 +
 .../org/apache/storm/cluster/ClusterUtils.java  |  49 +-
 .../storm/cluster/IStormClusterState.java       |   9 +-
 .../storm/cluster/PaceMakerStateStorage.java    |   4 +-
 .../storm/cluster/StateStorageFactory.java      |   2 +-
 .../storm/cluster/StormClusterStateImpl.java    |  51 +-
 .../storm/cluster/ZKStateStorageFactory.java    |   4 +-
 .../org/apache/storm/command/ConfigValue.java   |  30 +
 .../storm/daemon/metrics/MetricsUtils.java      | 108 ++
 .../reporters/ConsolePreparableReporter.java    |  76 ++
 .../reporters/CsvPreparableReporter.java        |  80 ++
 .../reporters/JmxPreparableReporter.java        |  70 ++
 .../metrics/reporters/PreparableReporter.java   |  32 +
 .../storm/logging/ThriftAccessLogger.java       |  13 +-
 .../apache/storm/pacemaker/PacemakerClient.java |   5 +
 .../serialization/SerializationFactory.java     |  17 +-
 .../staticmocking/MockedConfigUtils.java        |  31 -
 .../jvm/org/apache/storm/trident/Stream.java    |  87 +-
 .../storm/trident/operation/Consumer.java       |  35 +
 .../trident/operation/FlatMapFunction.java      |  37 +
 .../storm/trident/operation/MapFunction.java    |  36 +
 .../operation/impl/ConsumerExecutor.java        |  38 +
 .../operation/impl/FlatMapFunctionExecutor.java |  43 +
 .../operation/impl/MapFunctionExecutor.java     |  41 +
 .../trident/planner/processor/MapProcessor.java |  87 ++
 .../jvm/org/apache/storm/utils/ConfigUtils.java |  20 +-
 .../jvm/org/apache/storm/utils/Container.java   |  11 +-
 .../jvm/org/apache/storm/utils/IPredicate.java  |  27 +
 .../org/apache/storm/utils/NimbusClient.java    |   2 +-
 .../utils/StormConnectionStateConverter.java    |  44 +
 .../jvm/org/apache/storm/utils/TestUtils.java   |  34 -
 .../src/jvm/org/apache/storm/utils/Time.java    |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 989 +++++++++++++++++--
 .../storm/validation/ConfigValidation.java      |   2 +-
 .../org/apache/storm/zookeeper/Zookeeper.java   |   7 +
 .../org/apache/storm/integration_test.clj       |  98 +-
 .../org/apache/storm/testing4j_test.clj         |  37 +-
 .../apache/storm/trident/integration_test.clj   |  15 +-
 .../test/clj/org/apache/storm/cluster_test.clj  |  27 +-
 .../test/clj/org/apache/storm/drpc_test.clj     |  23 +-
 .../clj/org/apache/storm/logviewer_test.clj     | 267 ++---
 .../storm/messaging/netty_integration_test.clj  |   2 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   | 112 ++-
 .../storm/pacemaker_state_factory_test.clj      |  74 +-
 .../scheduler/resource_aware_scheduler_test.clj |  21 +-
 .../apache/storm/security/auth/auth_test.clj    |  11 +-
 .../authorizer/DRPCSimpleACLAuthorizer_test.clj |   2 +-
 .../BlowfishTupleSerializer_test.clj            |   1 -
 .../clj/org/apache/storm/serialization_test.clj |  23 +-
 .../clj/org/apache/storm/supervisor_test.clj    | 649 ++++++------
 .../clj/org/apache/storm/transactional_test.clj |  18 +
 .../clj/org/apache/storm/trident/state_test.clj |   5 +-
 .../clj/org/apache/storm/trident/tuple_test.clj |  15 +-
 .../test/clj/org/apache/storm/utils_test.clj    |  16 +-
 .../test/clj/org/apache/storm/worker_test.clj   |   1 -
 .../staticmocking/ConfigUtilsInstaller.java     |  38 +
 .../utils/staticmocking/UtilsInstaller.java     |  38 +
 .../storm/utils/staticmocking/package-info.java |  95 ++
 115 files changed, 4281 insertions(+), 2648 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/conf/defaults.yaml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
index 7be526d,657e242..ab5cbed
--- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
+++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj
@@@ -14,7 -14,8 +14,8 @@@
  ;; See the License for the specific language governing permissions and
  ;; limitations under the License.
  (ns org.apache.storm.command.dev-zookeeper
+   (:import [org.apache.storm.utils Utils])
 -  (:use [org.apache.storm zookeeper util config])
 +  (:use [org.apache.storm util config])
    (:import [org.apache.storm.utils ConfigUtils])
    (:import [org.apache.storm.zookeeper Zookeeper])
    (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/command/shell_submission.clj
index 3978d2f,0253338..870e7f6
--- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
+++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj
@@@ -15,8 -15,9 +15,9 @@@
  ;; limitations under the License.
  (ns org.apache.storm.command.shell-submission
    (:import [org.apache.storm StormSubmitter]
+            [org.apache.storm.utils Utils]
             [org.apache.storm.zookeeper Zookeeper])
 -  (:use [org.apache.storm thrift util config log zookeeper])
 +  (:use [org.apache.storm thrift util config log])
    (:require [clojure.string :as str])
    (:import [org.apache.storm.utils ConfigUtils])
    (:gen-class))

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/converter.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/common.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj
index b144f40,eb1ec1e..db342d2
--- a/storm-core/src/clj/org/apache/storm/daemon/common.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj
@@@ -15,17 -15,19 +15,20 @@@
  ;; limitations under the License.
  (ns org.apache.storm.daemon.common
    (:use [org.apache.storm log config util])
 -  (:import [org.apache.storm.generated StormTopology
 +  (:import [org.apache.storm.generated StormTopology NodeInfo
              InvalidTopologyException GlobalStreamId]
-            [org.apache.storm.utils ThriftTopologyUtils])
-   (:import [org.apache.storm.utils Utils ConfigUtils])
+            [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils]
+            [org.apache.storm.daemon.metrics.reporters PreparableReporter]
+            [com.codahale.metrics MetricRegistry])
+   (:import [org.apache.storm.daemon.metrics MetricsUtils])
    (:import [org.apache.storm.task WorkerTopologyContext])
    (:import [org.apache.storm Constants])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl])
    (:import [org.apache.storm.metric SystemBolt])
    (:import [org.apache.storm.metric EventLoggerBolt])
-   (:import [org.apache.storm.security.auth IAuthorizer]) 
-   (:import [java.io InterruptedIOException])
+   (:import [org.apache.storm.security.auth IAuthorizer])
+   (:import [java.io InterruptedIOException]
+            [org.json.simple JSONValue])
    (:require [clojure.set :as set])  
    (:require [org.apache.storm.daemon.acker :as acker])
    (:require [org.apache.storm.thrift :as thrift])
@@@ -73,12 -83,10 +84,11 @@@
  (defn new-executor-stats []
    (ExecutorStats. 0 0 0 0 0))
  
 +
  (defn get-storm-id [storm-cluster-state storm-name]
-   (let [active-storms (.activeStorms storm-cluster-state)]
-     (find-first
-       #(= storm-name (.get_name (.stormBase storm-cluster-state % nil)))
-       active-storms)
 -  (let [active-storms (.active-storms storm-cluster-state)
 -        pred  (reify IPredicate (test [this x] (= storm-name (:storm-name (.storm-base storm-cluster-state x nil)))))]
++  (let [active-storms (.activeStorms storm-cluster-state)
++        pred  (reify IPredicate (test [this x] (= storm-name (.get_name (.stormBase storm-cluster-state x nil)))))]
+     (Utils/findOne pred active-storms)
      ))
  
  (defn topology-bases [storm-cluster-state]

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/executor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj
index 49ae6cf,e2380b7..33b89ed
--- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj
@@@ -34,10 -34,13 +34,12 @@@
    (:import [org.apache.storm.daemon Shutdownable])
    (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo 
IMetricsConsumer$DataPoint StateMetric])
    (:import [org.apache.storm Config Constants])
 -  (:import [org.apache.storm.cluster ClusterStateContext DaemonType])
 +  (:import [org.apache.storm.cluster ClusterStateContext DaemonType 
StormClusterStateImpl ClusterUtils])
    (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping 
LoadAwareShuffleGrouping LoadMapping ShuffleGrouping])
-   (:import [java.util.concurrent ConcurrentLinkedQueue])
+   (:import [java.lang Thread Thread$UncaughtExceptionHandler]
+            [java.util.concurrent ConcurrentLinkedQueue]
+            [org.json.simple JSONValue])
 -  (:require [org.apache.storm [thrift :as thrift]
 -             [cluster :as cluster] [disruptor :as disruptor] [stats :as 
stats]])
 +  (:require [org.apache.storm [thrift :as thrift] [disruptor :as disruptor] 
[stats :as stats]])
    (:require [org.apache.storm.daemon [task :as task]])
    (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics])
    (:require [clojure.set :as set]))
@@@ -206,9 -211,9 +210,9 @@@
        (swap! interval-errors inc)
  
        (when (<= @interval-errors max-per-interval)
 -        (cluster/report-error (:storm-cluster-state executor) (:storm-id 
executor) (:component-id executor)
 +        (.reportError (:storm-cluster-state executor) (:storm-id executor) 
(:component-id executor)
-                               (hostname storm-conf)
+                               (Utils/hostname storm-conf)
 -                              (.getThisWorkerPort (:worker-context executor)) 
error)
 +          (long (.getThisWorkerPort (:worker-context executor))) error)
          ))))
  
  ;; in its own function so that it can be mocked out by tracked topologies

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
index daf5e45,710cd83..6bdbdc0
--- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj
@@@ -38,9 -39,9 +39,9 @@@
    (:import [org.apache.storm.scheduler INimbus SupervisorDetails WorkerSlot 
TopologyDetails
              Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl 
DefaultScheduler ExecutorDetails])
    (:import [org.apache.storm.nimbus NimbusInfo])
-   (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback 
Utils ConfigUtils TupleUtils ThriftTopologyUtils
+   (:import [org.apache.storm.utils TimeCacheMap Time 
TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils
              BufferFileInputStream BufferInputStream])
 -  (:import [org.apache.storm.generated NotAliveException 
AlreadyAliveException StormTopology ErrorInfo
 +  (:import [org.apache.storm.generated NotAliveException 
AlreadyAliveException StormTopology ErrorInfo ClusterWorkerHeartbeat
              ExecutorInfo InvalidTopologyException Nimbus$Iface 
Nimbus$Processor SubmitOptions TopologyInitialStatus
              KillOptions RebalanceOptions ClusterSummary SupervisorSummary 
TopologySummary TopologyInfo TopologyHistoryInfo
              ExecutorSummary AuthorizationException GetInfoOptions 
NumErrorsChoice SettableBlobMeta ReadableBlobMeta
@@@ -407,10 -414,10 +409,10 @@@
    [storm-cluster-state]
  
    (let [assignments (.assignments storm-cluster-state nil)]
-     (defaulted
+     (or
        (apply merge-with set/union
               (for [a assignments
 -                   [_ [node port]] (-> (.assignment-info storm-cluster-state 
a nil) :executor->node+port)]
 +                   [_ [node port]] (-> (clojurify-assignment (.assignmentInfo 
storm-cluster-state a nil)) :executor->node+port)]
                 {node #{port}}
                 ))
        {})
@@@ -586,12 -594,7 +589,12 @@@
  (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment]
    (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors))
    (let [storm-cluster-state (:storm-cluster-state nimbus)
 -        executor-beats (.executor-beats storm-cluster-state storm-id 
(:executor->node+port existing-assignment))
 +        executor-beats (let [executor-stats-java-map (.executorBeats 
storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment 
existing-assignment)))
 +                             executor-stats-clojurify (clojurify-structure 
executor-stats-java-map)]
-                          (->> (dofor [[^ExecutorInfo executor-info 
^ClusterWorkerHeartbeat cluster-worker-heartbeat] executor-stats-clojurify]
-                              {[(.get_task_start executor-info) (.get_task_end 
executor-info)] (clojurify-zk-worker-hb cluster-worker-heartbeat)})
++                         (->> (dofor [[^ExecutorInfo executor-info  
executor-heartbeat] executor-stats-clojurify]
++                             {[(.get_task_start executor-info) (.get_task_end 
executor-info)] executor-heartbeat})
 +                           (apply merge)))
 +
          cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id)
                                        executor-beats
                                        all-executors
@@@ -988,10 -1002,10 +1002,10 @@@
          topology (system-topology! storm-conf (read-storm-topology storm-id 
blob-store))
          num-executors (->> (all-components topology) (map-val 
num-start-executors))]
      (log-message "Activating " storm-name ": " storm-id)
 -    (.activate-storm! storm-cluster-state
 +    (.activateStorm storm-cluster-state
                        storm-id
 -                      (StormBase. storm-name
 +      (thriftify-storm-base (StormBase. storm-name
-                                   (current-time-secs)
+                                   (Time/currentTimeSecs)
                                    {:type topology-initial-status}
                                    (storm-conf TOPOLOGY-WORKERS)
                                    num-executors
@@@ -1137,9 -1152,9 +1152,9 @@@
          (when-not (empty? to-cleanup-ids)
            (doseq [id to-cleanup-ids]
              (log-message "Cleaning up " id)
 -            (.teardown-heartbeats! storm-cluster-state id)
 -            (.teardown-topology-errors! storm-cluster-state id)
 +            (.teardownHeartbeats storm-cluster-state id)
 +            (.teardownTopologyErrors storm-cluster-state id)
-             (rmr (ConfigUtils/masterStormDistRoot conf id))
+             (Utils/forceDelete (ConfigUtils/masterStormDistRoot conf id))
              (blob-rm-topology-keys id blob-store storm-cluster-state)
              (swap! (:heartbeats-cache nimbus) dissoc id)))))
      (log-message "not a leader, skipping cleanup")))
@@@ -1811,8 -1830,8 +1838,8 @@@
                                           (.set_used_cpu sup-sum used-cpu))
                                         (when-let [version (:version info)] 
(.set_version sup-sum version))
                                         sup-sum))
-               nimbus-uptime ((:uptime nimbus))
+               nimbus-uptime (. (:uptime nimbus) upTime)
 -              bases (topology-bases storm-cluster-state)
 +              bases (nimbus-topology-bases storm-cluster-state)
                nimbuses (.nimbuses storm-cluster-state)
  
                ;;update the isLeader field for each nimbus summary

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
index 3a83d03,ae9e92f..273a6bd
--- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj
@@@ -16,14 -16,15 +16,15 @@@
  (ns org.apache.storm.daemon.supervisor
    (:import [java.io File IOException FileOutputStream])
    (:import [org.apache.storm.scheduler ISupervisor]
-            [org.apache.storm.utils LocalState Time Utils ConfigUtils]
+            [org.apache.storm.utils LocalState Time Utils 
Utils$ExitCodeCallable
+                                    ConfigUtils]
             [org.apache.storm.daemon Shutdownable]
             [org.apache.storm Constants]
 -           [org.apache.storm.cluster ClusterStateContext DaemonType]
 +           [org.apache.storm.cluster ClusterStateContext DaemonType 
StormClusterStateImpl ClusterUtils]
             [java.net JarURLConnection]
-            [java.net URI]
+            [java.net URI URLDecoder]
             [org.apache.commons.io FileUtils])
 -  (:use [org.apache.storm config util log timer local-state])
 +  (:use [org.apache.storm config util log timer local-state converter])
    (:import [org.apache.storm.generated AuthorizationException 
KeyNotFoundException WorkerResources])
    (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo])
    (:import [java.nio.file Files StandardCopyOption])
@@@ -315,12 -319,14 +321,12 @@@
     :shared-context shared-context
     :isupervisor isupervisor
     :active (atom true)
-    :uptime (uptime-computer)
+    :uptime (Utils/makeUptimeComputer)
     :version STORM-VERSION
     :worker-thread-pids-atom (atom {})
 -   :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when
 -                                                                     
(Utils/isZkAuthenticationConfiguredStormServer
 -                                                                       conf)
 -                                                                     
SUPERVISOR-ZK-ACLS)
 -                                                        :context 
(ClusterStateContext. DaemonType/SUPERVISOR))
 +   :storm-cluster-state (ClusterUtils/mkStormClusterState conf (when 
(Utils/isZkAuthenticationConfiguredStormServer conf)
 +                                                     SUPERVISOR-ZK-ACLS)
 +                                                        (ClusterStateContext. 
DaemonType/SUPERVISOR))
     :local-state (ConfigUtils/supervisorState conf)
     :supervisor-id (.getSupervisorId isupervisor)
     :assignment-id (.getAssignmentId isupervisor)
@@@ -777,19 -791,19 +792,19 @@@
          synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor)
          downloaded-storm-ids (set (read-downloaded-storm-ids conf))
          run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies 
supervisor)
 -        heartbeat-fn (fn [] (.supervisor-heartbeat!
 +        heartbeat-fn (fn [] (.supervisorHeartbeat
                                 (:storm-cluster-state supervisor)
                                 (:supervisor-id supervisor)
-                               (thriftify-supervisor-info (->SupervisorInfo 
(current-time-secs)
 -                               (->SupervisorInfo (Time/currentTimeSecs)
++                               (thriftify-supervisor-info (->SupervisorInfo 
(Time/currentTimeSecs)
                                                   (:my-hostname supervisor)
                                                   (:assignment-id supervisor)
                                                   (keys @(:curr-assignment 
supervisor))
                                                    ;; used ports
                                                   (.getMetadata isupervisor)
                                                   (conf 
SUPERVISOR-SCHEDULER-META)
-                                                  ((:uptime supervisor))
+                                                  (. (:uptime supervisor) 
upTime)
                                                   (:version supervisor)
 -                                                 (mk-supervisor-capacities 
conf))))]
 +                                                 (mk-supervisor-capacities 
conf)))))]
      (heartbeat-fn)
  
      ;; should synchronize supervisor so it doesn't launch anything after being down (optimization)

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/worker.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj
index ae5be57,fe8cfae..9863427
--- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj
+++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj
@@@ -19,16 -19,18 +19,18 @@@
    (:require [clj-time.core :as time])
    (:require [clj-time.coerce :as coerce])
    (:require [org.apache.storm.daemon [executor :as executor]])
 -  (:require [org.apache.storm [disruptor :as disruptor] [cluster :as 
cluster]])
 +  (:require [org.apache.storm [disruptor :as disruptor]])
    (:require [clojure.set :as set])
-   (:require [org.apache.storm.messaging.loader :as msg-loader])
    (:import [java.util.concurrent Executors]
-            [org.apache.storm.hooks IWorkerHook BaseWorkerHook])
-   (:import [java.util ArrayList HashMap])
-   (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue])
+            [org.apache.storm.hooks IWorkerHook BaseWorkerHook]
+            [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J])
+   (:import [java.util ArrayList HashMap]
+            [java.util.concurrent.locks ReentrantReadWriteLock])
+   (:import [org.apache.commons.io FileUtils])
+   (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer 
ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time])
    (:import [org.apache.storm.grouping LoadMapping])
    (:import [org.apache.storm.messaging TransportFactory])
-   (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status])
+   (:import [org.apache.storm.messaging TaskMessage IContext IConnection 
ConnectionWithStatus ConnectionWithStatus$Status 
DeserializingConnectionCallback])
    (:import [org.apache.storm.daemon Shutdownable])
    (:import [org.apache.storm.serialization KryoTupleSerializer])
    (:import [org.apache.storm.generated StormTopology])

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
index a36da3a,be4361a..0000000
deleted file mode 100644,100644
--- a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj
+++ /dev/null
@@@ -1,122 -1,141 +1,0 @@@
--;; Licensed to the Apache Software Foundation (ASF) under one
--;; or more contributor license agreements.  See the NOTICE file
--;; distributed with this work for additional information
--;; regarding copyright ownership.  The ASF licenses this file
--;; to you under the Apache License, Version 2.0 (the
--;; "License"); you may not use this file except in compliance
--;; with the License.  You may obtain a copy of the License at
--;;
--;; http://www.apache.org/licenses/LICENSE-2.0
--;;
--;; Unless required by applicable law or agreed to in writing, software
--;; distributed under the License is distributed on an "AS IS" BASIS,
--;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--;; See the License for the specific language governing permissions and
--;; limitations under the License.
--
--(ns org.apache.storm.pacemaker.pacemaker-state-factory
--  (:require [org.apache.storm.pacemaker pacemaker]
 -            [org.apache.storm.cluster-state [zookeeper-state-factory :as 
zk-factory]]
--            [org.apache.storm
--             [config :refer :all]
 -             [cluster :refer :all]
--             [log :refer :all]
--             [util :as util]])
--  (:import [org.apache.storm.generated
--            HBExecutionException HBServerMessageType HBMessage
--            HBMessageData HBPulse]
-            [org.apache.storm.cluster ZKStateStorage ClusterUtils 
IStateStorage]
 -           [org.apache.storm.cluster_state zookeeper_state_factory]
 -           [org.apache.storm.cluster ClusterState]
--           [org.apache.storm.pacemaker PacemakerClient])
--  (:gen-class
-     :implements [org.apache.storm.cluster.StateStorageFactory]))
 -   :implements [org.apache.storm.cluster.ClusterStateFactory]))
--
--;; So we can mock the client for testing
--(defn makeClient [conf]
--  (PacemakerClient. conf))
--
--(defn makeZKState [conf auth-conf acls context]
-   (ClusterUtils/mkStateStorage conf auth-conf acls context))
 -  (.mkState (zookeeper_state_factory.) conf auth-conf acls context))
--
--(def max-retries 10)
 -
 -(defn retry-on-exception
 -  "Retries specific function on exception based on retries count"
 -  [retries task-description f & args]
 -  (let [res (try {:value (apply f args)}
 -                 (catch Exception e
 -                   (if (<= 0 retries)
 -                     (throw e)
 -                     {:exception e})))]
 -    (if (:exception res)
 -      (do 
 -        (log-error (:exception res) (str "Failed to " task-description ". Will make [" retries "] more attempts."))
 -        (recur (dec retries) task-description f args))
 -      (do 
 -        (log-debug (str "Successful " task-description "."))
 -        (:value res)))))
--
--(defn -mkState [this conf auth-conf acls context]
--  (let [zk-state (makeZKState conf auth-conf acls context)
--        pacemaker-client (makeClient conf)]
--
--    (reify
-       IStateStorage
 -      ClusterState
--      ;; Let these pass through to the zk-state. We only want to handle heartbeats.
--      (register [this callback] (.register zk-state callback))
--      (unregister [this callback] (.unregister zk-state callback))
--      (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state 
path data acls))
--      (create_sequential [this path data acls] (.create_sequential zk-state 
path data acls))
--      (set_data [this path data acls] (.set_data zk-state path data acls))
--      (delete_node [this path] (.delete_node zk-state path))
--      (delete_node_blobstore [this path nimbus-host-port-info] 
(.delete_node_blobstore zk-state path nimbus-host-port-info))
--      (get_data [this path watch?] (.get_data zk-state path watch?))
--      (get_data_with_version [this path watch?] (.get_data_with_version 
zk-state path watch?))
--      (get_version [this path watch?] (.get_version zk-state path watch?))
--      (get_children [this path watch?] (.get_children zk-state path watch?))
--      (mkdirs [this path acls] (.mkdirs zk-state path acls))
--      (node_exists [this path watch?] (.node_exists zk-state path watch?))
--      (add_listener [this listener] (.add_listener zk-state listener))
--      (sync_path [this path] (.sync_path zk-state path))
--      
--      (set_worker_hb [this path data acls]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "set_worker_hb"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/SEND_PULSE
--                                   (HBMessageData/pulse
--                                    (doto (HBPulse.)
--                                      (.set_id path)
--                                      (.set_details data)))))]
--            (if (= (.get_type response) 
HBServerMessageType/SEND_PULSE_RESPONSE)
--              :ok
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--
--      (delete_worker_hb [this path]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "delete_worker_hb"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/DELETE_PATH
--                                   (HBMessageData/path path)))]
--            (if (= (.get_type response) 
HBServerMessageType/DELETE_PATH_RESPONSE)
--              :ok
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--      
--      (get_worker_hb [this path watch?]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "get_worker_hb"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/GET_PULSE
--                                   (HBMessageData/path path)))]
--            (if (= (.get_type response) 
HBServerMessageType/GET_PULSE_RESPONSE)
--              (try 
--                (.get_details (.get_pulse (.get_data response)))
--                (catch Exception e
--                  (throw (HBExecutionException. (.toString e)))))
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--      
--      (get_worker_hb_children [this path watch?]
-         (util/retry-on-exception
 -        (retry-on-exception
--         max-retries
--         "get_worker_hb_children"
--         #(let [response
--                (.send pacemaker-client
--                       (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH
--                                   (HBMessageData/path path)))]
--            (if (= (.get_type response) 
HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE)
--              (try
--                (into [] (.get_pulseIds (.get_nodes (.get_data response))))
--                (catch Exception e
--                  (throw (HBExecutionException. (.toString e)))))
--              (throw (HBExecutionException. "Invalid Response Type"))))))
--      
--      (close [this]
--        (.close zk-state)
--        (.close pacemaker-client)))))

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/stats.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/stats.clj
index 0bf1757,8b37fc3..8632ed3
--- a/storm-core/src/clj/org/apache/storm/stats.clj
+++ b/storm-core/src/clj/org/apache/storm/stats.clj
@@@ -24,8 -24,8 +24,9 @@@
              ExecutorAggregateStats SpecificAggregateStats
              SpoutAggregateStats TopologyPageInfo TopologyStats])
    (:import [org.apache.storm.utils Utils])
 +  (:import [org.apache.storm.cluster StormClusterStateImpl])
-   (:import [org.apache.storm.metric.internal MultiCountStatAndMetric 
MultiLatencyStatAndMetric])
+   (:import [org.apache.storm.metric.internal MultiCountStatAndMetric 
MultiLatencyStatAndMetric]
+            [java.util Collection])
    (:use [org.apache.storm log util])
    (:use [clojure.math.numeric-tower :only [ceil]]))
  

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/testing.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/testing.clj
index 5a0bdf2,c872742..eef7754
--- a/storm-core/src/clj/org/apache/storm/testing.clj
+++ b/storm-core/src/clj/org/apache/storm/testing.clj
@@@ -44,11 -45,12 +45,12 @@@
    (:import [org.apache.storm.transactional.partitioned 
PartitionedTransactionalSpoutExecutor])
    (:import [org.apache.storm.tuple Tuple])
    (:import [org.apache.storm.generated StormTopology])
-   (:import [org.apache.storm.task TopologyContext])
+   (:import [org.apache.storm.task TopologyContext]
+            (org.apache.storm.messaging IContext)
+            [org.json.simple JSONValue])
 -  (:require [org.apache.storm [zookeeper :as zk]])
 +  (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext 
StormClusterStateImpl ClusterUtils])
-   (:require [org.apache.storm.messaging.loader :as msg-loader])
    (:require [org.apache.storm.daemon.acker :as acker])
 -  (:use [org.apache.storm cluster util thrift config log local-state]))
 +  (:use [org.apache.storm util thrift config log local-state converter]))
  
  (defn feeder-spout
    [fields]

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/thrift.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/thrift.clj
index 4dc21f9,779c1d1..7aab729
--- a/storm-core/src/clj/org/apache/storm/thrift.clj
+++ b/storm-core/src/clj/org/apache/storm/thrift.clj
@@@ -29,8 -29,9 +29,9 @@@
    (:import [org.apache.storm.grouping CustomStreamGrouping])
    (:import [org.apache.storm.topology TopologyBuilder])
    (:import [org.apache.storm.clojure RichShellBolt RichShellSpout])
-   (:import [org.apache.thrift.transport TTransport])
+   (:import [org.apache.thrift.transport TTransport]
+            (org.json.simple JSONValue))
 -  (:use [org.apache.storm util config log zookeeper]))
 +  (:use [org.apache.storm util config log]))
  
  (defn instantiate-java-object
    [^JavaObject obj]

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/ui/core.clj
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/util.clj
----------------------------------------------------------------------
diff --cc storm-core/src/clj/org/apache/storm/util.clj
index 165d8ee,f685d12..72778bb
--- a/storm-core/src/clj/org/apache/storm/util.clj
+++ b/storm-core/src/clj/org/apache/storm/util.clj
@@@ -20,9 -20,8 +20,9 @@@
    (:import [java.io FileReader FileNotFoundException])
    (:import [java.nio.file Paths])
    (:import [org.apache.storm Config])
-   (:import [org.apache.storm.generated ErrorInfo])
-   (:import [org.apache.storm.utils Time Container ClojureTimerTask Utils
-             MutableObject MutableInt])
++ (:import [org.apache.storm.generated ErrorInfo])
+   (:import [org.apache.storm.utils Time ClojureTimerTask Utils
+             MutableObject])
    (:import [org.apache.storm.security.auth NimbusPrincipal])
    (:import [javax.security.auth Subject])
    (:import [java.util UUID Random ArrayList List Collections])
@@@ -262,58 -163,9 +164,19 @@@
                     (instance? Boolean x) (boolean x)
                     true x))
             s))
 +; moved this func from convert.clj due to a cyclic load dependency
 +(defn clojurify-error [^ErrorInfo error]
 +  (if error
 +    {
 +      :error (.get_error error)
 +      :time-secs (.get_error_time_secs error)
 +      :host (.get_host error)
 +      :port (.get_port error)
 +      }
 +    ))
  
- (defmacro with-file-lock
-   [path & body]
-   `(let [f# (File. ~path)
-          _# (.createNewFile f#)
-          rf# (RandomAccessFile. f# "rw")
-          lock# (.. rf# (getChannel) (lock))]
-      (try
-        ~@body
-        (finally
-          (.release lock#)
-          (.close rf#)))))
- 
- (defn tokenize-path
-   [^String path]
-   (let [toks (.split path "/")]
-     (vec (filter (complement empty?) toks))))
- 
- (defn assoc-conj
-   [m k v]
-   (merge-with concat m {k [v]}))
- 
- ;; returns [ones in first set not in second, ones in second set not in first]
- (defn set-delta
-   [old curr]
-   (let [s1 (set old)
-         s2 (set curr)]
-     [(set/difference s1 s2) (set/difference s2 s1)]))
- 
- (defn parent-path
-   [path]
-   (let [toks (tokenize-path path)]
-     (str "/" (str/join "/" (butlast toks)))))
- 
- (defn toks->path
-   [toks]
-   (str "/" (str/join "/" toks)))
- 
- (defn normalize-path
-   [^String path]
-   (toks->path (tokenize-path path)))
- 
+ ;TODO: We're keeping this function around until all the code using it is 
properly translated to java
+ ;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally 
used this function.
  (defn map-val
    [afn amap]
    (into {}

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
index b30d1d2,0000000..0c663f0
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java
@@@ -1,249 -1,0 +1,282 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.cluster;
 +
 +import clojure.lang.APersistentMap;
++import clojure.lang.PersistentArrayMap;
++import clojure.lang.RT;
 +import org.apache.storm.Config;
 +import org.apache.storm.generated.ClusterWorkerHeartbeat;
 +import org.apache.storm.generated.ExecutorInfo;
 +import org.apache.storm.generated.ExecutorStats;
 +import org.apache.storm.generated.ProfileAction;
 +import org.apache.storm.utils.Utils;
 +import org.apache.zookeeper.ZooDefs;
 +import org.apache.zookeeper.data.ACL;
 +import org.apache.zookeeper.data.Id;
 +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
 +
++import java.io.PrintWriter;
++import java.io.StringWriter;
 +import java.io.UnsupportedEncodingException;
 +import java.net.URLEncoder;
 +import java.security.NoSuchAlgorithmException;
 +import java.util.ArrayList;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +
 +public class ClusterUtils {
 +
 +    public static final String ZK_SEPERATOR = "/";
 +
 +    public static final String ASSIGNMENTS_ROOT = "assignments";
 +    public static final String CODE_ROOT = "code";
 +    public static final String STORMS_ROOT = "storms";
 +    public static final String SUPERVISORS_ROOT = "supervisors";
 +    public static final String WORKERBEATS_ROOT = "workerbeats";
 +    public static final String BACKPRESSURE_ROOT = "backpressure";
 +    public static final String ERRORS_ROOT = "errors";
 +    public static final String BLOBSTORE_ROOT = "blobstore";
 +    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = 
"blobstoremaxkeysequencenumber";
 +    public static final String NIMBUSES_ROOT = "nimbuses";
 +    public static final String CREDENTIALS_ROOT = "credentials";
 +    public static final String LOGCONFIG_ROOT = "logconfigs";
 +    public static final String PROFILERCONFIG_ROOT = "profilerconfigs";
 +
 +    public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + 
ASSIGNMENTS_ROOT;
 +    public static final String STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT;
 +    public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + 
SUPERVISORS_ROOT;
 +    public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + 
WORKERBEATS_ROOT;
 +    public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + 
BACKPRESSURE_ROOT;
 +    public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT;
 +    public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + 
BLOBSTORE_ROOT;
 +    public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = 
ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT;
 +    public static final String NIMBUSES_SUBTREE = ZK_SEPERATOR + 
NIMBUSES_ROOT;
 +    public static final String CREDENTIALS_SUBTREE = ZK_SEPERATOR + 
CREDENTIALS_ROOT;
 +    public static final String LOGCONFIG_SUBTREE = ZK_SEPERATOR + 
LOGCONFIG_ROOT;
 +    public static final String PROFILERCONFIG_SUBTREE = ZK_SEPERATOR + 
PROFILERCONFIG_ROOT;
 +
 +    // A singleton instance allows us to mock delegated static methods in our
 +    // tests by subclassing.
 +    private static final ClusterUtils INSTANCE = new ClusterUtils();
 +    private static ClusterUtils _instance = INSTANCE;
 +
 +    /**
 +     * Provide an instance of this class for delegates to use. To mock out 
delegated methods, provide an instance of a subclass that overrides the
 +     * implementation of the delegated method.
 +     *
 +     * @param u a ClusterUtils instance
 +     */
 +    public static void setInstance(ClusterUtils u) {
 +        _instance = u;
 +    }
 +
 +    /**
 +     * Resets the singleton instance to the default. This is helpful to reset 
the class to its original functionality when mocking is no longer desired.
 +     */
 +    public static void resetInstance() {
 +        _instance = INSTANCE;
 +    }
 +
 +    public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws 
NoSuchAlgorithmException {
 +        List<ACL> aclList = null;
 +        String payload = (String) 
topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD);
 +        if (Utils.isZkAuthenticationConfiguredStormServer(topoConf)) {
 +            aclList = new ArrayList<>();
 +            ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0);
 +            aclList.add(acl1);
 +            ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", 
DigestAuthenticationProvider.generateDigest(payload)));
 +            aclList.add(acl2);
 +        }
 +        return aclList;
 +    }
 +
 +    public static String supervisorPath(String id) {
 +        return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id;
 +    }
 +
 +    public static String assignmentPath(String id) {
 +        return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id;
 +    }
 +
 +    public static String blobstorePath(String key) {
 +        return BLOBSTORE_SUBTREE + ZK_SEPERATOR + key;
 +    }
 +
 +    public static String blobstoreMaxKeySequenceNumberPath(String key) {
 +        return BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE + ZK_SEPERATOR + key;
 +    }
 +
 +    public static String nimbusPath(String id) {
 +        return NIMBUSES_SUBTREE + ZK_SEPERATOR + id;
 +    }
 +
 +    public static String stormPath(String id) {
 +        return STORMS_SUBTREE + ZK_SEPERATOR + id;
 +    }
 +
 +    public static String workerbeatStormRoot(String stormId) {
 +        return WORKERBEATS_SUBTREE + ZK_SEPERATOR + stormId;
 +    }
 +
 +    public static String workerbeatPath(String stormId, String node, Long 
port) {
 +        return workerbeatStormRoot(stormId) + ZK_SEPERATOR + node + "-" + 
port;
 +    }
 +
 +    public static String backpressureStormRoot(String stormId) {
 +        return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + stormId;
 +    }
 +
 +    public static String backpressurePath(String stormId, String node, Long 
port) {
 +        return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + 
port;
 +    }
 +
 +    public static String errorStormRoot(String stormId) {
 +        return ERRORS_SUBTREE + ZK_SEPERATOR + stormId;
 +    }
 +
 +    public static String errorPath(String stormId, String componentId) {
 +        try {
 +            return errorStormRoot(stormId) + ZK_SEPERATOR + 
URLEncoder.encode(componentId, "UTF-8");
 +        } catch (UnsupportedEncodingException e) {
 +            throw Utils.wrapInRuntime(e);
 +        }
 +    }
 +
 +    public static String lastErrorPath(String stormId, String componentId) {
 +        return errorPath(stormId, componentId) + "-last-error";
 +    }
 +
 +    public static String credentialsPath(String stormId) {
 +        return CREDENTIALS_SUBTREE + ZK_SEPERATOR + stormId;
 +    }
 +
 +    public static String logConfigPath(String stormId) {
 +        return LOGCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
 +    }
 +
 +    public static String profilerConfigPath(String stormId) {
 +        return PROFILERCONFIG_SUBTREE + ZK_SEPERATOR + stormId;
 +    }
 +
 +    public static String profilerConfigPath(String stormId, String host, Long 
port, ProfileAction requestType) {
 +        return profilerConfigPath(stormId) + ZK_SEPERATOR + host + "_" + port 
+ "_" + requestType;
 +    }
 +
 +    public static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz) {
 +        if (serialized != null) {
 +            return Utils.deserialize(serialized, clazz);
 +        }
 +        return null;
 +    }
 +
-     // Ensures that we only return heartbeats for executors assigned to this 
worker
-     public static Map<ExecutorInfo, ClusterWorkerHeartbeat> 
convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat 
workerHeartbeat) {
-         Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhb = new 
HashMap<>();
++    /**
++     * Ensures that we only return heartbeats for executors assigned to this 
worker
++     * @param executors
++     * @param workerHeartbeat
++     * @return
++     */
++    public static Map<ExecutorInfo, APersistentMap> 
convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat 
workerHeartbeat) {
++        Map<ExecutorInfo, APersistentMap> executorWhb = new HashMap<>();
 +        Map<ExecutorInfo, ExecutorStats> executorStatsMap = 
workerHeartbeat.get_executor_stats();
 +        for (ExecutorInfo executor : executors) {
 +            if (executorStatsMap.containsKey(executor)) {
-                 executorWhb.put(executor, workerHeartbeat);
++                APersistentMap executorBeat =
++                        new PersistentArrayMap(new Object[] { 
RT.keyword(null, "time-secs"), workerHeartbeat.get_time_secs(), 
RT.keyword(null, "uptime"),
++                                workerHeartbeat.get_uptime_secs(), 
RT.keyword(null, "stats"), workerHeartbeat.get_executor_stats().get(executor) 
});
++                executorWhb.put(executor, executorBeat);
 +            }
 +        }
 +        return executorWhb;
 +    }
 +
 +    public IStormClusterState mkStormClusterStateImpl(Object stateStorage, 
List<ACL> acls, ClusterStateContext context) throws Exception {
 +        if (stateStorage instanceof IStateStorage) {
 +            return new StormClusterStateImpl((IStateStorage) stateStorage, 
acls, context, false);
 +        } else {
 +            IStateStorage Storage = 
_instance.mkStateStorageImpl((APersistentMap) stateStorage, (APersistentMap) 
stateStorage, acls, context);
 +            return new StormClusterStateImpl(Storage, acls, context, true);
 +        }
 +
 +    }
 +
-     public IStateStorage mkStateStorageImpl(APersistentMap config, 
APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context)
-             throws Exception {
++    public IStateStorage mkStateStorageImpl(APersistentMap config, 
APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws 
Exception {
 +        String className = null;
 +        IStateStorage stateStorage = null;
 +        if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) {
 +            className = (String) config.get(Config.STORM_CLUSTER_STATE_STORE);
 +        } else {
 +            className = "org.apache.storm.cluster.ZKStateStorageFactory";
 +        }
 +        Class clazz = Class.forName(className);
 +        StateStorageFactory storageFactory = (StateStorageFactory) 
clazz.newInstance();
 +        stateStorage = storageFactory.mkStore(config, auth_conf, acls, 
context);
 +        return stateStorage;
 +    }
 +
-     public static IStateStorage mkStateStorage(APersistentMap config, 
APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context)
-             throws Exception {
++    public static IStateStorage mkStateStorage(APersistentMap config, 
APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws 
Exception {
 +        return _instance.mkStateStorageImpl(config, auth_conf, acls, context);
 +    }
 +
 +    public static IStormClusterState mkStormClusterState(Object StateStorage, 
List<ACL> acls, ClusterStateContext context) throws Exception {
 +        return _instance.mkStormClusterStateImpl(StateStorage, acls, context);
 +    }
 +
 +    // To be removed
 +    public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) {
 +        HashMap<V, List<K>> rtn = new HashMap<V, List<K>>();
 +        if (map == null) {
 +            return rtn;
 +        }
 +        for (Map.Entry<K, V> entry : map.entrySet()) {
 +            K key = entry.getKey();
 +            V val = entry.getValue();
 +            List<K> list = rtn.get(val);
 +            if (list == null) {
 +                list = new ArrayList<K>();
 +                rtn.put(entry.getValue(), list);
 +            }
 +            list.add(key);
 +        }
 +        return rtn;
 +    }
++
++    public static String StringifyError(Throwable error) {
++        String errorString = null;
++        StringWriter result = null;
++        PrintWriter printWriter = null;
++        try {
++            result = new StringWriter();
++            printWriter = new PrintWriter(result);
++            error.printStackTrace(printWriter);
++            if (result != null) {
++                errorString = result.toString();
++            }
++        } finally {
++            try {
++                if (result != null)
++                    result.close();
++                if (printWriter != null)
++                    printWriter.close();
++            } catch (Exception e) {
++            }
++        }
++        return errorString;
++    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
index 59d1af7,0000000..01cf56a
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java
@@@ -1,129 -1,0 +1,124 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.cluster;
 +
 +import clojure.lang.APersistentMap;
 +import clojure.lang.IFn;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.nimbus.NimbusInfo;
 +
 +import java.security.NoSuchAlgorithmException;
 +import java.util.List;
 +import java.util.Map;
 +
 +public interface IStormClusterState {
 +    public List<String> assignments(IFn callback);
 +
 +    public Assignment assignmentInfo(String stormId, IFn callback);
 +
 +    public APersistentMap assignmentInfoWithVersion(String stormId, IFn 
callback);
 +
 +    public Integer assignmentVersion(String stormId, IFn callback) throws 
Exception;
 +
-     // returns key information under /storm/blobstore/key
 +    public List<String> blobstoreInfo(String blobKey);
 +
-     // returns list of nimbus summaries stored under 
/stormroot/nimbuses/<nimbus-ids> -> <data>
 +    public List nimbuses();
 +
-     // adds the NimbusSummary to /stormroot/nimbuses/nimbus-id
 +    public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary);
 +
 +    public List<String> activeStorms();
 +
 +    public StormBase stormBase(String stormId, IFn callback);
 +
 +    public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String 
node, Long port);
 +
 +    public List<ProfileRequest> getWorkerProfileRequests(String stormId, 
NodeInfo nodeInfo, boolean isThrift);
 +
 +    public List<ProfileRequest> getTopologyProfileRequests(String stormId, 
boolean isThrift);
 +
 +    public void setWorkerProfileRequest(String stormId, ProfileRequest 
profileRequest);
 +
 +    public void deleteTopologyProfileRequests(String stormId, ProfileRequest 
profileRequest);
 +
-     public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String 
stormId, Map<List<Long>, NodeInfo> executorNodePort);
++    public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, 
Map<List<Long>, NodeInfo> executorNodePort);
 +
 +    public List<String> supervisors(IFn callback);
 +
 +    public SupervisorInfo supervisorInfo(String supervisorId); // returns nil 
if doesn't exist
 +
 +    public void setupHeatbeats(String stormId);
 +
 +    public void teardownHeartbeats(String stormId);
 +
 +    public void teardownTopologyErrors(String stormId);
 +
 +    public List<String> heartbeatStorms();
 +
 +    public List<String> errorTopologies();
 +
 +    public void setTopologyLogConfig(String stormId, LogConfig logConfig);
 +
 +    public LogConfig topologyLogConfig(String stormId, IFn cb);
 +
 +    public void workerHeartbeat(String stormId, String node, Long port, 
ClusterWorkerHeartbeat info);
 +
 +    public void removeWorkerHeartbeat(String stormId, String node, Long port);
 +
 +    public void supervisorHeartbeat(String supervisorId, SupervisorInfo info);
 +
 +    public void workerBackpressure(String stormId, String node, Long port, 
boolean on);
 +
 +    public boolean topologyBackpressure(String stormId, IFn callback);
 +
 +    public void setupBackpressure(String stormId);
 +
 +    public void removeWorkerBackpressure(String stormId, String node, Long 
port);
 +
 +    public void activateStorm(String stormId, StormBase stormBase);
 +
 +    public void updateStorm(String stormId, StormBase newElems);
 +
 +    public void removeStormBase(String stormId);
 +
 +    public void setAssignment(String stormId, Assignment info);
 +
-     // sets up information related to key consisting of nimbus
-     // host:port and version info of the blob
 +    public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer 
versionInfo);
 +
 +    public List<String> activeKeys();
 +
 +    public List<String> blobstore(IFn callback);
 +
 +    public void removeStorm(String stormId);
 +
 +    public void removeBlobstoreKey(String blobKey);
 +
 +    public void removeKeyVersion(String blobKey);
 +
-     public void reportError(String stormId, String componentId, String node, 
Long port, String error);
++    public void reportError(String stormId, String componentId, String node, 
Long port, Throwable error);
 +
 +    public List<ErrorInfo> errors(String stormId, String componentId);
 +
 +    public ErrorInfo lastError(String stormId, String componentId);
 +
 +    public void setCredentials(String stormId, Credentials creds, Map 
topoConf) throws NoSuchAlgorithmException;
 +
 +    public Credentials credentials(String stormId, IFn callback);
 +
 +    public void disconnect();
 +
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
index 1226c55,0000000..a9c4d89
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java
@@@ -1,212 -1,0 +1,212 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.cluster;
 +
 +import clojure.lang.APersistentMap;
 +import org.apache.curator.framework.state.ConnectionStateListener;
 +import org.apache.storm.callback.ZKStateChangedCallback;
 +import org.apache.storm.generated.*;
 +import org.apache.storm.pacemaker.PacemakerClient;
 +import org.apache.storm.utils.Utils;
 +import org.apache.zookeeper.data.ACL;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import java.util.List;
 +
 +public class PaceMakerStateStorage implements IStateStorage {
 +
 +    private static Logger LOG = 
LoggerFactory.getLogger(PaceMakerStateStorage.class);
 +
 +    private PacemakerClient pacemakerClient;
 +    private IStateStorage stateStorage;
 +    private static final int maxRetries = 10;
 +
 +    public PaceMakerStateStorage(PacemakerClient pacemakerClient, 
IStateStorage stateStorage) throws Exception {
 +        this.pacemakerClient = pacemakerClient;
 +        this.stateStorage = stateStorage;
 +    }
 +
 +    @Override
 +    public String register(ZKStateChangedCallback callback) {
 +        return stateStorage.register(callback);
 +    }
 +
 +    @Override
 +    public void unregister(String id) {
 +        stateStorage.unregister(id);
 +    }
 +
 +    @Override
 +    public String create_sequential(String path, byte[] data, List<ACL> acls) 
{
 +        return stateStorage.create_sequential(path, data, acls);
 +    }
 +
 +    @Override
 +    public void mkdirs(String path, List<ACL> acls) {
 +        stateStorage.mkdirs(path, acls);
 +    }
 +
 +    @Override
 +    public void delete_node(String path) {
 +        stateStorage.delete_node(path);
 +    }
 +
 +    @Override
 +    public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) {
 +        stateStorage.set_ephemeral_node(path, data, acls);
 +    }
 +
 +    @Override
 +    public Integer get_version(String path, boolean watch) throws Exception {
 +        return stateStorage.get_version(path, watch);
 +    }
 +
 +    @Override
 +    public boolean node_exists(String path, boolean watch) {
 +        return stateStorage.node_exists(path, watch);
 +    }
 +
 +    @Override
 +    public List<String> get_children(String path, boolean watch) {
 +        return stateStorage.get_children(path, watch);
 +    }
 +
 +    @Override
 +    public void close() {
 +        stateStorage.close();
 +        pacemakerClient.close();
 +    }
 +
 +    @Override
 +    public void set_data(String path, byte[] data, List<ACL> acls) {
 +        stateStorage.set_data(path, data, acls);
 +    }
 +
 +    @Override
 +    public byte[] get_data(String path, boolean watch) {
 +        return stateStorage.get_data(path, watch);
 +    }
 +
 +    @Override
 +    public APersistentMap get_data_with_version(String path, boolean watch) {
 +        return stateStorage.get_data_with_version(path, watch);
 +    }
 +
 +    @Override
 +    public void set_worker_hb(String path, byte[] data, List<ACL> acls) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
 +                HBPulse hbPulse = new HBPulse();
 +                hbPulse.set_id(path);
 +                hbPulse.set_details(data);
 +                HBMessage message = new 
HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.SEND_PULSE_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                LOG.debug("Successful set_worker_hb");
 +                break;
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to set_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public byte[] get_worker_hb(String path, boolean watch) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
 +                HBMessage message = new 
HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.GET_PULSE_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                LOG.debug("Successful get_worker_hb");
 +                return response.get_data().get_pulse().get_details();
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to get_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public List<String> get_worker_hb_children(String path, boolean watch) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
-                 HBMessage message = new 
HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
++                HBMessage message = new 
HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                LOG.debug("Successful get_worker_hb");
 +                return response.get_data().get_nodes().get_pulseIds();
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to get_worker_hb_children. Will make {} 
more attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void delete_worker_hb(String path) {
 +        int retry = maxRetries;
 +        while (true) {
 +            try {
-                 HBMessage message = new 
HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path));
++                HBMessage message = new 
HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path));
 +                HBMessage response = pacemakerClient.send(message);
 +                if (response.get_type() != 
HBServerMessageType.DELETE_PATH_RESPONSE) {
 +                    throw new HBExecutionException("Invalid Response Type");
 +                }
 +                LOG.debug("Successful get_worker_hb");
 +                break;
 +            } catch (Exception e) {
 +                if (retry <= 0) {
 +                    throw Utils.wrapInRuntime(e);
 +                }
 +                LOG.error("{} Failed to delete_worker_hb. Will make {} more 
attempts.", e.getMessage(), retry--);
 +            }
 +        }
 +    }
 +
 +    @Override
 +    public void add_listener(ConnectionStateListener listener) {
 +        stateStorage.add_listener(listener);
 +    }
 +
 +    @Override
 +    public void sync_path(String path) {
 +        stateStorage.sync_path(path);
 +    }
 +
 +    @Override
 +    public void delete_node_blobstore(String path, String nimbusHostPortInfo) 
{
 +        stateStorage.delete_node_blobstore(path, nimbusHostPortInfo);
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
----------------------------------------------------------------------
diff --cc storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
index c2477d6,0000000..110da41
mode 100644,000000..100644
--- a/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
+++ b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java
@@@ -1,28 -1,0 +1,28 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + * http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.cluster;
 +
 +import clojure.lang.APersistentMap;
 +import java.util.List;
 +import org.apache.zookeeper.data.ACL;
 +
 +public interface StateStorageFactory {
-     
++
 +    IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, 
List<ACL> acls, ClusterStateContext context);
 +
 +}

Reply via email to