Merge branch 'master' into ClusterUtils Conflicts: storm-core/src/clj/org/apache/storm/cluster.clj storm-core/src/clj/org/apache/storm/cluster_state/zookeeper_state_factory.clj storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj storm-core/src/clj/org/apache/storm/daemon/common.clj storm-core/src/clj/org/apache/storm/daemon/executor.clj storm-core/src/clj/org/apache/storm/daemon/nimbus.clj storm-core/src/clj/org/apache/storm/daemon/supervisor.clj storm-core/src/clj/org/apache/storm/stats.clj storm-core/src/clj/org/apache/storm/testing.clj storm-core/src/clj/org/apache/storm/thrift.clj storm-core/src/clj/org/apache/storm/util.clj storm-core/src/clj/org/apache/storm/zookeeper.clj storm-core/test/clj/integration/org/apache/storm/integration_test.clj storm-core/test/clj/org/apache/storm/cluster_test.clj storm-core/test/clj/org/apache/storm/nimbus_test.clj storm-core/test/clj/org/apache/storm/supervisor_test.clj
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/9a8962de Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/9a8962de Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/9a8962de Branch: refs/heads/master Commit: 9a8962de9c80fc6a5388fd6d63ab225268530adf Parents: 2ee8bec 12ceb09 Author: xiaojian.fxj <xiaojian....@alibaba-inc.com> Authored: Mon Feb 15 15:13:33 2016 +0800 Committer: xiaojian.fxj <xiaojian....@alibaba-inc.com> Committed: Mon Feb 15 22:15:21 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 16 + README.markdown | 1 + bin/storm-config.cmd | 6 +- bin/storm.cmd | 47 +- bin/storm.py | 8 +- conf/defaults.yaml | 4 + dev-tools/travis/travis-script.sh | 4 +- .../starter/trident/TridentMapExample.java | 123 +++ external/sql/storm-sql-core/pom.xml | 9 + external/storm-elasticsearch/pom.xml | 2 + .../storm/hbase/security/HBaseSecurityUtil.java | 36 +- .../jvm/org/apache/storm/kafka/KafkaSpout.java | 8 +- .../jvm/org/apache/storm/kafka/KafkaUtils.java | 44 +- .../apache/storm/kafka/PartitionManager.java | 42 +- .../kafka/trident/TridentKafkaEmitter.java | 23 +- external/storm-mqtt/core/pom.xml | 4 +- log4j2/cluster.xml | 2 +- log4j2/worker.xml | 2 +- pom.xml | 9 +- storm-core/pom.xml | 11 +- .../src/clj/org/apache/storm/LocalCluster.clj | 4 +- storm-core/src/clj/org/apache/storm/clojure.clj | 8 +- .../clj/org/apache/storm/command/blobstore.clj | 11 +- .../org/apache/storm/command/config_value.clj | 25 - .../org/apache/storm/command/dev_zookeeper.clj | 6 +- .../clj/org/apache/storm/command/get_errors.clj | 12 +- .../apache/storm/command/shell_submission.clj | 4 +- storm-core/src/clj/org/apache/storm/config.clj | 18 +- .../src/clj/org/apache/storm/converter.clj | 14 +- .../src/clj/org/apache/storm/daemon/acker.clj | 21 +- .../src/clj/org/apache/storm/daemon/common.clj | 46 +- .../src/clj/org/apache/storm/daemon/drpc.clj | 25 +- .../clj/org/apache/storm/daemon/executor.clj | 530 +++++----- .../clj/org/apache/storm/daemon/logviewer.clj | 70 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 168 ++-- .../clj/org/apache/storm/daemon/supervisor.clj | 205 ++-- .../src/clj/org/apache/storm/daemon/task.clj | 2 +- .../src/clj/org/apache/storm/daemon/worker.clj | 78 +- .../src/clj/org/apache/storm/disruptor.clj | 10 +- storm-core/src/clj/org/apache/storm/event.clj | 2 +- .../src/clj/org/apache/storm/local_state.clj | 9 +- .../clj/org/apache/storm/messaging/loader.clj | 34 - .../clj/org/apache/storm/messaging/local.clj | 23 - .../org/apache/storm/pacemaker/pacemaker.clj | 7 +- .../storm/pacemaker/pacemaker_state_factory.clj | 122 --- .../clj/org/apache/storm/process_simulator.clj | 4 +- .../apache/storm/scheduler/DefaultScheduler.clj | 7 +- .../apache/storm/scheduler/EvenScheduler.clj | 23 +- .../storm/scheduler/IsolationScheduler.clj | 29 +- storm-core/src/clj/org/apache/storm/stats.clj | 82 +- storm-core/src/clj/org/apache/storm/testing.clj | 89 +- storm-core/src/clj/org/apache/storm/thrift.clj | 6 +- storm-core/src/clj/org/apache/storm/timer.clj | 12 +- .../clj/org/apache/storm/trident/testing.clj | 9 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 99 +- .../src/clj/org/apache/storm/ui/helpers.clj | 14 +- storm-core/src/clj/org/apache/storm/util.clj | 925 +---------------- storm-core/src/jvm/org/apache/storm/Config.java | 39 + .../storm/cluster/PaceMakerStateStorage.java | 4 +- .../org/apache/storm/command/ConfigValue.java | 30 + .../storm/daemon/metrics/MetricsUtils.java | 108 ++ .../reporters/ConsolePreparableReporter.java | 76 ++ .../reporters/CsvPreparableReporter.java | 80 ++ .../reporters/JmxPreparableReporter.java | 70 ++ .../metrics/reporters/PreparableReporter.java | 32 + .../storm/logging/ThriftAccessLogger.java | 13 +- .../apache/storm/pacemaker/PacemakerClient.java | 5 + .../serialization/SerializationFactory.java | 17 +- .../staticmocking/MockedConfigUtils.java | 31 - .../jvm/org/apache/storm/trident/Stream.java | 87 +- .../storm/trident/operation/Consumer.java | 35 + .../trident/operation/FlatMapFunction.java | 37 + .../storm/trident/operation/MapFunction.java | 36 + .../operation/impl/ConsumerExecutor.java | 38 + .../operation/impl/FlatMapFunctionExecutor.java | 43 + .../operation/impl/MapFunctionExecutor.java | 41 + .../trident/planner/processor/MapProcessor.java | 87 ++ .../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +- .../jvm/org/apache/storm/utils/Container.java | 11 +- .../jvm/org/apache/storm/utils/IPredicate.java | 27 + .../org/apache/storm/utils/NimbusClient.java | 2 +- .../utils/StormConnectionStateConverter.java | 44 + .../jvm/org/apache/storm/utils/TestUtils.java | 34 - .../src/jvm/org/apache/storm/utils/Time.java | 26 +- .../src/jvm/org/apache/storm/utils/Utils.java | 989 +++++++++++++++++-- .../storm/validation/ConfigValidation.java | 2 +- .../org/apache/storm/zookeeper/Zookeeper.java | 7 + .../org/apache/storm/integration_test.clj | 98 +- .../org/apache/storm/testing4j_test.clj | 37 +- .../apache/storm/trident/integration_test.clj | 15 +- .../test/clj/org/apache/storm/cluster_test.clj | 27 +- .../test/clj/org/apache/storm/drpc_test.clj | 23 +- .../clj/org/apache/storm/logviewer_test.clj | 267 ++--- .../storm/messaging/netty_integration_test.clj | 2 +- .../test/clj/org/apache/storm/nimbus_test.clj | 112 ++- .../storm/pacemaker_state_factory_test.clj | 74 +- .../scheduler/resource_aware_scheduler_test.clj | 21 +- .../apache/storm/security/auth/auth_test.clj | 11 +- .../authorizer/DRPCSimpleACLAuthorizer_test.clj | 2 +- .../BlowfishTupleSerializer_test.clj | 1 - .../clj/org/apache/storm/serialization_test.clj | 23 +- .../clj/org/apache/storm/supervisor_test.clj | 649 ++++++------ .../clj/org/apache/storm/transactional_test.clj | 18 + .../clj/org/apache/storm/trident/state_test.clj | 5 +- .../clj/org/apache/storm/trident/tuple_test.clj | 15 +- .../test/clj/org/apache/storm/utils_test.clj | 16 +- .../test/clj/org/apache/storm/worker_test.clj | 1 - .../staticmocking/ConfigUtilsInstaller.java | 38 + .../utils/staticmocking/UtilsInstaller.java | 38 + .../storm/utils/staticmocking/package-info.java | 95 ++ 110 files changed, 4199 insertions(+), 2614 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/conf/defaults.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj index 7be526d,657e242..ab5cbed --- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj +++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj @@@ -14,7 -14,8 +14,8 @@@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.command.dev-zookeeper + (:import [org.apache.storm.utils Utils]) - (:use [org.apache.storm zookeeper util config]) + (:use [org.apache.storm util config]) (:import [org.apache.storm.utils ConfigUtils]) (:import [org.apache.storm.zookeeper Zookeeper]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 3978d2f,0253338..870e7f6 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@@ -15,8 -15,9 +15,9 @@@ ;; limitations under the License. (ns org.apache.storm.command.shell-submission (:import [org.apache.storm StormSubmitter] + [org.apache.storm.utils Utils] [org.apache.storm.zookeeper Zookeeper]) - (:use [org.apache.storm thrift util config log zookeeper]) + (:use [org.apache.storm thrift util config log]) (:require [clojure.string :as str]) (:import [org.apache.storm.utils ConfigUtils]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/converter.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj index b144f40,eb1ec1e..db342d2 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@@ -15,17 -15,19 +15,20 @@@ ;; limitations under the License. (ns org.apache.storm.daemon.common (:use [org.apache.storm log config util]) - (:import [org.apache.storm.generated StormTopology + (:import [org.apache.storm.generated StormTopology NodeInfo InvalidTopologyException GlobalStreamId] - [org.apache.storm.utils ThriftTopologyUtils]) - (:import [org.apache.storm.utils Utils ConfigUtils]) + [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils] + [org.apache.storm.daemon.metrics.reporters PreparableReporter] + [com.codahale.metrics MetricRegistry]) + (:import [org.apache.storm.daemon.metrics MetricsUtils]) (:import [org.apache.storm.task WorkerTopologyContext]) (:import [org.apache.storm Constants]) + (:import [org.apache.storm.cluster StormClusterStateImpl]) (:import [org.apache.storm.metric SystemBolt]) (:import [org.apache.storm.metric EventLoggerBolt]) - (:import [org.apache.storm.security.auth IAuthorizer]) - (:import [java.io InterruptedIOException]) + (:import [org.apache.storm.security.auth IAuthorizer]) + (:import [java.io InterruptedIOException] + [org.json.simple JSONValue]) (:require [clojure.set :as set]) (:require [org.apache.storm.daemon.acker :as acker]) (:require [org.apache.storm.thrift :as thrift]) @@@ -73,12 -83,10 +84,11 @@@ (defn new-executor-stats [] (ExecutorStats. 0 0 0 0 0)) + (defn get-storm-id [storm-cluster-state storm-name] - (let [active-storms (.activeStorms storm-cluster-state)] - (find-first - #(= storm-name (.get_name (.stormBase storm-cluster-state % nil))) - active-storms) - (let [active-storms (.active-storms storm-cluster-state) - pred (reify IPredicate (test [this x] (= storm-name (:storm-name (.storm-base storm-cluster-state x nil)))))] ++ (let [active-storms (.activeStorms storm-cluster-state) ++ pred (reify IPredicate (test [this x] (= storm-name (.get_name (.stormBase storm-cluster-state x nil)))))] + (Utils/findOne pred active-storms) )) (defn topology-bases [storm-cluster-state] http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj index 49ae6cf,e2380b7..33b89ed --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@@ -34,10 -34,13 +34,12 @@@ (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric]) (:import [org.apache.storm Config Constants]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) + (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) - (:import [java.util.concurrent ConcurrentLinkedQueue]) + (:import [java.lang Thread Thread$UncaughtExceptionHandler] + [java.util.concurrent ConcurrentLinkedQueue] + [org.json.simple JSONValue]) - (:require [org.apache.storm [thrift :as thrift] - [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]]) + (:require [org.apache.storm [thrift :as thrift] [disruptor :as disruptor] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) @@@ -206,9 -211,9 +210,9 @@@ (swap! interval-errors inc) (when (<= @interval-errors max-per-interval) - (cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) + (.reportError (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) - (hostname storm-conf) + (Utils/hostname storm-conf) - (.getThisWorkerPort (:worker-context executor)) error) + (long (.getThisWorkerPort (:worker-context executor))) error) )))) ;; in its own function so that it can be mocked out by tracked topologies http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index daf5e45,710cd83..6af1b81 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@@ -38,9 -39,9 +39,9 @@@ (:import [org.apache.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) (:import [org.apache.storm.nimbus NimbusInfo]) - (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils + (:import [org.apache.storm.utils TimeCacheMap Time TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils BufferFileInputStream BufferInputStream]) - (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo + (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo ClusterWorkerHeartbeat ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta @@@ -407,10 -414,10 +409,10 @@@ [storm-cluster-state] (let [assignments (.assignments storm-cluster-state nil)] - (defaulted + (or (apply merge-with set/union (for [a assignments - [_ [node port]] (-> (.assignment-info storm-cluster-state a nil) :executor->node+port)] + [_ [node port]] (-> (clojurify-assignment (.assignmentInfo storm-cluster-state a nil)) :executor->node+port)] {node #{port}} )) {}) @@@ -988,10 -1002,10 +1002,10 @@@ topology (system-topology! storm-conf (read-storm-topology storm-id blob-store)) num-executors (->> (all-components topology) (map-val num-start-executors))] (log-message "Activating " storm-name ": " storm-id) - (.activate-storm! storm-cluster-state + (.activateStorm storm-cluster-state storm-id - (StormBase. storm-name + (thriftify-storm-base (StormBase. storm-name - (current-time-secs) + (Time/currentTimeSecs) {:type topology-initial-status} (storm-conf TOPOLOGY-WORKERS) num-executors @@@ -1137,9 -1152,9 +1152,9 @@@ (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) - (.teardown-heartbeats! storm-cluster-state id) - (.teardown-topology-errors! storm-cluster-state id) + (.teardownHeartbeats storm-cluster-state id) + (.teardownTopologyErrors storm-cluster-state id) - (rmr (ConfigUtils/masterStormDistRoot conf id)) + (Utils/forceDelete (ConfigUtils/masterStormDistRoot conf id)) (blob-rm-topology-keys id blob-store storm-cluster-state) (swap! (:heartbeats-cache nimbus) dissoc id))))) (log-message "not a leader, skipping cleanup"))) @@@ -1811,8 -1830,8 +1838,8 @@@ (.set_used_cpu sup-sum used-cpu)) (when-let [version (:version info)] (.set_version sup-sum version)) sup-sum)) - nimbus-uptime ((:uptime nimbus)) + nimbus-uptime (. (:uptime nimbus) upTime) - bases (topology-bases storm-cluster-state) + bases (nimbus-topology-bases storm-cluster-state) nimbuses (.nimbuses storm-cluster-state) ;;update the isLeader field for each nimbus summary http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 3a83d03,ae9e92f..c1f058f --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@@ -16,14 -16,15 +16,15 @@@ (ns org.apache.storm.daemon.supervisor (:import [java.io File IOException FileOutputStream]) (:import [org.apache.storm.scheduler ISupervisor] - [org.apache.storm.utils LocalState Time Utils ConfigUtils] + [org.apache.storm.utils LocalState Time Utils Utils$ExitCodeCallable + ConfigUtils] [org.apache.storm.daemon Shutdownable] [org.apache.storm Constants] - [org.apache.storm.cluster ClusterStateContext DaemonType] + [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils] [java.net JarURLConnection] - [java.net URI] + [java.net URI URLDecoder] [org.apache.commons.io FileUtils]) - (:use [org.apache.storm config util log timer local-state]) + (:use [org.apache.storm config util log timer local-state converter]) (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) (:import [java.nio.file Files StandardCopyOption]) @@@ -315,12 -319,14 +321,12 @@@ :shared-context shared-context :isupervisor isupervisor :active (atom true) - :uptime (uptime-computer) + :uptime (Utils/makeUptimeComputer) :version STORM-VERSION :worker-thread-pids-atom (atom {}) - :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when - (Utils/isZkAuthenticationConfiguredStormServer - conf) - SUPERVISOR-ZK-ACLS) - :context (ClusterStateContext. DaemonType/SUPERVISOR)) + :storm-cluster-state (ClusterUtils/mkStormClusterState conf (when (Utils/isZkAuthenticationConfiguredStormServer conf) + SUPERVISOR-ZK-ACLS) + (ClusterStateContext. DaemonType/SUPERVISOR)) :local-state (ConfigUtils/supervisorState conf) :supervisor-id (.getSupervisorId isupervisor) :assignment-id (.getAssignmentId isupervisor) @@@ -777,19 -791,19 +792,19 @@@ synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor) downloaded-storm-ids (set (read-downloaded-storm-ids conf)) run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor) - heartbeat-fn (fn [] (.supervisor-heartbeat! + heartbeat-fn (fn [] (.supervisorHeartbeat (:storm-cluster-state supervisor) (:supervisor-id supervisor) - (thriftify-supervisor-info (->SupervisorInfo (current-time-secs) - (->SupervisorInfo (Time/currentTimeSecs) ++ (thriftify-supervisor-info (->SupervisorInfo (Time/currentTimeSecs) (:my-hostname supervisor) (:assignment-id supervisor) (keys @(:curr-assignment supervisor)) ;; used ports (.getMetadata isupervisor) (conf SUPERVISOR-SCHEDULER-META) - ((:uptime supervisor)) + (. (:uptime supervisor) upTime) (:version supervisor) - (mk-supervisor-capacities conf))))] + (mk-supervisor-capacities conf)))))] (heartbeat-fn) ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) @@@ -1162,7 -1183,7 +1184,8 @@@ (.readBlobTo blob-store (ConfigUtils/masterStormConfKey storm-id) (FileOutputStream. (ConfigUtils/supervisorStormConfPath tmproot)) nil) (finally (.shutdown blob-store))) -- (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) ++ (try (FileUtils/moveDirectory (File. tmproot) (File. stormroot)) (catch Exception e)) ++ (setup-storm-code-dir conf (clojurify-structure (ConfigUtils/readSupervisorStormConf conf storm-id)) stormroot) (let [classloader (.getContextClassLoader (Thread/currentThread)) resources-jar (resources-jar) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj index ae5be57,fe8cfae..9863427 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@@ -19,16 -19,18 +19,18 @@@ (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) (:require [org.apache.storm.daemon [executor :as executor]]) - (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]]) + (:require [org.apache.storm [disruptor :as disruptor]]) (:require [clojure.set :as set]) - (:require [org.apache.storm.messaging.loader :as msg-loader]) (:import [java.util.concurrent Executors] - [org.apache.storm.hooks IWorkerHook BaseWorkerHook]) - (:import [java.util ArrayList HashMap]) - (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue]) + [org.apache.storm.hooks IWorkerHook BaseWorkerHook] + [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J]) + (:import [java.util ArrayList HashMap] + [java.util.concurrent.locks ReentrantReadWriteLock]) + (:import [org.apache.commons.io FileUtils]) + (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time]) (:import [org.apache.storm.grouping LoadMapping]) (:import [org.apache.storm.messaging TransportFactory]) - (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) + (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.serialization KryoTupleSerializer]) (:import [org.apache.storm.generated StormTopology]) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj index a36da3a,be4361a..0000000 deleted file mode 100644,100644 --- a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj +++ /dev/null @@@ -1,122 -1,141 +1,0 @@@ --;; Licensed to the Apache Software Foundation (ASF) under one --;; or more contributor license agreements. See the NOTICE file --;; distributed with this work for additional information --;; regarding copyright ownership. The ASF licenses this file --;; to you under the Apache License, Version 2.0 (the --;; "License"); you may not use this file except in compliance --;; with the License. You may obtain a copy of the License at --;; --;; http://www.apache.org/licenses/LICENSE-2.0 --;; --;; Unless required by applicable law or agreed to in writing, software --;; distributed under the License is distributed on an "AS IS" BASIS, --;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --;; See the License for the specific language governing permissions and --;; limitations under the License. -- --(ns org.apache.storm.pacemaker.pacemaker-state-factory -- (:require [org.apache.storm.pacemaker pacemaker] - [org.apache.storm.cluster-state [zookeeper-state-factory :as zk-factory]] -- [org.apache.storm -- [config :refer :all] - [cluster :refer :all] -- [log :refer :all] -- [util :as util]]) -- (:import [org.apache.storm.generated -- HBExecutionException HBServerMessageType HBMessage -- HBMessageData HBPulse] - [org.apache.storm.cluster ZKStateStorage ClusterUtils IStateStorage] - [org.apache.storm.cluster_state zookeeper_state_factory] - [org.apache.storm.cluster ClusterState] -- [org.apache.storm.pacemaker PacemakerClient]) -- (:gen-class - :implements [org.apache.storm.cluster.StateStorageFactory])) - :implements [org.apache.storm.cluster.ClusterStateFactory])) -- --;; So we can mock the client for testing --(defn makeClient [conf] -- (PacemakerClient. conf)) -- --(defn makeZKState [conf auth-conf acls context] - (ClusterUtils/mkStateStorage conf auth-conf acls context)) - (.mkState (zookeeper_state_factory.) conf auth-conf acls context)) -- --(def max-retries 10) - -(defn retry-on-exception - "Retries specific function on exception based on retries count" - [retries task-description f & args] - (let [res (try {:value (apply f args)} - (catch Exception e - (if (<= 0 retries) - (throw e) - {:exception e})))] - (if (:exception res) - (do - (log-error (:exception res) (str "Failed to " task-description ". Will make [" retries "] more attempts.")) - (recur (dec retries) task-description f args)) - (do - (log-debug (str "Successful " task-description ".")) - (:value res))))) -- --(defn -mkState [this conf auth-conf acls context] -- (let [zk-state (makeZKState conf auth-conf acls context) -- pacemaker-client (makeClient conf)] -- -- (reify - IStateStorage - ClusterState -- ;; Let these pass through to the zk-state. We only want to handle heartbeats. -- (register [this callback] (.register zk-state callback)) -- (unregister [this callback] (.unregister zk-state callback)) -- (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state path data acls)) -- (create_sequential [this path data acls] (.create_sequential zk-state path data acls)) -- (set_data [this path data acls] (.set_data zk-state path data acls)) -- (delete_node [this path] (.delete_node zk-state path)) -- (delete_node_blobstore [this path nimbus-host-port-info] (.delete_node_blobstore zk-state path nimbus-host-port-info)) -- (get_data [this path watch?] (.get_data zk-state path watch?)) -- (get_data_with_version [this path watch?] (.get_data_with_version zk-state path watch?)) -- (get_version [this path watch?] (.get_version zk-state path watch?)) -- (get_children [this path watch?] (.get_children zk-state path watch?)) -- (mkdirs [this path acls] (.mkdirs zk-state path acls)) -- (node_exists [this path watch?] (.node_exists zk-state path watch?)) -- (add_listener [this listener] (.add_listener zk-state listener)) -- (sync_path [this path] (.sync_path zk-state path)) -- -- (set_worker_hb [this path data acls] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "set_worker_hb" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/SEND_PULSE -- (HBMessageData/pulse -- (doto (HBPulse.) -- (.set_id path) -- (.set_details data)))))] -- (if (= (.get_type response) HBServerMessageType/SEND_PULSE_RESPONSE) -- :ok -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (delete_worker_hb [this path] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "delete_worker_hb" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/DELETE_PATH -- (HBMessageData/path path)))] -- (if (= (.get_type response) HBServerMessageType/DELETE_PATH_RESPONSE) -- :ok -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (get_worker_hb [this path watch?] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "get_worker_hb" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/GET_PULSE -- (HBMessageData/path path)))] -- (if (= (.get_type response) HBServerMessageType/GET_PULSE_RESPONSE) -- (try -- (.get_details (.get_pulse (.get_data response))) -- (catch Exception e -- (throw (HBExecutionException. (.toString e))))) -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (get_worker_hb_children [this path watch?] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "get_worker_hb_children" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH -- (HBMessageData/path path)))] -- (if (= (.get_type response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE) -- (try -- (into [] (.get_pulseIds (.get_nodes (.get_data response)))) -- (catch Exception e -- (throw (HBExecutionException. (.toString e))))) -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (close [this] -- (.close zk-state) -- (.close pacemaker-client))))) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/stats.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/stats.clj index 0bf1757,8b37fc3..8632ed3 --- a/storm-core/src/clj/org/apache/storm/stats.clj +++ b/storm-core/src/clj/org/apache/storm/stats.clj @@@ -24,8 -24,8 +24,9 @@@ ExecutorAggregateStats SpecificAggregateStats SpoutAggregateStats TopologyPageInfo TopologyStats]) (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm.cluster StormClusterStateImpl]) - (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]) + (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric] + [java.util Collection]) (:use [org.apache.storm log util]) (:use [clojure.math.numeric-tower :only [ceil]])) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/testing.clj index 5a0bdf2,c872742..eef7754 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@@ -44,11 -45,12 +45,12 @@@ (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor]) (:import [org.apache.storm.tuple Tuple]) (:import [org.apache.storm.generated StormTopology]) - (:import [org.apache.storm.task TopologyContext]) + (:import [org.apache.storm.task TopologyContext] + (org.apache.storm.messaging IContext) + [org.json.simple JSONValue]) - (:require [org.apache.storm [zookeeper :as zk]]) + (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils]) - (:require [org.apache.storm.messaging.loader :as msg-loader]) (:require [org.apache.storm.daemon.acker :as acker]) - (:use [org.apache.storm cluster util thrift config log local-state])) + (:use [org.apache.storm util thrift config log local-state converter])) (defn feeder-spout [fields] http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/thrift.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/thrift.clj index 4dc21f9,779c1d1..7aab729 --- a/storm-core/src/clj/org/apache/storm/thrift.clj +++ b/storm-core/src/clj/org/apache/storm/thrift.clj @@@ -29,8 -29,9 +29,9 @@@ (:import [org.apache.storm.grouping CustomStreamGrouping]) (:import [org.apache.storm.topology TopologyBuilder]) (:import [org.apache.storm.clojure RichShellBolt RichShellSpout]) - (:import [org.apache.thrift.transport TTransport]) + (:import [org.apache.thrift.transport TTransport] + (org.json.simple JSONValue)) - (:use [org.apache.storm util config log zookeeper])) + (:use [org.apache.storm util config log])) (defn instantiate-java-object [^JavaObject obj] http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/clj/org/apache/storm/util.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/util.clj index 165d8ee,f685d12..72778bb --- a/storm-core/src/clj/org/apache/storm/util.clj +++ b/storm-core/src/clj/org/apache/storm/util.clj @@@ -20,9 -20,8 +20,9 @@@ (:import [java.io FileReader FileNotFoundException]) (:import [java.nio.file Paths]) (:import [org.apache.storm Config]) - (:import [org.apache.storm.generated ErrorInfo]) - (:import [org.apache.storm.utils Time Container ClojureTimerTask Utils - MutableObject MutableInt]) ++ (:import [org.apache.storm.generated ErrorInfo]) + (:import [org.apache.storm.utils Time ClojureTimerTask Utils + MutableObject]) (:import [org.apache.storm.security.auth NimbusPrincipal]) (:import [javax.security.auth Subject]) (:import [java.util UUID Random ArrayList List Collections]) @@@ -262,58 -163,9 +164,19 @@@ (instance? Boolean x) (boolean x) true x)) s)) +; move this func form convert.clj due to cyclic load dependency +(defn clojurify-error [^ErrorInfo error] + (if error + { + :error (.get_error error) + :time-secs (.get_error_time_secs error) + :host (.get_host error) + :port (.get_port error) + } + )) - (defmacro with-file-lock - [path & body] - `(let [f# (File. ~path) - _# (.createNewFile f#) - rf# (RandomAccessFile. f# "rw") - lock# (.. rf# (getChannel) (lock))] - (try - ~@body - (finally - (.release lock#) - (.close rf#))))) - - (defn tokenize-path - [^String path] - (let [toks (.split path "/")] - (vec (filter (complement empty?) toks)))) - - (defn assoc-conj - [m k v] - (merge-with concat m {k [v]})) - - ;; returns [ones in first set not in second, ones in second set not in first] - (defn set-delta - [old curr] - (let [s1 (set old) - s2 (set curr)] - [(set/difference s1 s2) (set/difference s2 s1)])) - - (defn parent-path - [path] - (let [toks (tokenize-path path)] - (str "/" (str/join "/" (butlast toks))))) - - (defn toks->path - [toks] - (str "/" (str/join "/" toks))) - - (defn normalize-path - [^String path] - (toks->path (tokenize-path path))) - + ;TODO: We're keeping this function around until all the code using it is properly tranlated to java + ;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function. (defn map-val [afn amap] (into {} http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java index 1226c55,0000000..a9c4d89 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java @@@ -1,212 -1,0 +1,212 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.cluster; + +import clojure.lang.APersistentMap; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.storm.callback.ZKStateChangedCallback; +import org.apache.storm.generated.*; +import org.apache.storm.pacemaker.PacemakerClient; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class PaceMakerStateStorage implements IStateStorage { + + private static Logger LOG = LoggerFactory.getLogger(PaceMakerStateStorage.class); + + private PacemakerClient pacemakerClient; + private IStateStorage stateStorage; + private static final int maxRetries = 10; + + public PaceMakerStateStorage(PacemakerClient pacemakerClient, IStateStorage stateStorage) throws Exception { + this.pacemakerClient = pacemakerClient; + this.stateStorage = stateStorage; + } + + @Override + public String register(ZKStateChangedCallback callback) { + return stateStorage.register(callback); + } + + @Override + public void unregister(String id) { + stateStorage.unregister(id); + } + + @Override + public String create_sequential(String path, byte[] data, List<ACL> acls) { + return stateStorage.create_sequential(path, data, acls); + } + + @Override + public void mkdirs(String path, List<ACL> acls) { + stateStorage.mkdirs(path, acls); + } + + @Override + public void delete_node(String path) { + stateStorage.delete_node(path); + } + + @Override + public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) { + stateStorage.set_ephemeral_node(path, data, acls); + } + + @Override + public Integer get_version(String path, boolean watch) throws Exception { + return stateStorage.get_version(path, watch); + } + + @Override + public boolean node_exists(String path, boolean watch) { + return stateStorage.node_exists(path, watch); + } + + @Override + public List<String> get_children(String path, boolean watch) { + return stateStorage.get_children(path, watch); + } + + @Override + public void close() { + stateStorage.close(); + pacemakerClient.close(); + } + + @Override + public void set_data(String path, byte[] data, List<ACL> acls) { + stateStorage.set_data(path, data, acls); + } + + @Override + public byte[] get_data(String path, boolean watch) { + return stateStorage.get_data(path, watch); + } + + @Override + public APersistentMap get_data_with_version(String path, boolean watch) { + return stateStorage.get_data_with_version(path, watch); + } + + @Override + public void set_worker_hb(String path, byte[] data, List<ACL> acls) { + int retry = maxRetries; + while (true) { + try { + HBPulse hbPulse = new HBPulse(); + hbPulse.set_id(path); + hbPulse.set_details(data); + HBMessage message = new HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.SEND_PULSE_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful set_worker_hb"); + break; + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public byte[] get_worker_hb(String path, boolean watch) { + int retry = maxRetries; + while (true) { + try { + HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.GET_PULSE_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful get_worker_hb"); + return response.get_data().get_pulse().get_details(); + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to get_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public List<String> get_worker_hb_children(String path, boolean watch) { + int retry = maxRetries; + while (true) { + try { - HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path)); ++ HBMessage message = new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful get_worker_hb"); + return response.get_data().get_nodes().get_pulseIds(); + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public void delete_worker_hb(String path) { + int retry = maxRetries; + while (true) { + try { - HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path)); ++ HBMessage message = new HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful get_worker_hb"); + break; + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public void add_listener(ConnectionStateListener listener) { + stateStorage.add_listener(listener); + } + + @Override + public void sync_path(String path) { + stateStorage.sync_path(path); + } + + @Override + public void delete_node_blobstore(String path, String nimbusHostPortInfo) { + stateStorage.delete_node_blobstore(path, nimbusHostPortInfo); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java index af0e8f3,34f3665..20d6deb --- a/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java +++ b/storm-core/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java @@@ -60,6 -60,6 +60,10 @@@ public class PacemakerClient implement private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20); private int retryTimes = 0; ++ //the constructor is invoked by pacemaker-state-factory-test ++ public PacemakerClient() { ++ bootstrap = new ClientBootstrap(); ++ } public PacemakerClient(Map config) { String host = (String)config.get(Config.PACEMAKER_HOST); @@@ -157,6 -157,7 +161,7 @@@ public String secretKey() { return secret; } - ++ public HBMessage checkCaptured() {return null;} public HBMessage send(HBMessage m) { waitUntilReady(); LOG.debug("Sending message: {}", m.toString()); http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/src/jvm/org/apache/storm/zookeeper/Zookeeper.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/integration/org/apache/storm/integration_test.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/cluster_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/cluster_test.clj index 39adb9e,b146cb0..6c32d54 --- a/storm-core/test/clj/org/apache/storm/cluster_test.clj +++ b/storm-core/test/clj/org/apache/storm/cluster_test.clj @@@ -22,11 -22,11 +22,11 @@@ (:import [org.mockito Mockito]) (:import [org.mockito.exceptions.base MockitoAssertionError]) (:import [org.apache.curator.framework CuratorFramework CuratorFrameworkFactory CuratorFrameworkFactory$Builder]) - (:import [org.apache.storm.utils Utils TestUtils ZookeeperAuthInfo ConfigUtils]) + (:import [org.apache.storm.utils Time Utils ZookeeperAuthInfo ConfigUtils]) - (:import [org.apache.storm.cluster ClusterState]) + (:import [org.apache.storm.cluster IStateStorage ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils]) (:import [org.apache.storm.zookeeper Zookeeper]) - (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) - (:require [org.apache.storm [zookeeper :as zk]]) + (:import [org.apache.storm.callback ZKStateChangedCallback]) + (:import [org.apache.storm.testing.staticmocking MockedZookeeper MockedCluster]) (:require [conjure.core]) (:use [conjure core]) (:use [clojure test]) @@@ -39,14 -39,18 +39,18 @@@ (defn mk-state ([zk-port] (let [conf (mk-config zk-port)] - (mk-distributed-cluster-state conf :auth-conf conf))) + (ClusterUtils/mkStateStorage conf conf nil (ClusterStateContext.)))) ([zk-port cb] - (let [ret (mk-state zk-port)] - (.register ret cb) - ret ))) + (let [ret (mk-state zk-port)] + (.register ret cb) + ret))) -(defn mk-storm-state [zk-port] (mk-storm-cluster-state (mk-config zk-port))) +(defn mk-storm-state [zk-port] (ClusterUtils/mkStormClusterState (mk-config zk-port) nil (ClusterStateContext.))) + (defn barr + [& vals] + (byte-array (map byte vals))) + (deftest test-basics (with-inprocess-zookeeper zk-port (let [state (mk-state zk-port)] @@@ -242,27 -244,26 +246,32 @@@ (is (.contains (:error error) target)) ))) ++(defn- stringify-error [error] ++ (let [result (java.io.StringWriter.) ++ printer (java.io.PrintWriter. result)] ++ (.printStackTrace error printer) ++ (.toString result))) (deftest test-storm-cluster-state-errors (with-inprocess-zookeeper zk-port (with-simulated-time (let [state (mk-storm-state zk-port)] - (.reportError state "a" "1" (local-hostname) 6700 (stringify-error (RuntimeException.))) - (.report-error state "a" "1" (Utils/localHostname) 6700 (RuntimeException.)) ++ (.reportError state "a" "1" (Utils/localHostname) 6700 (stringify-error (RuntimeException.))) (validate-errors! state "a" "1" ["RuntimeException"]) (advance-time-secs! 1) - (.reportError state "a" "1" (local-hostname) 6700 (stringify-error (IllegalArgumentException.))) - (.report-error state "a" "1" (Utils/localHostname) 6700 (IllegalArgumentException.)) ++ (.reportError state "a" "1" (Utils/localHostname) 6700 (stringify-error (IllegalArgumentException.))) (validate-errors! state "a" "1" ["IllegalArgumentException" "RuntimeException"]) (doseq [i (range 10)] - (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (RuntimeException.))) - (.report-error state "a" "2" (Utils/localHostname) 6700 (RuntimeException.)) ++ (.reportError state "a" "2" (Utils/localHostname) 6700 (stringify-error (RuntimeException.))) (advance-time-secs! 2)) (validate-errors! state "a" "2" (repeat 10 "RuntimeException")) (doseq [i (range 5)] - (.reportError state "a" "2" (local-hostname) 6700 (stringify-error (IllegalArgumentException.))) - (.report-error state "a" "2" (Utils/localHostname) 6700 (IllegalArgumentException.)) ++ (.reportError state "a" "2" (Utils/localHostname) 6700 (stringify-error (IllegalArgumentException.))) (advance-time-secs! 2)) (validate-errors! state "a" "2" (concat (repeat 5 "IllegalArgumentException") - (repeat 5 "RuntimeException") - )) + (repeat 5 "RuntimeException") + )) + (.disconnect state) )))) @@@ -300,12 -301,12 +309,12 @@@ (. (Mockito/when (.connectString builder (Mockito/anyString))) (thenReturn builder)) (. (Mockito/when (.connectionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder)) (. (Mockito/when (.sessionTimeoutMs builder (Mockito/anyInt))) (thenReturn builder)) - (TestUtils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf)) + (Utils/testSetupBuilder builder (str zk-port "/") conf (ZookeeperAuthInfo. conf)) (is (nil? - (try - (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD)))) - (catch MockitoAssertionError e - e))))))) + (try + (. (Mockito/verify builder) (authorization "digest" (.getBytes (conf STORM-ZOOKEEPER-AUTH-PAYLOAD)))) + (catch MockitoAssertionError e + e))))))) (deftest test-storm-state-callbacks ;; TODO finish @@@ -313,17 -314,15 +322,17 @@@ (deftest test-cluster-state-default-acls (testing "The default ACLs are empty." - (let [zk-mock (Mockito/mock Zookeeper)] + (let [zk-mock (Mockito/mock Zookeeper) + curator-frameworke (reify CuratorFramework (^void close [this] nil))] ;; No need for when clauses because we just want to return nil (with-open [_ (MockedZookeeper. zk-mock)] - (stubbing [zk/mk-client (reify CuratorFramework (^void close [this] nil))] - (mk-distributed-cluster-state {}) - (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil))))) - (stubbing [mk-distributed-cluster-state (reify ClusterState - (register [this callback] nil) - (mkdirs [this path acls] nil))] - (mk-storm-cluster-state {}) - (verify-call-times-for mk-distributed-cluster-state 1) - (verify-first-call-args-for-indices mk-distributed-cluster-state [4] nil)))) + (. (Mockito/when (.mkClientImpl zk-mock (Mockito/anyMap) (Mockito/anyList) (Mockito/any) (Mockito/anyString) (Mockito/any) (Mockito/anyMap))) (thenReturn curator-frameworke)) + (ClusterUtils/mkStateStorage {} nil nil (ClusterStateContext.)) + (.mkdirsImpl (Mockito/verify zk-mock (Mockito/times 1)) (Mockito/any) (Mockito/anyString) (Mockito/eq nil)))) + (let [distributed-state-storage (reify IStateStorage + (register [this callback] nil) + (mkdirs [this path acls] nil)) + cluster-utils (Mockito/mock ClusterUtils)] + (with-open [mocked-cluster (MockedCluster. cluster-utils)] - (. (Mockito/when (mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage)) ++ (. (Mockito/when (.mkStateStorageImpl cluster-utils (Mockito/any) (Mockito/any) (Mockito/eq nil) (Mockito/any))) (thenReturn distributed-state-storage)) + (ClusterUtils/mkStormClusterState {} nil (ClusterStateContext.)))))) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/nimbus_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/nimbus_test.clj index 772a232,70cb885..2a65efc --- a/storm-core/test/clj/org/apache/storm/nimbus_test.clj +++ b/storm-core/test/clj/org/apache/storm/nimbus_test.clj @@@ -23,27 -23,33 +23,36 @@@ [org.apache.storm.nimbus InMemoryTopologyActionNotifier]) (:import [org.apache.storm.testing.staticmocking MockedZookeeper]) (:import [org.apache.storm.scheduler INimbus]) + (:import [org.mockito Mockito]) + (:import [org.mockito.exceptions.base MockitoAssertionError]) (:import [org.apache.storm.nimbus ILeaderElector NimbusInfo]) - (:import [org.apache.storm.testing.staticmocking MockedConfigUtils MockedCluster]) ++ (:import [org.apache.storm.testing.staticmocking MockedCluster]) (:import [org.apache.storm.generated Credentials NotAliveException SubmitOptions TopologyInitialStatus TopologyStatus AlreadyAliveException KillOptions RebalanceOptions InvalidTopologyException AuthorizationException LogConfig LogLevel LogLevelAction]) (:import [java.util HashMap]) (:import [java.io File]) - (:import [org.apache.storm.utils Time Utils ConfigUtils]) + (:import [org.apache.storm.utils Time Utils Utils$UptimeComputer ConfigUtils IPredicate] + [org.apache.storm.utils.staticmocking ConfigUtilsInstaller UtilsInstaller]) (:import [org.apache.storm.zookeeper Zookeeper]) - (:import [org.apache.commons.io FileUtils]) + (:import [org.apache.commons.io FileUtils] + [org.json.simple JSONValue]) - (:use [org.apache.storm testing MockAutoCred util config log timer zookeeper]) + (:import [org.apache.storm.cluster StormClusterStateImpl ClusterStateContext ClusterUtils]) + (:use [org.apache.storm testing MockAutoCred util config log timer converter]) (:use [org.apache.storm.daemon common]) (:require [conjure.core]) (:require [org.apache.storm - [thrift :as thrift] - [cluster :as cluster]]) + [thrift :as thrift]]) (:use [conjure core])) + (defn- from-json + [^String str] + (if str + (clojurify-structure + (JSONValue/parse str)) + nil)) + (defn storm-component->task-info [cluster storm-name] (let [storm-id (get-storm-id (:storm-cluster-state cluster) storm-name) nimbus (:nimbus cluster)] @@@ -72,8 -80,8 +83,8 @@@ (defn storm-num-workers [state storm-name] (let [storm-id (get-storm-id state storm-name) - assignment (.assignment-info state storm-id nil)] + assignment (clojurify-assignment (.assignmentInfo state storm-id nil))] - (count (reverse-map (:executor->node+port assignment))) + (count (clojurify-structure (Utils/reverseMap (:executor->node+port assignment)))) )) (defn topology-nodes [state storm-name] @@@ -95,9 -103,11 +106,11 @@@ set ))) + ;TODO: when translating this function, don't call map-val, but instead use an inline for loop. + ; map-val is a temporary kluge for clojure. (defn topology-node-distribution [state storm-name] (let [storm-id (get-storm-id state storm-name) - assignment (.assignment-info state storm-id nil)] + assignment (clojurify-assignment (.assignmentInfo state storm-id nil))] (->> assignment :executor->node+port vals @@@ -124,19 -134,18 +137,18 @@@ (defn do-executor-heartbeat [cluster storm-id executor] (let [state (:storm-cluster-state cluster) - executor->node+port (:executor->node+port (.assignment-info state storm-id nil)) + executor->node+port (:executor->node+port (clojurify-assignment (.assignmentInfo state storm-id nil))) [node port] (get executor->node+port executor) - curr-beat (.get-worker-heartbeat state storm-id node port) + curr-beat (clojurify-zk-worker-hb (.getWorkerHeartbeat state storm-id node port)) stats (:executor-stats curr-beat)] - (.worker-heartbeat! state storm-id node port - {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})} + (.workerHeartbeat state storm-id node port - (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (current-time-secs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}) ++ (thriftify-zk-worker-hb {:storm-id storm-id :time-secs (Time/currentTimeSecs) :uptime 10 :executor-stats (merge stats {executor (stats/render-stats! (stats/mk-bolt-stats 20))})}) ))) (defn slot-assignments [cluster storm-id] (let [state (:storm-cluster-state cluster) - assignment (.assignment-info state storm-id nil)] + assignment (clojurify-assignment (.assignmentInfo state storm-id nil))] - (reverse-map (:executor->node+port assignment)) - )) + (clojurify-structure (Utils/reverseMap (:executor->node+port assignment))))) (defn task-ids [cluster storm-id] (let [nimbus (:nimbus cluster)] @@@ -146,8 -155,10 +158,10 @@@ (defn topology-executors [cluster storm-id] (let [state (:storm-cluster-state cluster) - assignment (clojurify-assignment (.assignmentInfo state storm-id nil))] - (keys (:executor->node+port assignment)) - assignment (.assignment-info state storm-id nil) - ret-keys (keys (:executor->node+port assignment)) ++ assignment (clojurify-assignment (.assignmentInfo state storm-id nil)) ++ ret-keys (keys (:executor->node+port assignment)) + _ (log-message "ret-keys: " (pr-str ret-keys)) ] + ret-keys )) (defn check-distribution [items distribution] @@@ -1350,23 -1399,27 +1402,29 @@@ NIMBUS-THRIFT-PORT 6666}) expected-acls nimbus/NIMBUS-ZK-ACLS fake-inimbus (reify INimbus (getForcedScheduler [this] nil)) + fake-cu (proxy [ConfigUtils] [] - (nimbusTopoHistoryStateImpl [conf] nil)) ++ (nimbusTopoHistoryStateImpl [conf] nil)) + fake-utils (proxy [Utils] [] + (newInstanceImpl [_]) + (makeUptimeComputer [] (proxy [Utils$UptimeComputer] [] - (upTime [] 0))))] ++ (upTime [] 0)))) + cluster-utils (Mockito/mock ClusterUtils)] - (with-open [_ (proxy [MockedConfigUtils] [] + (with-open [_ (ConfigUtilsInstaller. fake-cu) + _ (UtilsInstaller. fake-utils) ++ _ (proxy [ConfigUtils] [] + (nimbusTopoHistoryStateImpl [conf] nil)) zk-le (MockedZookeeper. (proxy [Zookeeper] [] - (zkLeaderElectorImpl [conf] nil)))] + (zkLeaderElectorImpl [conf] nil))) + mocked-cluster (MockedCluster. cluster-utils)] (stubbing [mk-authorization-handler nil - cluster/mk-storm-cluster-state nil - nimbus/file-cache-map nil - nimbus/mk-blob-cache-map nil - nimbus/mk-bloblist-cache-map nil - mk-timer nil - nimbus/mk-scheduler nil] - (nimbus/nimbus-data auth-conf fake-inimbus) - (verify-call-times-for cluster/mk-storm-cluster-state 1) - (verify-first-call-args-for-indices cluster/mk-storm-cluster-state [2] - expected-acls)))))) + nimbus/file-cache-map nil + nimbus/mk-blob-cache-map nil + nimbus/mk-bloblist-cache-map nil - uptime-computer nil - new-instance nil + mk-timer nil + nimbus/mk-scheduler nil] + (nimbus/nimbus-data auth-conf fake-inimbus) + (.mkStormClusterStateImpl (Mockito/verify cluster-utils (Mockito/times 1)) (Mockito/any) (Mockito/eq expected-acls) (Mockito/any)) + ))))) (deftest test-file-bogus-download (with-local-cluster [cluster :daemon-conf {SUPERVISOR-ENABLE false TOPOLOGY-ACKER-EXECUTORS 0 TOPOLOGY-EVENTLOGGER-EXECUTORS 0}] @@@ -1397,9 -1450,9 +1455,9 @@@ STORM-CLUSTER-MODE "local" STORM-ZOOKEEPER-PORT zk-port STORM-LOCAL-DIR nimbus-dir})) - (bind cluster-state (cluster/mk-storm-cluster-state conf)) + (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) - (sleep-secs 1) + (Time/sleepSecs 1) (bind topology (thrift/mk-topology {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} {})) @@@ -1429,10 -1482,10 +1487,10 @@@ STORM-ZOOKEEPER-PORT zk-port STORM-LOCAL-DIR nimbus-dir NIMBUS-TOPOLOGY-ACTION-NOTIFIER-PLUGIN (.getName InMemoryTopologyActionNotifier)})) - (bind cluster-state (cluster/mk-storm-cluster-state conf)) + (bind cluster-state (ClusterUtils/mkStormClusterState conf nil (ClusterStateContext.))) (bind nimbus (nimbus/service-handler conf (nimbus/standalone-nimbus))) (bind notifier (InMemoryTopologyActionNotifier.)) - (sleep-secs 1) + (Time/sleepSecs 1) (bind topology (thrift/mk-topology {"1" (thrift/mk-spout-spec (TestPlannerSpout. true) :parallelism-hint 3)} {})) http://git-wip-us.apache.org/repos/asf/storm/blob/9a8962de/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj ---------------------------------------------------------------------- diff --cc storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj index 1a7bd2c,0925237..1c45266 --- a/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj +++ b/storm-core/test/clj/org/apache/storm/pacemaker_state_factory_test.clj @@@ -19,10 -20,8 +19,11 @@@ (:import [org.apache.storm.generated HBExecutionException HBNodes HBRecords HBServerMessageType HBMessage HBMessageData HBPulse] - [org.apache.storm.cluster ClusterStateContext PaceMakerStateStorageFactory] - [org.apache.storm.cluster ClusterStateContext] - [org.mockito Mockito Matchers])) ++ [org.apache.storm.cluster ClusterStateContext PaceMakerStateStorageFactory PaceMakerStateStorage] + [org.mockito Mockito Matchers]) +(:import [org.mockito.exceptions.base MockitoAssertionError]) ++(:import [org.apache.storm.pacemaker PacemakerClient]) +(:import [org.apache.storm.testing.staticmocking MockedPaceMakerStateStorageFactory])) (defn- string-to-bytes [string] (byte-array (map int string))) @@@ -30,26 -29,24 +31,23 @@@ (defn- bytes-to-string [bytez] (apply str (map char bytez))) --(defprotocol send-capture -- (send [this something]) -- (check-captured [this])) -- (defn- make-send-capture [response] (let [captured (atom nil)] -- (reify send-capture -- (send [this something] (reset! captured something) response) -- (check-captured [this] @captured)))) - -(defmacro with-mock-pacemaker-client-and-state [client state response & body] - `(let [~client (make-send-capture ~response)] - (stubbing [psf/makeZKState nil - psf/makeClient ~client] - (let [~state (psf/-mkState nil nil nil nil (ClusterStateContext.))] ++ (proxy [PacemakerClient] [] ++ (send [m] (reset! captured m) response) ++ (checkCaptured [] @captured)))) + +(defmacro with-mock-pacemaker-client-and-state [client state pacefactory mock response & body] + `(let [~client (make-send-capture ~response) + ~pacefactory (Mockito/mock PaceMakerStateStorageFactory)] + + (with-open [~mock (MockedPaceMakerStateStorageFactory. ~pacefactory)] + (. (Mockito/when (.initZKstateImpl ~pacefactory (Mockito/any) (Mockito/any) (Mockito/anyList) (Mockito/any))) (thenReturn nil)) + (. (Mockito/when (.initMakeClientImpl ~pacefactory (Mockito/any))) (thenReturn ~client)) - (let [~state (.mkStore ~pacefactory nil nil nil (ClusterStateContext.))] ++ (let [~state (PaceMakerStateStorage. (PaceMakerStateStorageFactory/initMakeClient nil) ++ (PaceMakerStateStorageFactory/initZKstate nil nil nil nil))] ~@body)))) - (deftest pacemaker_state_set_worker_hb (testing "set_worker_hb" (with-mock-pacemaker-client-and-state @@@ -57,7 -54,7 +55,7 @@@ (HBMessage. HBServerMessageType/SEND_PULSE_RESPONSE nil) (.set_worker_hb state "/foo" (string-to-bytes "data") nil) -- (let [sent (.check-captured client) ++ (let [sent (.checkCaptured client) pulse (.get_pulse (.get_data sent))] (is (= (.get_type sent) HBServerMessageType/SEND_PULSE)) (is (= (.get_id pulse) "/foo")) @@@ -65,13 -62,13 +63,12 @@@ (testing "set_worker_hb" (with-mock-pacemaker-client-and-state - client state + client state pacefactory mock (HBMessage. HBServerMessageType/SEND_PULSE nil) - (is (thrown? RuntimeException - (is (thrown? HBExecutionException -- (.set_worker_hb state "/foo" (string-to-bytes "data") nil)))))) ++ (is (thrown? RuntimeException ++ (.set_worker_hb state "/foo" (string-to-bytes "data") nil)))))) -- (deftest pacemaker_state_delete_worker_hb (testing "delete_worker_hb" @@@ -80,74 -77,74 +77,75 @@@ (HBMessage. HBServerMessageType/DELETE_PATH_RESPONSE nil) (.delete_worker_hb state "/foo/bar") -- (let [sent (.check-captured client)] ++ (let [sent (.checkCaptured client)] (is (= (.get_type sent) HBServerMessageType/DELETE_PATH)) (is (= (.get_path (.get_data sent)) "/foo/bar"))))) -- (testing "delete_worker_hb" -- (with-mock-pacemaker-client-and-state - client state pacefactory mock - client state -- (HBMessage. HBServerMessageType/DELETE_PATH nil) -- - (is (thrown? RuntimeException - (is (thrown? HBExecutionException -- (.delete_worker_hb state "/foo/bar")))))) ++ (testing "delete_worker_hb" ++ (with-mock-pacemaker-client-and-state ++ client state pacefactory mock ++ (HBMessage. HBServerMessageType/DELETE_PATH nil) ++ ++ (is (thrown? RuntimeException ++ (.delete_worker_hb state "/foo/bar")))))) (deftest pacemaker_state_get_worker_hb (testing "get_worker_hb" (with-mock-pacemaker-client-and-state - client state + client state pacefactory mock (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE -- (HBMessageData/pulse -- (doto (HBPulse.) -- (.set_id "/foo") -- (.set_details (string-to-bytes "some data"))))) ++ (HBMessageData/pulse ++ (doto (HBPulse.) ++ (.set_id "/foo") ++ (.set_details (string-to-bytes "some data"))))) (.get_worker_hb state "/foo" false) -- (let [sent (.check-captured client)] ++ (let [sent (.checkCaptured client)] (is (= (.get_type sent) HBServerMessageType/GET_PULSE)) (is (= (.get_path (.get_data sent)) "/foo"))))) (testing "get_worker_hb - fail (bad response)" (with-mock-pacemaker-client-and-state - client state + client state pacefactory mock (HBMessage. HBServerMessageType/GET_PULSE nil) -- - (is (thrown? HBExecutionException - (.get_worker_hb state "/foo" false))))) - ++ + (is (thrown? RuntimeException - (.get_worker_hb state "/foo" false))))) - ++ (.get_worker_hb state "/foo" false))))) ++ (testing "get_worker_hb - fail (bad data)" (with-mock-pacemaker-client-and-state - client state + client state pacefactory mock (HBMessage. HBServerMessageType/GET_PULSE_RESPONSE nil) -- - (is (thrown? HBExecutionException - (.get_worker_hb state "/foo" false)))))) ++ + (is (thrown? RuntimeException - (.get_worker_hb state "/foo" false)))))) ++ (.get_worker_hb state "/foo" false)))))) (deftest pacemaker_state_get_worker_hb_children (testing "get_worker_hb_children" (with-mock-pacemaker-client-and-state - client state + client state pacefactory mock (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE -- (HBMessageData/nodes -- (HBNodes. []))) ++ (HBMessageData/nodes ++ (HBNodes. []))) (.get_worker_hb_children state "/foo" false) -- (let [sent (.check-captured client)] ++ (let [sent (.checkCaptured client)] (is (= (.get_type sent) HBServerMessageType/GET_ALL_NODES_FOR_PATH)) (is (= (.get_path (.get_data sent)) "/foo"))))) (testing "get_worker_hb_children - fail (bad response)" (with-mock-pacemaker-client-and-state - client state + client state pacefactory mock (HBMessage. HBServerMessageType/DELETE_PATH nil) - (is (thrown? HBExecutionException - (.get_worker_hb_children state "/foo" false))))) + (is (thrown? RuntimeException - (.get_worker_hb_children state "/foo" false))))) ++ (.get_worker_hb_children state "/foo" false))))) -- (testing "get_worker_hb_children - fail (bad data)" ++ (testing "get_worker_hb_children - fail (bad data)" (with-mock-pacemaker-client-and-state - client state + client state pacefactory mock (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE nil) - ;need been update due to HBExecutionException - - (is (thrown? HBExecutionException - (.get_worker_hb_children state "/foo" false)))))) ++ + (is (thrown? RuntimeException - (.get_worker_hb_children state "/foo" false)))))) ++ (.get_worker_hb_children state "/foo" false)))))) ++