Merge branch 'master' into ClusterUtils
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/924f9a29 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/924f9a29 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/924f9a29 Branch: refs/heads/master Commit: 924f9a29dc1713921178e61ce2f5b6a14df97ed7 Parents: 2ee8bec 12ceb09 Author: xiaojian.fxj <xiaojian....@alibaba-inc.com> Authored: Mon Feb 15 15:13:33 2016 +0800 Committer: xiaojian.fxj <xiaojian....@alibaba-inc.com> Committed: Tue Feb 16 11:14:18 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 16 + README.markdown | 1 + bin/storm-config.cmd | 6 +- bin/storm.cmd | 47 +- bin/storm.py | 8 +- conf/defaults.yaml | 4 + dev-tools/travis/travis-script.sh | 4 +- .../starter/trident/TridentMapExample.java | 123 +++ external/sql/storm-sql-core/pom.xml | 9 + external/storm-elasticsearch/pom.xml | 2 + .../storm/hbase/security/HBaseSecurityUtil.java | 36 +- .../jvm/org/apache/storm/kafka/KafkaSpout.java | 8 +- .../jvm/org/apache/storm/kafka/KafkaUtils.java | 44 +- .../apache/storm/kafka/PartitionManager.java | 42 +- .../kafka/trident/TridentKafkaEmitter.java | 23 +- external/storm-mqtt/core/pom.xml | 4 +- log4j2/cluster.xml | 2 +- log4j2/worker.xml | 2 +- pom.xml | 9 +- storm-core/pom.xml | 11 +- .../src/clj/org/apache/storm/LocalCluster.clj | 4 +- storm-core/src/clj/org/apache/storm/clojure.clj | 8 +- .../clj/org/apache/storm/command/blobstore.clj | 11 +- .../org/apache/storm/command/config_value.clj | 25 - .../org/apache/storm/command/dev_zookeeper.clj | 6 +- .../clj/org/apache/storm/command/get_errors.clj | 12 +- .../apache/storm/command/shell_submission.clj | 4 +- storm-core/src/clj/org/apache/storm/config.clj | 18 +- .../src/clj/org/apache/storm/converter.clj | 14 +- .../src/clj/org/apache/storm/daemon/acker.clj | 21 +- .../src/clj/org/apache/storm/daemon/common.clj | 46 +- .../src/clj/org/apache/storm/daemon/drpc.clj | 25 +- .../clj/org/apache/storm/daemon/executor.clj | 530 +++++----- .../clj/org/apache/storm/daemon/logviewer.clj | 70 +- .../src/clj/org/apache/storm/daemon/nimbus.clj | 172 ++-- .../clj/org/apache/storm/daemon/supervisor.clj | 202 ++-- .../src/clj/org/apache/storm/daemon/task.clj | 2 +- .../src/clj/org/apache/storm/daemon/worker.clj | 78 +- .../src/clj/org/apache/storm/disruptor.clj | 10 +- storm-core/src/clj/org/apache/storm/event.clj | 2 +- .../src/clj/org/apache/storm/local_state.clj | 9 +- .../clj/org/apache/storm/messaging/loader.clj | 34 - .../clj/org/apache/storm/messaging/local.clj | 23 - .../org/apache/storm/pacemaker/pacemaker.clj | 7 +- .../storm/pacemaker/pacemaker_state_factory.clj | 122 --- .../clj/org/apache/storm/process_simulator.clj | 4 +- .../apache/storm/scheduler/DefaultScheduler.clj | 7 +- .../apache/storm/scheduler/EvenScheduler.clj | 23 +- .../storm/scheduler/IsolationScheduler.clj | 29 +- storm-core/src/clj/org/apache/storm/stats.clj | 82 +- storm-core/src/clj/org/apache/storm/testing.clj | 89 +- storm-core/src/clj/org/apache/storm/thrift.clj | 6 +- storm-core/src/clj/org/apache/storm/timer.clj | 12 +- .../clj/org/apache/storm/trident/testing.clj | 9 +- storm-core/src/clj/org/apache/storm/ui/core.clj | 99 +- .../src/clj/org/apache/storm/ui/helpers.clj | 14 +- storm-core/src/clj/org/apache/storm/util.clj | 925 +---------------- storm-core/src/jvm/org/apache/storm/Config.java | 39 + .../org/apache/storm/cluster/ClusterUtils.java | 49 +- .../storm/cluster/IStormClusterState.java | 9 +- .../storm/cluster/PaceMakerStateStorage.java | 4 +- .../storm/cluster/StateStorageFactory.java | 2 +- .../storm/cluster/StormClusterStateImpl.java | 51 +- .../storm/cluster/ZKStateStorageFactory.java | 4 +- .../org/apache/storm/command/ConfigValue.java | 30 + .../storm/daemon/metrics/MetricsUtils.java | 108 ++ .../reporters/ConsolePreparableReporter.java | 76 ++ .../reporters/CsvPreparableReporter.java | 80 ++ .../reporters/JmxPreparableReporter.java | 70 ++ .../metrics/reporters/PreparableReporter.java | 32 + .../storm/logging/ThriftAccessLogger.java | 13 +- .../apache/storm/pacemaker/PacemakerClient.java | 5 + .../serialization/SerializationFactory.java | 17 +- .../staticmocking/MockedConfigUtils.java | 31 - .../jvm/org/apache/storm/trident/Stream.java | 87 +- .../storm/trident/operation/Consumer.java | 35 + .../trident/operation/FlatMapFunction.java | 37 + .../storm/trident/operation/MapFunction.java | 36 + .../operation/impl/ConsumerExecutor.java | 38 + .../operation/impl/FlatMapFunctionExecutor.java | 43 + .../operation/impl/MapFunctionExecutor.java | 41 + .../trident/planner/processor/MapProcessor.java | 87 ++ .../jvm/org/apache/storm/utils/ConfigUtils.java | 20 +- .../jvm/org/apache/storm/utils/Container.java | 11 +- .../jvm/org/apache/storm/utils/IPredicate.java | 27 + .../org/apache/storm/utils/NimbusClient.java | 2 +- .../utils/StormConnectionStateConverter.java | 44 + .../jvm/org/apache/storm/utils/TestUtils.java | 34 - .../src/jvm/org/apache/storm/utils/Time.java | 26 +- .../src/jvm/org/apache/storm/utils/Utils.java | 989 +++++++++++++++++-- .../storm/validation/ConfigValidation.java | 2 +- .../org/apache/storm/zookeeper/Zookeeper.java | 7 + .../org/apache/storm/integration_test.clj | 98 +- .../org/apache/storm/testing4j_test.clj | 37 +- .../apache/storm/trident/integration_test.clj | 15 +- .../test/clj/org/apache/storm/cluster_test.clj | 27 +- .../test/clj/org/apache/storm/drpc_test.clj | 23 +- .../clj/org/apache/storm/logviewer_test.clj | 267 ++--- .../storm/messaging/netty_integration_test.clj | 2 +- .../test/clj/org/apache/storm/nimbus_test.clj | 112 ++- .../storm/pacemaker_state_factory_test.clj | 74 +- .../scheduler/resource_aware_scheduler_test.clj | 21 +- .../apache/storm/security/auth/auth_test.clj | 11 +- .../authorizer/DRPCSimpleACLAuthorizer_test.clj | 2 +- .../BlowfishTupleSerializer_test.clj | 1 - .../clj/org/apache/storm/serialization_test.clj | 23 +- .../clj/org/apache/storm/supervisor_test.clj | 649 ++++++------ .../clj/org/apache/storm/transactional_test.clj | 18 + .../clj/org/apache/storm/trident/state_test.clj | 5 +- .../clj/org/apache/storm/trident/tuple_test.clj | 15 +- .../test/clj/org/apache/storm/utils_test.clj | 16 +- .../test/clj/org/apache/storm/worker_test.clj | 1 - .../staticmocking/ConfigUtilsInstaller.java | 38 + .../utils/staticmocking/UtilsInstaller.java | 38 + .../storm/utils/staticmocking/package-info.java | 95 ++ 115 files changed, 4281 insertions(+), 2648 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/conf/defaults.yaml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj index 7be526d,657e242..ab5cbed --- a/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj +++ b/storm-core/src/clj/org/apache/storm/command/dev_zookeeper.clj @@@ -14,7 -14,8 +14,8 @@@ ;; See the License for the specific language governing permissions and ;; limitations under the License. (ns org.apache.storm.command.dev-zookeeper + (:import [org.apache.storm.utils Utils]) - (:use [org.apache.storm zookeeper util config]) + (:use [org.apache.storm util config]) (:import [org.apache.storm.utils ConfigUtils]) (:import [org.apache.storm.zookeeper Zookeeper]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/command/shell_submission.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/command/shell_submission.clj index 3978d2f,0253338..870e7f6 --- a/storm-core/src/clj/org/apache/storm/command/shell_submission.clj +++ b/storm-core/src/clj/org/apache/storm/command/shell_submission.clj @@@ -15,8 -15,9 +15,9 @@@ ;; limitations under the License. (ns org.apache.storm.command.shell-submission (:import [org.apache.storm StormSubmitter] + [org.apache.storm.utils Utils] [org.apache.storm.zookeeper Zookeeper]) - (:use [org.apache.storm thrift util config log zookeeper]) + (:use [org.apache.storm thrift util config log]) (:require [clojure.string :as str]) (:import [org.apache.storm.utils ConfigUtils]) (:gen-class)) http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/converter.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/common.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/common.clj index b144f40,eb1ec1e..db342d2 --- a/storm-core/src/clj/org/apache/storm/daemon/common.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/common.clj @@@ -15,17 -15,19 +15,20 @@@ ;; limitations under the License. (ns org.apache.storm.daemon.common (:use [org.apache.storm log config util]) - (:import [org.apache.storm.generated StormTopology + (:import [org.apache.storm.generated StormTopology NodeInfo InvalidTopologyException GlobalStreamId] - [org.apache.storm.utils ThriftTopologyUtils]) - (:import [org.apache.storm.utils Utils ConfigUtils]) + [org.apache.storm.utils Utils ConfigUtils IPredicate ThriftTopologyUtils] + [org.apache.storm.daemon.metrics.reporters PreparableReporter] + [com.codahale.metrics MetricRegistry]) + (:import [org.apache.storm.daemon.metrics MetricsUtils]) (:import [org.apache.storm.task WorkerTopologyContext]) (:import [org.apache.storm Constants]) + (:import [org.apache.storm.cluster StormClusterStateImpl]) (:import [org.apache.storm.metric SystemBolt]) (:import [org.apache.storm.metric EventLoggerBolt]) - (:import [org.apache.storm.security.auth IAuthorizer]) - (:import [java.io InterruptedIOException]) + (:import [org.apache.storm.security.auth IAuthorizer]) + (:import [java.io InterruptedIOException] + [org.json.simple JSONValue]) (:require [clojure.set :as set]) (:require [org.apache.storm.daemon.acker :as acker]) (:require [org.apache.storm.thrift :as thrift]) @@@ -73,12 -83,10 +84,11 @@@ (defn new-executor-stats [] (ExecutorStats. 0 0 0 0 0)) + (defn get-storm-id [storm-cluster-state storm-name] - (let [active-storms (.activeStorms storm-cluster-state)] - (find-first - #(= storm-name (.get_name (.stormBase storm-cluster-state % nil))) - active-storms) - (let [active-storms (.active-storms storm-cluster-state) - pred (reify IPredicate (test [this x] (= storm-name (:storm-name (.storm-base storm-cluster-state x nil)))))] ++ (let [active-storms (.activeStorms storm-cluster-state) ++ pred (reify IPredicate (test [this x] (= storm-name (.get_name (.stormBase storm-cluster-state x nil)))))] + (Utils/findOne pred active-storms) )) (defn topology-bases [storm-cluster-state] http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj index 49ae6cf,e2380b7..33b89ed --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@@ -34,10 -34,13 +34,12 @@@ (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric]) (:import [org.apache.storm Config Constants]) - (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) + (:import [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) - (:import [java.util.concurrent ConcurrentLinkedQueue]) + (:import [java.lang Thread Thread$UncaughtExceptionHandler] + [java.util.concurrent ConcurrentLinkedQueue] + [org.json.simple JSONValue]) - (:require [org.apache.storm [thrift :as thrift] - [cluster :as cluster] [disruptor :as disruptor] [stats :as stats]]) + (:require [org.apache.storm [thrift :as thrift] [disruptor :as disruptor] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) @@@ -206,9 -211,9 +210,9 @@@ (swap! interval-errors inc) (when (<= @interval-errors max-per-interval) - (cluster/report-error (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) + (.reportError (:storm-cluster-state executor) (:storm-id executor) (:component-id executor) - (hostname storm-conf) + (Utils/hostname storm-conf) - (.getThisWorkerPort (:worker-context executor)) error) + (long (.getThisWorkerPort (:worker-context executor))) error) )))) ;; in its own function so that it can be mocked out by tracked topologies http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/nimbus.clj index daf5e45,710cd83..6bdbdc0 --- a/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/nimbus.clj @@@ -38,9 -39,9 +39,9 @@@ (:import [org.apache.storm.scheduler INimbus SupervisorDetails WorkerSlot TopologyDetails Cluster Topologies SchedulerAssignment SchedulerAssignmentImpl DefaultScheduler ExecutorDetails]) (:import [org.apache.storm.nimbus NimbusInfo]) - (:import [org.apache.storm.utils TimeCacheMap TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils + (:import [org.apache.storm.utils TimeCacheMap Time TimeCacheMap$ExpiredCallback Utils ConfigUtils TupleUtils ThriftTopologyUtils BufferFileInputStream BufferInputStream]) - (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo + (:import [org.apache.storm.generated NotAliveException AlreadyAliveException StormTopology ErrorInfo ClusterWorkerHeartbeat ExecutorInfo InvalidTopologyException Nimbus$Iface Nimbus$Processor SubmitOptions TopologyInitialStatus KillOptions RebalanceOptions ClusterSummary SupervisorSummary TopologySummary TopologyInfo TopologyHistoryInfo ExecutorSummary AuthorizationException GetInfoOptions NumErrorsChoice SettableBlobMeta ReadableBlobMeta @@@ -407,10 -414,10 +409,10 @@@ [storm-cluster-state] (let [assignments (.assignments storm-cluster-state nil)] - (defaulted + (or (apply merge-with set/union (for [a assignments - [_ [node port]] (-> (.assignment-info storm-cluster-state a nil) :executor->node+port)] + [_ [node port]] (-> (clojurify-assignment (.assignmentInfo storm-cluster-state a nil)) :executor->node+port)] {node #{port}} )) {}) @@@ -586,12 -594,7 +589,12 @@@ (defn update-heartbeats! [nimbus storm-id all-executors existing-assignment] (log-debug "Updating heartbeats for " storm-id " " (pr-str all-executors)) (let [storm-cluster-state (:storm-cluster-state nimbus) - executor-beats (.executor-beats storm-cluster-state storm-id (:executor->node+port existing-assignment)) + executor-beats (let [executor-stats-java-map (.executorBeats storm-cluster-state storm-id (.get_executor_node_port (thriftify-assignment existing-assignment))) + executor-stats-clojurify (clojurify-structure executor-stats-java-map)] - (->> (dofor [[^ExecutorInfo executor-info ^ClusterWorkerHeartbeat cluster-worker-heartbeat] executor-stats-clojurify] - {[(.get_task_start executor-info) (.get_task_end executor-info)] (clojurify-zk-worker-hb cluster-worker-heartbeat)}) ++ (->> (dofor [[^ExecutorInfo executor-info executor-heartbeat] executor-stats-clojurify] ++ {[(.get_task_start executor-info) (.get_task_end executor-info)] executor-heartbeat}) + (apply merge))) + cache (update-heartbeat-cache (@(:heartbeats-cache nimbus) storm-id) executor-beats all-executors @@@ -988,10 -1002,10 +1002,10 @@@ topology (system-topology! storm-conf (read-storm-topology storm-id blob-store)) num-executors (->> (all-components topology) (map-val num-start-executors))] (log-message "Activating " storm-name ": " storm-id) - (.activate-storm! storm-cluster-state + (.activateStorm storm-cluster-state storm-id - (StormBase. storm-name + (thriftify-storm-base (StormBase. storm-name - (current-time-secs) + (Time/currentTimeSecs) {:type topology-initial-status} (storm-conf TOPOLOGY-WORKERS) num-executors @@@ -1137,9 -1152,9 +1152,9 @@@ (when-not (empty? to-cleanup-ids) (doseq [id to-cleanup-ids] (log-message "Cleaning up " id) - (.teardown-heartbeats! storm-cluster-state id) - (.teardown-topology-errors! storm-cluster-state id) + (.teardownHeartbeats storm-cluster-state id) + (.teardownTopologyErrors storm-cluster-state id) - (rmr (ConfigUtils/masterStormDistRoot conf id)) + (Utils/forceDelete (ConfigUtils/masterStormDistRoot conf id)) (blob-rm-topology-keys id blob-store storm-cluster-state) (swap! (:heartbeats-cache nimbus) dissoc id))))) (log-message "not a leader, skipping cleanup"))) @@@ -1811,8 -1830,8 +1838,8 @@@ (.set_used_cpu sup-sum used-cpu)) (when-let [version (:version info)] (.set_version sup-sum version)) sup-sum)) - nimbus-uptime ((:uptime nimbus)) + nimbus-uptime (. (:uptime nimbus) upTime) - bases (topology-bases storm-cluster-state) + bases (nimbus-topology-bases storm-cluster-state) nimbuses (.nimbuses storm-cluster-state) ;;update the isLeader field for each nimbus summary http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/supervisor.clj index 3a83d03,ae9e92f..273a6bd --- a/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/supervisor.clj @@@ -16,14 -16,15 +16,15 @@@ (ns org.apache.storm.daemon.supervisor (:import [java.io File IOException FileOutputStream]) (:import [org.apache.storm.scheduler ISupervisor] - [org.apache.storm.utils LocalState Time Utils ConfigUtils] + [org.apache.storm.utils LocalState Time Utils Utils$ExitCodeCallable + ConfigUtils] [org.apache.storm.daemon Shutdownable] [org.apache.storm Constants] - [org.apache.storm.cluster ClusterStateContext DaemonType] + [org.apache.storm.cluster ClusterStateContext DaemonType StormClusterStateImpl ClusterUtils] [java.net JarURLConnection] - [java.net URI] + [java.net URI URLDecoder] [org.apache.commons.io FileUtils]) - (:use [org.apache.storm config util log timer local-state]) + (:use [org.apache.storm config util log timer local-state converter]) (:import [org.apache.storm.generated AuthorizationException KeyNotFoundException WorkerResources]) (:import [org.apache.storm.utils NimbusLeaderNotFoundException VersionInfo]) (:import [java.nio.file Files StandardCopyOption]) @@@ -315,12 -319,14 +321,12 @@@ :shared-context shared-context :isupervisor isupervisor :active (atom true) - :uptime (uptime-computer) + :uptime (Utils/makeUptimeComputer) :version STORM-VERSION :worker-thread-pids-atom (atom {}) - :storm-cluster-state (cluster/mk-storm-cluster-state conf :acls (when - (Utils/isZkAuthenticationConfiguredStormServer - conf) - SUPERVISOR-ZK-ACLS) - :context (ClusterStateContext. DaemonType/SUPERVISOR)) + :storm-cluster-state (ClusterUtils/mkStormClusterState conf (when (Utils/isZkAuthenticationConfiguredStormServer conf) + SUPERVISOR-ZK-ACLS) + (ClusterStateContext. DaemonType/SUPERVISOR)) :local-state (ConfigUtils/supervisorState conf) :supervisor-id (.getSupervisorId isupervisor) :assignment-id (.getAssignmentId isupervisor) @@@ -777,19 -791,19 +792,19 @@@ synchronize-blobs-fn (update-blobs-for-all-topologies-fn supervisor) downloaded-storm-ids (set (read-downloaded-storm-ids conf)) run-profiler-actions-fn (mk-run-profiler-actions-for-all-topologies supervisor) - heartbeat-fn (fn [] (.supervisor-heartbeat! + heartbeat-fn (fn [] (.supervisorHeartbeat (:storm-cluster-state supervisor) (:supervisor-id supervisor) - (thriftify-supervisor-info (->SupervisorInfo (current-time-secs) - (->SupervisorInfo (Time/currentTimeSecs) ++ (thriftify-supervisor-info (->SupervisorInfo (Time/currentTimeSecs) (:my-hostname supervisor) (:assignment-id supervisor) (keys @(:curr-assignment supervisor)) ;; used ports (.getMetadata isupervisor) (conf SUPERVISOR-SCHEDULER-META) - ((:uptime supervisor)) + (. (:uptime supervisor) upTime) (:version supervisor) - (mk-supervisor-capacities conf))))] + (mk-supervisor-capacities conf)))))] (heartbeat-fn) ;; should synchronize supervisor so it doesn't launch anything after being down (optimization) http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj index ae5be57,fe8cfae..9863427 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@@ -19,16 -19,18 +19,18 @@@ (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) (:require [org.apache.storm.daemon [executor :as executor]]) - (:require [org.apache.storm [disruptor :as disruptor] [cluster :as cluster]]) + (:require [org.apache.storm [disruptor :as disruptor]]) (:require [clojure.set :as set]) - (:require [org.apache.storm.messaging.loader :as msg-loader]) (:import [java.util.concurrent Executors] - [org.apache.storm.hooks IWorkerHook BaseWorkerHook]) - (:import [java.util ArrayList HashMap]) - (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue]) + [org.apache.storm.hooks IWorkerHook BaseWorkerHook] + [uk.org.lidalia.sysoutslf4j.context SysOutOverSLF4J]) + (:import [java.util ArrayList HashMap] + [java.util.concurrent.locks ReentrantReadWriteLock]) + (:import [org.apache.commons.io FileUtils]) + (:import [org.apache.storm.utils Utils ConfigUtils TransferDrainer ThriftTopologyUtils WorkerBackpressureThread DisruptorQueue Time]) (:import [org.apache.storm.grouping LoadMapping]) (:import [org.apache.storm.messaging TransportFactory]) - (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status]) + (:import [org.apache.storm.messaging TaskMessage IContext IConnection ConnectionWithStatus ConnectionWithStatus$Status DeserializingConnectionCallback]) (:import [org.apache.storm.daemon Shutdownable]) (:import [org.apache.storm.serialization KryoTupleSerializer]) (:import [org.apache.storm.generated StormTopology]) http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj index a36da3a,be4361a..0000000 deleted file mode 100644,100644 --- a/storm-core/src/clj/org/apache/storm/pacemaker/pacemaker_state_factory.clj +++ /dev/null @@@ -1,122 -1,141 +1,0 @@@ --;; Licensed to the Apache Software Foundation (ASF) under one --;; or more contributor license agreements. See the NOTICE file --;; distributed with this work for additional information --;; regarding copyright ownership. The ASF licenses this file --;; to you under the Apache License, Version 2.0 (the --;; "License"); you may not use this file except in compliance --;; with the License. You may obtain a copy of the License at --;; --;; http://www.apache.org/licenses/LICENSE-2.0 --;; --;; Unless required by applicable law or agreed to in writing, software --;; distributed under the License is distributed on an "AS IS" BASIS, --;; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. --;; See the License for the specific language governing permissions and --;; limitations under the License. -- --(ns org.apache.storm.pacemaker.pacemaker-state-factory -- (:require [org.apache.storm.pacemaker pacemaker] - [org.apache.storm.cluster-state [zookeeper-state-factory :as zk-factory]] -- [org.apache.storm -- [config :refer :all] - [cluster :refer :all] -- [log :refer :all] -- [util :as util]]) -- (:import [org.apache.storm.generated -- HBExecutionException HBServerMessageType HBMessage -- HBMessageData HBPulse] - [org.apache.storm.cluster ZKStateStorage ClusterUtils IStateStorage] - [org.apache.storm.cluster_state zookeeper_state_factory] - [org.apache.storm.cluster ClusterState] -- [org.apache.storm.pacemaker PacemakerClient]) -- (:gen-class - :implements [org.apache.storm.cluster.StateStorageFactory])) - :implements [org.apache.storm.cluster.ClusterStateFactory])) -- --;; So we can mock the client for testing --(defn makeClient [conf] -- (PacemakerClient. conf)) -- --(defn makeZKState [conf auth-conf acls context] - (ClusterUtils/mkStateStorage conf auth-conf acls context)) - (.mkState (zookeeper_state_factory.) conf auth-conf acls context)) -- --(def max-retries 10) - -(defn retry-on-exception - "Retries specific function on exception based on retries count" - [retries task-description f & args] - (let [res (try {:value (apply f args)} - (catch Exception e - (if (<= 0 retries) - (throw e) - {:exception e})))] - (if (:exception res) - (do - (log-error (:exception res) (str "Failed to " task-description ". Will make [" retries "] more attempts.")) - (recur (dec retries) task-description f args)) - (do - (log-debug (str "Successful " task-description ".")) - (:value res))))) -- --(defn -mkState [this conf auth-conf acls context] -- (let [zk-state (makeZKState conf auth-conf acls context) -- pacemaker-client (makeClient conf)] -- -- (reify - IStateStorage - ClusterState -- ;; Let these pass through to the zk-state. We only want to handle heartbeats. -- (register [this callback] (.register zk-state callback)) -- (unregister [this callback] (.unregister zk-state callback)) -- (set_ephemeral_node [this path data acls] (.set_ephemeral_node zk-state path data acls)) -- (create_sequential [this path data acls] (.create_sequential zk-state path data acls)) -- (set_data [this path data acls] (.set_data zk-state path data acls)) -- (delete_node [this path] (.delete_node zk-state path)) -- (delete_node_blobstore [this path nimbus-host-port-info] (.delete_node_blobstore zk-state path nimbus-host-port-info)) -- (get_data [this path watch?] (.get_data zk-state path watch?)) -- (get_data_with_version [this path watch?] (.get_data_with_version zk-state path watch?)) -- (get_version [this path watch?] (.get_version zk-state path watch?)) -- (get_children [this path watch?] (.get_children zk-state path watch?)) -- (mkdirs [this path acls] (.mkdirs zk-state path acls)) -- (node_exists [this path watch?] (.node_exists zk-state path watch?)) -- (add_listener [this listener] (.add_listener zk-state listener)) -- (sync_path [this path] (.sync_path zk-state path)) -- -- (set_worker_hb [this path data acls] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "set_worker_hb" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/SEND_PULSE -- (HBMessageData/pulse -- (doto (HBPulse.) -- (.set_id path) -- (.set_details data)))))] -- (if (= (.get_type response) HBServerMessageType/SEND_PULSE_RESPONSE) -- :ok -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (delete_worker_hb [this path] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "delete_worker_hb" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/DELETE_PATH -- (HBMessageData/path path)))] -- (if (= (.get_type response) HBServerMessageType/DELETE_PATH_RESPONSE) -- :ok -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (get_worker_hb [this path watch?] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "get_worker_hb" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/GET_PULSE -- (HBMessageData/path path)))] -- (if (= (.get_type response) HBServerMessageType/GET_PULSE_RESPONSE) -- (try -- (.get_details (.get_pulse (.get_data response))) -- (catch Exception e -- (throw (HBExecutionException. (.toString e))))) -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (get_worker_hb_children [this path watch?] - (util/retry-on-exception - (retry-on-exception -- max-retries -- "get_worker_hb_children" -- #(let [response -- (.send pacemaker-client -- (HBMessage. HBServerMessageType/GET_ALL_NODES_FOR_PATH -- (HBMessageData/path path)))] -- (if (= (.get_type response) HBServerMessageType/GET_ALL_NODES_FOR_PATH_RESPONSE) -- (try -- (into [] (.get_pulseIds (.get_nodes (.get_data response)))) -- (catch Exception e -- (throw (HBExecutionException. (.toString e))))) -- (throw (HBExecutionException. "Invalid Response Type")))))) -- -- (close [this] -- (.close zk-state) -- (.close pacemaker-client))))) http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/stats.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/stats.clj index 0bf1757,8b37fc3..8632ed3 --- a/storm-core/src/clj/org/apache/storm/stats.clj +++ b/storm-core/src/clj/org/apache/storm/stats.clj @@@ -24,8 -24,8 +24,9 @@@ ExecutorAggregateStats SpecificAggregateStats SpoutAggregateStats TopologyPageInfo TopologyStats]) (:import [org.apache.storm.utils Utils]) + (:import [org.apache.storm.cluster StormClusterStateImpl]) - (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric]) + (:import [org.apache.storm.metric.internal MultiCountStatAndMetric MultiLatencyStatAndMetric] + [java.util Collection]) (:use [org.apache.storm log util]) (:use [clojure.math.numeric-tower :only [ceil]])) http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/testing.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/testing.clj index 5a0bdf2,c872742..eef7754 --- a/storm-core/src/clj/org/apache/storm/testing.clj +++ b/storm-core/src/clj/org/apache/storm/testing.clj @@@ -44,11 -45,12 +45,12 @@@ (:import [org.apache.storm.transactional.partitioned PartitionedTransactionalSpoutExecutor]) (:import [org.apache.storm.tuple Tuple]) (:import [org.apache.storm.generated StormTopology]) - (:import [org.apache.storm.task TopologyContext]) + (:import [org.apache.storm.task TopologyContext] + (org.apache.storm.messaging IContext) + [org.json.simple JSONValue]) - (:require [org.apache.storm [zookeeper :as zk]]) + (:import [org.apache.storm.cluster ZKStateStorage ClusterStateContext StormClusterStateImpl ClusterUtils]) - (:require [org.apache.storm.messaging.loader :as msg-loader]) (:require [org.apache.storm.daemon.acker :as acker]) - (:use [org.apache.storm cluster util thrift config log local-state])) + (:use [org.apache.storm util thrift config log local-state converter])) (defn feeder-spout [fields] http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/thrift.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/thrift.clj index 4dc21f9,779c1d1..7aab729 --- a/storm-core/src/clj/org/apache/storm/thrift.clj +++ b/storm-core/src/clj/org/apache/storm/thrift.clj @@@ -29,8 -29,9 +29,9 @@@ (:import [org.apache.storm.grouping CustomStreamGrouping]) (:import [org.apache.storm.topology TopologyBuilder]) (:import [org.apache.storm.clojure RichShellBolt RichShellSpout]) - (:import [org.apache.thrift.transport TTransport]) + (:import [org.apache.thrift.transport TTransport] + (org.json.simple JSONValue)) - (:use [org.apache.storm util config log zookeeper])) + (:use [org.apache.storm util config log])) (defn instantiate-java-object [^JavaObject obj] http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/ui/core.clj ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/clj/org/apache/storm/util.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/util.clj index 165d8ee,f685d12..72778bb --- a/storm-core/src/clj/org/apache/storm/util.clj +++ b/storm-core/src/clj/org/apache/storm/util.clj @@@ -20,9 -20,8 +20,9 @@@ (:import [java.io FileReader FileNotFoundException]) (:import [java.nio.file Paths]) (:import [org.apache.storm Config]) - (:import [org.apache.storm.generated ErrorInfo]) - (:import [org.apache.storm.utils Time Container ClojureTimerTask Utils - MutableObject MutableInt]) ++ (:import [org.apache.storm.generated ErrorInfo]) + (:import [org.apache.storm.utils Time ClojureTimerTask Utils + MutableObject]) (:import [org.apache.storm.security.auth NimbusPrincipal]) (:import [javax.security.auth Subject]) (:import [java.util UUID Random ArrayList List Collections]) @@@ -262,58 -163,9 +164,19 @@@ (instance? Boolean x) (boolean x) true x)) s)) +; move this func form convert.clj due to cyclic load dependency +(defn clojurify-error [^ErrorInfo error] + (if error + { + :error (.get_error error) + :time-secs (.get_error_time_secs error) + :host (.get_host error) + :port (.get_port error) + } + )) - (defmacro with-file-lock - [path & body] - `(let [f# (File. ~path) - _# (.createNewFile f#) - rf# (RandomAccessFile. f# "rw") - lock# (.. rf# (getChannel) (lock))] - (try - ~@body - (finally - (.release lock#) - (.close rf#))))) - - (defn tokenize-path - [^String path] - (let [toks (.split path "/")] - (vec (filter (complement empty?) toks)))) - - (defn assoc-conj - [m k v] - (merge-with concat m {k [v]})) - - ;; returns [ones in first set not in second, ones in second set not in first] - (defn set-delta - [old curr] - (let [s1 (set old) - s2 (set curr)] - [(set/difference s1 s2) (set/difference s2 s1)])) - - (defn parent-path - [path] - (let [toks (tokenize-path path)] - (str "/" (str/join "/" (butlast toks))))) - - (defn toks->path - [toks] - (str "/" (str/join "/" toks))) - - (defn normalize-path - [^String path] - (toks->path (tokenize-path path))) - + ;TODO: We're keeping this function around until all the code using it is properly tranlated to java + ;TODO: by properly having the for loop IN THE JAVA FUNCTION that originally used this function. (defn map-val [afn amap] (into {} http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java index b30d1d2,0000000..0c663f0 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/ClusterUtils.java @@@ -1,249 -1,0 +1,282 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.cluster; + +import clojure.lang.APersistentMap; ++import clojure.lang.PersistentArrayMap; ++import clojure.lang.RT; +import org.apache.storm.Config; +import org.apache.storm.generated.ClusterWorkerHeartbeat; +import org.apache.storm.generated.ExecutorInfo; +import org.apache.storm.generated.ExecutorStats; +import org.apache.storm.generated.ProfileAction; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Id; +import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; + ++import java.io.PrintWriter; ++import java.io.StringWriter; +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.security.NoSuchAlgorithmException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ClusterUtils { + + public static final String ZK_SEPERATOR = "/"; + + public static final String ASSIGNMENTS_ROOT = "assignments"; + public static final String CODE_ROOT = "code"; + public static final String STORMS_ROOT = "storms"; + public static final String SUPERVISORS_ROOT = "supervisors"; + public static final String WORKERBEATS_ROOT = "workerbeats"; + public static final String BACKPRESSURE_ROOT = "backpressure"; + public static final String ERRORS_ROOT = "errors"; + public static final String BLOBSTORE_ROOT = "blobstore"; + public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT = "blobstoremaxkeysequencenumber"; + public static final String NIMBUSES_ROOT = "nimbuses"; + public static final String CREDENTIALS_ROOT = "credentials"; + public static final String LOGCONFIG_ROOT = "logconfigs"; + public static final String PROFILERCONFIG_ROOT = "profilerconfigs"; + + public static final String ASSIGNMENTS_SUBTREE = ZK_SEPERATOR + ASSIGNMENTS_ROOT; + public static final String STORMS_SUBTREE = ZK_SEPERATOR + STORMS_ROOT; + public static final String SUPERVISORS_SUBTREE = ZK_SEPERATOR + SUPERVISORS_ROOT; + public static final String WORKERBEATS_SUBTREE = ZK_SEPERATOR + WORKERBEATS_ROOT; + public static final String BACKPRESSURE_SUBTREE = ZK_SEPERATOR + BACKPRESSURE_ROOT; + public static final String ERRORS_SUBTREE = ZK_SEPERATOR + ERRORS_ROOT; + public static final String BLOBSTORE_SUBTREE = ZK_SEPERATOR + BLOBSTORE_ROOT; + public static final String BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE = ZK_SEPERATOR + BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_ROOT; + public static final String NIMBUSES_SUBTREE = ZK_SEPERATOR + NIMBUSES_ROOT; + public static final String CREDENTIALS_SUBTREE = ZK_SEPERATOR + CREDENTIALS_ROOT; + public static final String LOGCONFIG_SUBTREE = ZK_SEPERATOR + LOGCONFIG_ROOT; + public static final String PROFILERCONFIG_SUBTREE = ZK_SEPERATOR + PROFILERCONFIG_ROOT; + + // A singleton instance allows us to mock delegated static methods in our + // tests by subclassing. + private static final ClusterUtils INSTANCE = new ClusterUtils(); + private static ClusterUtils _instance = INSTANCE; + + /** + * Provide an instance of this class for delegates to use. To mock out delegated methods, provide an instance of a subclass that overrides the + * implementation of the delegated method. + * + * @param u a Cluster instance + */ + public static void setInstance(ClusterUtils u) { + _instance = u; + } + + /** + * Resets the singleton instance to the default. This is helpful to reset the class to its original functionality when mocking is no longer desired. + */ + public static void resetInstance() { + _instance = INSTANCE; + } + + public static List<ACL> mkTopoOnlyAcls(Map topoConf) throws NoSuchAlgorithmException { + List<ACL> aclList = null; + String payload = (String) topoConf.get(Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD); + if (Utils.isZkAuthenticationConfiguredStormServer(topoConf)) { + aclList = new ArrayList<>(); + ACL acl1 = ZooDefs.Ids.CREATOR_ALL_ACL.get(0); + aclList.add(acl1); + ACL acl2 = new ACL(ZooDefs.Perms.READ, new Id("digest", DigestAuthenticationProvider.generateDigest(payload))); + aclList.add(acl2); + } + return aclList; + } + + public static String supervisorPath(String id) { + return SUPERVISORS_SUBTREE + ZK_SEPERATOR + id; + } + + public static String assignmentPath(String id) { + return ASSIGNMENTS_SUBTREE + ZK_SEPERATOR + id; + } + + public static String blobstorePath(String key) { + return BLOBSTORE_SUBTREE + ZK_SEPERATOR + key; + } + + public static String blobstoreMaxKeySequenceNumberPath(String key) { + return BLOBSTORE_MAX_KEY_SEQUENCE_NUMBER_SUBTREE + ZK_SEPERATOR + key; + } + + public static String nimbusPath(String id) { + return NIMBUSES_SUBTREE + ZK_SEPERATOR + id; + } + + public static String stormPath(String id) { + return STORMS_SUBTREE + ZK_SEPERATOR + id; + } + + public static String workerbeatStormRoot(String stormId) { + return WORKERBEATS_SUBTREE + ZK_SEPERATOR + stormId; + } + + public static String workerbeatPath(String stormId, String node, Long port) { + return workerbeatStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port; + } + + public static String backpressureStormRoot(String stormId) { + return BACKPRESSURE_SUBTREE + ZK_SEPERATOR + stormId; + } + + public static String backpressurePath(String stormId, String node, Long port) { + return backpressureStormRoot(stormId) + ZK_SEPERATOR + node + "-" + port; + } + + public static String errorStormRoot(String stormId) { + return ERRORS_SUBTREE + ZK_SEPERATOR + stormId; + } + + public static String errorPath(String stormId, String componentId) { + try { + return errorStormRoot(stormId) + ZK_SEPERATOR + URLEncoder.encode(componentId, "UTF-8"); + } catch (UnsupportedEncodingException e) { + throw Utils.wrapInRuntime(e); + } + } + + public static String lastErrorPath(String stormId, String componentId) { + return errorPath(stormId, componentId) + "-last-error"; + } + + public static String credentialsPath(String stormId) { + return CREDENTIALS_SUBTREE + ZK_SEPERATOR + stormId; + } + + public static String logConfigPath(String stormId) { + return LOGCONFIG_SUBTREE + ZK_SEPERATOR + stormId; + } + + public static String profilerConfigPath(String stormId) { + return PROFILERCONFIG_SUBTREE + ZK_SEPERATOR + stormId; + } + + public static String profilerConfigPath(String stormId, String host, Long port, ProfileAction requestType) { + return profilerConfigPath(stormId) + ZK_SEPERATOR + host + "_" + port + "_" + requestType; + } + + public static <T> T maybeDeserialize(byte[] serialized, Class<T> clazz) { + if (serialized != null) { + return Utils.deserialize(serialized, clazz); + } + return null; + } + - // Ensures that we only return heartbeats for executors assigned to this worker - public static Map<ExecutorInfo, ClusterWorkerHeartbeat> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) { - Map<ExecutorInfo, ClusterWorkerHeartbeat> executorWhb = new HashMap<>(); ++ /** ++ * Ensures that we only return heartbeats for executors assigned to this worker ++ * @param executors ++ * @param workerHeartbeat ++ * @return ++ */ ++ public static Map<ExecutorInfo, APersistentMap> convertExecutorBeats(List<ExecutorInfo> executors, ClusterWorkerHeartbeat workerHeartbeat) { ++ Map<ExecutorInfo, APersistentMap> executorWhb = new HashMap<>(); + Map<ExecutorInfo, ExecutorStats> executorStatsMap = workerHeartbeat.get_executor_stats(); + for (ExecutorInfo executor : executors) { + if (executorStatsMap.containsKey(executor)) { - executorWhb.put(executor, workerHeartbeat); ++ APersistentMap executorBeat = ++ new PersistentArrayMap(new Object[] { RT.keyword(null, "time-secs"), workerHeartbeat.get_time_secs(), RT.keyword(null, "uptime"), ++ workerHeartbeat.get_uptime_secs(), RT.keyword(null, "stats"), workerHeartbeat.get_executor_stats().get(executor) }); ++ executorWhb.put(executor, executorBeat); + } + } + return executorWhb; + } + + public IStormClusterState mkStormClusterStateImpl(Object stateStorage, List<ACL> acls, ClusterStateContext context) throws Exception { + if (stateStorage instanceof IStateStorage) { + return new StormClusterStateImpl((IStateStorage) stateStorage, acls, context, false); + } else { + IStateStorage Storage = _instance.mkStateStorageImpl((APersistentMap) stateStorage, (APersistentMap) stateStorage, acls, context); + return new StormClusterStateImpl(Storage, acls, context, true); + } + + } + - public IStateStorage mkStateStorageImpl(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) - throws Exception { ++ public IStateStorage mkStateStorageImpl(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception { + String className = null; + IStateStorage stateStorage = null; + if (config.get(Config.STORM_CLUSTER_STATE_STORE) != null) { + className = (String) config.get(Config.STORM_CLUSTER_STATE_STORE); + } else { + className = "org.apache.storm.cluster.ZKStateStorageFactory"; + } + Class clazz = Class.forName(className); + StateStorageFactory storageFactory = (StateStorageFactory) clazz.newInstance(); + stateStorage = storageFactory.mkStore(config, auth_conf, acls, context); + return stateStorage; + } + - public static IStateStorage mkStateStorage(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) - throws Exception { ++ public static IStateStorage mkStateStorage(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context) throws Exception { + return _instance.mkStateStorageImpl(config, auth_conf, acls, context); + } + + public static IStormClusterState mkStormClusterState(Object StateStorage, List<ACL> acls, ClusterStateContext context) throws Exception { + return _instance.mkStormClusterStateImpl(StateStorage, acls, context); + } + + // TO be remove + public static <K, V> HashMap<V, List<K>> reverseMap(Map<K, V> map) { + HashMap<V, List<K>> rtn = new HashMap<V, List<K>>(); + if (map == null) { + return rtn; + } + for (Map.Entry<K, V> entry : map.entrySet()) { + K key = entry.getKey(); + V val = entry.getValue(); + List<K> list = rtn.get(val); + if (list == null) { + list = new ArrayList<K>(); + rtn.put(entry.getValue(), list); + } + list.add(key); + } + return rtn; + } ++ ++ public static String StringifyError(Throwable error) { ++ String errorString = null; ++ StringWriter result = null; ++ PrintWriter printWriter = null; ++ try { ++ result = new StringWriter(); ++ printWriter = new PrintWriter(result); ++ error.printStackTrace(printWriter); ++ if (result != null) { ++ errorString = result.toString(); ++ } ++ } finally { ++ try { ++ if (result != null) ++ result.close(); ++ if (printWriter != null) ++ printWriter.close(); ++ } catch (Exception e) { ++ } ++ } ++ return errorString; ++ } +} http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java index 59d1af7,0000000..01cf56a mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/IStormClusterState.java @@@ -1,129 -1,0 +1,124 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.cluster; + +import clojure.lang.APersistentMap; +import clojure.lang.IFn; +import org.apache.storm.generated.*; +import org.apache.storm.nimbus.NimbusInfo; + +import java.security.NoSuchAlgorithmException; +import java.util.List; +import java.util.Map; + +public interface IStormClusterState { + public List<String> assignments(IFn callback); + + public Assignment assignmentInfo(String stormId, IFn callback); + + public APersistentMap assignmentInfoWithVersion(String stormId, IFn callback); + + public Integer assignmentVersion(String stormId, IFn callback) throws Exception; + - // returns key information under /storm/blobstore/key + public List<String> blobstoreInfo(String blobKey); + - // returns list of nimbus summaries stored under /stormroot/nimbuses/<nimbus-ids> -> <data> + public List nimbuses(); + - // adds the NimbusSummary to /stormroot/nimbuses/nimbus-id + public void addNimbusHost(String nimbusId, NimbusSummary nimbusSummary); + + public List<String> activeStorms(); + + public StormBase stormBase(String stormId, IFn callback); + + public ClusterWorkerHeartbeat getWorkerHeartbeat(String stormId, String node, Long port); + + public List<ProfileRequest> getWorkerProfileRequests(String stormId, NodeInfo nodeInfo, boolean isThrift); + + public List<ProfileRequest> getTopologyProfileRequests(String stormId, boolean isThrift); + + public void setWorkerProfileRequest(String stormId, ProfileRequest profileRequest); + + public void deleteTopologyProfileRequests(String stormId, ProfileRequest profileRequest); + - public Map<ExecutorInfo, ClusterWorkerHeartbeat> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort); ++ public Map<ExecutorInfo, APersistentMap> executorBeats(String stormId, Map<List<Long>, NodeInfo> executorNodePort); + + public List<String> supervisors(IFn callback); + + public SupervisorInfo supervisorInfo(String supervisorId); // returns nil if doesn't exist + + public void setupHeatbeats(String stormId); + + public void teardownHeartbeats(String stormId); + + public void teardownTopologyErrors(String stormId); + + public List<String> heartbeatStorms(); + + public List<String> errorTopologies(); + + public void setTopologyLogConfig(String stormId, LogConfig logConfig); + + public LogConfig topologyLogConfig(String stormId, IFn cb); + + public void workerHeartbeat(String stormId, String node, Long port, ClusterWorkerHeartbeat info); + + public void removeWorkerHeartbeat(String stormId, String node, Long port); + + public void supervisorHeartbeat(String supervisorId, SupervisorInfo info); + + public void workerBackpressure(String stormId, String node, Long port, boolean on); + + public boolean topologyBackpressure(String stormId, IFn callback); + + public void setupBackpressure(String stormId); + + public void removeWorkerBackpressure(String stormId, String node, Long port); + + public void activateStorm(String stormId, StormBase stormBase); + + public void updateStorm(String stormId, StormBase newElems); + + public void removeStormBase(String stormId); + + public void setAssignment(String stormId, Assignment info); + - // sets up information related to key consisting of nimbus - // host:port and version info of the blob + public void setupBlobstore(String key, NimbusInfo nimbusInfo, Integer versionInfo); + + public List<String> activeKeys(); + + public List<String> blobstore(IFn callback); + + public void removeStorm(String stormId); + + public void removeBlobstoreKey(String blobKey); + + public void removeKeyVersion(String blobKey); + - public void reportError(String stormId, String componentId, String node, Long port, String error); ++ public void reportError(String stormId, String componentId, String node, Long port, Throwable error); + + public List<ErrorInfo> errors(String stormId, String componentId); + + public ErrorInfo lastError(String stormId, String componentId); + + public void setCredentials(String stormId, Credentials creds, Map topoConf) throws NoSuchAlgorithmException; + + public Credentials credentials(String stormId, IFn callback); + + public void disconnect(); + +} http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java index 1226c55,0000000..a9c4d89 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/PaceMakerStateStorage.java @@@ -1,212 -1,0 +1,212 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.cluster; + +import clojure.lang.APersistentMap; +import org.apache.curator.framework.state.ConnectionStateListener; +import org.apache.storm.callback.ZKStateChangedCallback; +import org.apache.storm.generated.*; +import org.apache.storm.pacemaker.PacemakerClient; +import org.apache.storm.utils.Utils; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class PaceMakerStateStorage implements IStateStorage { + + private static Logger LOG = LoggerFactory.getLogger(PaceMakerStateStorage.class); + + private PacemakerClient pacemakerClient; + private IStateStorage stateStorage; + private static final int maxRetries = 10; + + public PaceMakerStateStorage(PacemakerClient pacemakerClient, IStateStorage stateStorage) throws Exception { + this.pacemakerClient = pacemakerClient; + this.stateStorage = stateStorage; + } + + @Override + public String register(ZKStateChangedCallback callback) { + return stateStorage.register(callback); + } + + @Override + public void unregister(String id) { + stateStorage.unregister(id); + } + + @Override + public String create_sequential(String path, byte[] data, List<ACL> acls) { + return stateStorage.create_sequential(path, data, acls); + } + + @Override + public void mkdirs(String path, List<ACL> acls) { + stateStorage.mkdirs(path, acls); + } + + @Override + public void delete_node(String path) { + stateStorage.delete_node(path); + } + + @Override + public void set_ephemeral_node(String path, byte[] data, List<ACL> acls) { + stateStorage.set_ephemeral_node(path, data, acls); + } + + @Override + public Integer get_version(String path, boolean watch) throws Exception { + return stateStorage.get_version(path, watch); + } + + @Override + public boolean node_exists(String path, boolean watch) { + return stateStorage.node_exists(path, watch); + } + + @Override + public List<String> get_children(String path, boolean watch) { + return stateStorage.get_children(path, watch); + } + + @Override + public void close() { + stateStorage.close(); + pacemakerClient.close(); + } + + @Override + public void set_data(String path, byte[] data, List<ACL> acls) { + stateStorage.set_data(path, data, acls); + } + + @Override + public byte[] get_data(String path, boolean watch) { + return stateStorage.get_data(path, watch); + } + + @Override + public APersistentMap get_data_with_version(String path, boolean watch) { + return stateStorage.get_data_with_version(path, watch); + } + + @Override + public void set_worker_hb(String path, byte[] data, List<ACL> acls) { + int retry = maxRetries; + while (true) { + try { + HBPulse hbPulse = new HBPulse(); + hbPulse.set_id(path); + hbPulse.set_details(data); + HBMessage message = new HBMessage(HBServerMessageType.SEND_PULSE, HBMessageData.pulse(hbPulse)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.SEND_PULSE_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful set_worker_hb"); + break; + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to set_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public byte[] get_worker_hb(String path, boolean watch) { + int retry = maxRetries; + while (true) { + try { + HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.GET_PULSE_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful get_worker_hb"); + return response.get_data().get_pulse().get_details(); + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to get_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public List<String> get_worker_hb_children(String path, boolean watch) { + int retry = maxRetries; + while (true) { + try { - HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path)); ++ HBMessage message = new HBMessage(HBServerMessageType.GET_ALL_NODES_FOR_PATH, HBMessageData.path(path)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful get_worker_hb"); + return response.get_data().get_nodes().get_pulseIds(); + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to get_worker_hb_children. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public void delete_worker_hb(String path) { + int retry = maxRetries; + while (true) { + try { - HBMessage message = new HBMessage(HBServerMessageType.GET_PULSE, HBMessageData.path(path)); ++ HBMessage message = new HBMessage(HBServerMessageType.DELETE_PATH, HBMessageData.path(path)); + HBMessage response = pacemakerClient.send(message); + if (response.get_type() != HBServerMessageType.DELETE_PATH_RESPONSE) { + throw new HBExecutionException("Invalid Response Type"); + } + LOG.debug("Successful get_worker_hb"); + break; + } catch (Exception e) { + if (retry <= 0) { + throw Utils.wrapInRuntime(e); + } + LOG.error("{} Failed to delete_worker_hb. Will make {} more attempts.", e.getMessage(), retry--); + } + } + } + + @Override + public void add_listener(ConnectionStateListener listener) { + stateStorage.add_listener(listener); + } + + @Override + public void sync_path(String path) { + stateStorage.sync_path(path); + } + + @Override + public void delete_node_blobstore(String path, String nimbusHostPortInfo) { + stateStorage.delete_node_blobstore(path, nimbusHostPortInfo); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/924f9a29/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java ---------------------------------------------------------------------- diff --cc storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java index c2477d6,0000000..110da41 mode 100644,000000..100644 --- a/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java +++ b/storm-core/src/jvm/org/apache/storm/cluster/StateStorageFactory.java @@@ -1,28 -1,0 +1,28 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.cluster; + +import clojure.lang.APersistentMap; +import java.util.List; +import org.apache.zookeeper.data.ACL; + +public interface StateStorageFactory { - ++ + IStateStorage mkStore(APersistentMap config, APersistentMap auth_conf, List<ACL> acls, ClusterStateContext context); + +}