Merge branch 'master' into ClusterUtils
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d5463879 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d5463879 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d5463879 Branch: refs/heads/master Commit: d5463879c0441a5a57eb23653a70bcf95d2efbaf Parents: e0f3cb5 4699990 Author: xiaojian.fxj <[email protected]> Authored: Wed Feb 17 11:28:10 2016 +0800 Committer: xiaojian.fxj <[email protected]> Committed: Wed Feb 17 11:28:10 2016 +0800 ---------------------------------------------------------------------- CHANGELOG.md | 8 + bin/storm-config.cmd | 4 + bin/storm.cmd | 22 +- bin/storm.py | 8 +- .../spout/RandomNumberGeneratorSpout.java | 95 +++++ .../trident/TridentMinMaxOfDevicesTopology.java | 201 +++++++++++ .../TridentMinMaxOfVehiclesTopology.java | 180 ++++++++++ pom.xml | 6 + storm-core/pom.xml | 9 + .../clj/org/apache/storm/command/activate.clj | 24 -- .../clj/org/apache/storm/command/deactivate.clj | 24 -- .../org/apache/storm/command/kill_topology.clj | 29 -- .../src/clj/org/apache/storm/command/list.clj | 38 -- .../clj/org/apache/storm/daemon/executor.clj | 68 ++-- .../src/clj/org/apache/storm/daemon/worker.clj | 54 +-- .../src/clj/org/apache/storm/disruptor.clj | 89 ----- .../jvm/org/apache/storm/command/Activate.java | 40 +++ .../src/jvm/org/apache/storm/command/CLI.java | 353 +++++++++++++++++++ .../org/apache/storm/command/Deactivate.java | 40 +++ .../org/apache/storm/command/KillTopology.java | 51 +++ .../src/jvm/org/apache/storm/command/List.java | 50 +++ .../jvm/org/apache/storm/trident/Stream.java | 121 ++++++- .../operation/builtin/ComparisonAggregator.java | 91 +++++ .../storm/trident/operation/builtin/Max.java | 37 ++ .../operation/builtin/MaxWithComparator.java | 51 +++ .../storm/trident/operation/builtin/Min.java | 36 ++ .../operation/builtin/MinWithComparator.java | 51 +++ .../org/apache/storm/utils/DisruptorQueue.java | 15 +- .../org/apache/storm/utils/NimbusClient.java | 19 +- .../src/jvm/org/apache/storm/utils/Utils.java | 20 +- .../jvm/org/apache/storm/command/TestCLI.java | 59 ++++ 31 files changed, 1593 insertions(+), 300 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/d5463879/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/executor.clj index 33b89ed,3af365b..902650c --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@@ -38,8 -38,10 +38,9 @@@ (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) (:import [java.lang Thread Thread$UncaughtExceptionHandler] [java.util.concurrent ConcurrentLinkedQueue] - [org.json.simple JSONValue]) - (:require [org.apache.storm [thrift :as thrift] [disruptor :as disruptor] [stats :as stats]]) + [org.json.simple JSONValue] + [com.lmax.disruptor.dsl ProducerType]) - (:require [org.apache.storm [thrift :as thrift] - [cluster :as cluster] [stats :as stats]]) ++ (:require [org.apache.storm [thrift :as thrift] [stats :as stats]]) (:require [org.apache.storm.daemon [task :as task]]) (:require [org.apache.storm.daemon.builtin-metrics :as builtin-metrics]) (:require [clojure.set :as set])) http://git-wip-us.apache.org/repos/asf/storm/blob/d5463879/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --cc storm-core/src/clj/org/apache/storm/daemon/worker.clj index 9863427,83ae9be..b80cd9e --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@@ -19,7 -19,7 +19,7 @@@ (:require [clj-time.core :as time]) (:require [clj-time.coerce :as coerce]) (:require [org.apache.storm.daemon [executor :as executor]]) - (:require [org.apache.storm [disruptor :as disruptor]]) - (:require [org.apache.storm [cluster :as cluster]]) ++ (:require [clojure.set :as set]) (:import [java.util.concurrent Executors] [org.apache.storm.hooks IWorkerHook BaseWorkerHook] @@@ -244,13 -244,14 +244,14 @@@ ) :timer-name timer-name)) -(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf cluster-state storm-cluster-state] +(defn worker-data [conf mq-context storm-id assignment-id port worker-id storm-conf state-store storm-cluster-state] (let [assignment-versions (atom {}) executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) - transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) + transfer-queue (DisruptorQueue. "worker-transfer-queue" + (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) - :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) - :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) + (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) executor-receive-queue-map (mk-receive-queue-map storm-conf executors) receive-queue-map (->> executor-receive-queue-map
