Repository: storm Updated Branches: refs/heads/1.x-branch 25fa9dd7c -> c4404cab6
STORM-2153: New Metrics Reporting API Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa1e59f4 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa1e59f4 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa1e59f4 Branch: refs/heads/1.x-branch Commit: fa1e59f4408f4017d4b6c69e672eb7c27d68f3a7 Parents: e85b64a Author: P. Taylor Goetz <[email protected]> Authored: Tue Jul 11 13:58:16 2017 -0400 Committer: P. Taylor Goetz <[email protected]> Committed: Tue Jul 11 13:58:16 2017 -0400 ---------------------------------------------------------------------- conf/defaults.yaml | 25 ++++ .../storm/starter/ExclamationTopology.java | 2 +- .../apache/storm/starter/ReliableWordCount.java | 121 +++++++++++++++++ .../apache/storm/starter/WordCountTopology.java | 2 +- external/storm-autocreds/pom.xml | 4 +- .../hdfs/avro/ConfluentAvroSerializer.java | 2 +- pom.xml | 10 ++ storm-core/pom.xml | 17 ++- .../clj/org/apache/storm/daemon/executor.clj | 18 ++- .../src/clj/org/apache/storm/daemon/task.clj | 7 +- .../src/clj/org/apache/storm/daemon/worker.clj | 12 +- .../src/clj/org/apache/storm/disruptor.clj | 5 +- storm-core/src/jvm/org/apache/storm/Config.java | 3 + .../apache/storm/metrics2/DisruptorMetrics.java | 93 +++++++++++++ .../org/apache/storm/metrics2/SimpleGauge.java | 38 ++++++ .../storm/metrics2/StormMetricRegistry.java | 133 +++++++++++++++++++ .../reporters/ConsoleStormReporter.java | 63 +++++++++ .../metrics2/reporters/CsvStormReporter.java | 93 +++++++++++++ .../reporters/GangliaStormReporter.java | 133 +++++++++++++++++++ .../reporters/GraphiteStormReporter.java | 100 ++++++++++++++ .../metrics2/reporters/JmxStormReporter.java | 88 ++++++++++++ .../reporters/SheduledStormReporter.java | 71 ++++++++++ .../storm/metrics2/reporters/StormReporter.java | 32 +++++ .../org/apache/storm/task/TopologyContext.java | 26 ++++ .../org/apache/storm/utils/DisruptorQueue.java | 43 ++++-- .../utils/DisruptorQueueBackpressureTest.java | 2 +- .../apache/storm/utils/DisruptorQueueTest.java | 4 +- 27 files changed, 1111 insertions(+), 36 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/conf/defaults.yaml ---------------------------------------------------------------------- diff --git a/conf/defaults.yaml b/conf/defaults.yaml index f89211b..b01e0b7 100644 --- a/conf/defaults.yaml +++ b/conf/defaults.yaml @@ -293,3 +293,28 @@ storm.daemon.metrics.reporter.plugins: # configuration of cluster metrics consumer storm.cluster.metrics.consumer.publish.interval.secs: 60 + + +storm.metrics.reporters: + # Graphite Reporter + - class: "org.apache.storm.metrics2.reporters.GraphiteStormReporter" + daemons: + - "supervisor" + - "nimbus" + - "worker" + report.period: 60 + report.period.units: "SECONDS" + graphite.host: "localhost" + graphite.port: 2003 + + # Console Reporter + - class: "org.apache.storm.metrics2.reporters.ConsoleStormReporter" + daemons: + - "worker" + report.period: 10 + report.period.units: "SECONDS" + + #TODO: not funtional, but you get the idea + filters: + "org.apache.storm.metrics2.filters.RegexFilter": + expression: ".*my_component.*emitted.*" http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java index 26e0430..9284b52 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ExclamationTopology.java @@ -79,7 +79,7 @@ public class ExclamationTopology { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("test", conf, builder.createTopology()); - Utils.sleep(10000); + Utils.sleep(100000); cluster.killTopology("test"); cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java new file mode 100644 index 0000000..f05b521 --- /dev/null +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/ReliableWordCount.java @@ -0,0 +1,121 @@ +package org.apache.storm.starter; + +import org.apache.storm.Config; +import org.apache.storm.LocalCluster; +import org.apache.storm.spout.SpoutOutputCollector; +import org.apache.storm.task.TopologyContext; +import org.apache.storm.topology.BasicOutputCollector; +import org.apache.storm.topology.OutputFieldsDeclarer; +import org.apache.storm.topology.TopologyBuilder; +import org.apache.storm.topology.base.BaseBasicBolt; +import org.apache.storm.topology.base.BaseRichSpout; +import org.apache.storm.tuple.Fields; +import org.apache.storm.tuple.Tuple; +import org.apache.storm.tuple.Values; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.Random; +import java.util.UUID; + + +public class ReliableWordCount { + public static class RandomSentenceSpout extends BaseRichSpout { + private static final Logger LOG = LoggerFactory.getLogger(RandomSentenceSpout.class); + + SpoutOutputCollector _collector; + Random _rand; + + + @Override + public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) { + _collector = collector; + _rand = new Random(); + } + + @Override + public void nextTuple() { + Utils.sleep(10); + String[] sentences = new String[]{sentence("the cow jumped over the moon"), sentence("an apple a day keeps the doctor away"), + sentence("four score and seven years ago"), sentence("snow white and the seven dwarfs"), sentence("i am at two with nature")}; + final String sentence = sentences[_rand.nextInt(sentences.length)]; + + _collector.emit(new Values(sentence), UUID.randomUUID()); + } + + protected String sentence(String input) { + return input; + } + + @Override + public void ack(Object id) { + } + + @Override + public void fail(Object id) { + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + } + + + public static class SplitSentence extends BaseBasicBolt { + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String sentence = tuple.getString(0); + for (String word: sentence.split("\\s+")) { + collector.emit(new Values(word, 1)); + } + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static class WordCount extends BaseBasicBolt { + Map<String, Integer> counts = new HashMap<String, Integer>(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) + count = 0; + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static void main(String[] args) throws Exception { + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSentenceSpout(), 4); + + builder.setBolt("split", new SplitSentence(), 4).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 4).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.setMaxTaskParallelism(3); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("word-count", conf, builder.createTopology()); + + Thread.sleep(600000); + + cluster.shutdown(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java ---------------------------------------------------------------------- diff --git a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java index e4a5711..0611894 100644 --- a/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java +++ b/examples/storm-starter/src/jvm/org/apache/storm/starter/WordCountTopology.java @@ -98,7 +98,7 @@ public class WordCountTopology { LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); - Thread.sleep(10000); + Thread.sleep(60000); cluster.shutdown(); } http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/external/storm-autocreds/pom.xml ---------------------------------------------------------------------- diff --git a/external/storm-autocreds/pom.xml b/external/storm-autocreds/pom.xml index ab654a3..c05b620 100644 --- a/external/storm-autocreds/pom.xml +++ b/external/storm-autocreds/pom.xml @@ -15,9 +15,7 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" - xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>storm</artifactId> <groupId>org.apache.storm</groupId> http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java ---------------------------------------------------------------------- diff --git a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java index 2008a3e..087aec5 100644 --- a/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java +++ b/external/storm-hdfs/src/main/java/org/apache/storm/hdfs/avro/ConfluentAvroSerializer.java @@ -27,7 +27,7 @@ import java.io.IOException; import java.util.Map; /** - * This class provides a mechanism to utilize the Confluent Schema Registry (https://github.com/confluentinc/schema-registry) + * This class provides a mechanism to utilize the Confluent Schema StormMetricRegistry (https://github.com/confluentinc/schema-registry) * for Storm to (de)serialize Avro generic records across a topology. It assumes the schema registry is up and running * completely independent of Storm. */ http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index f53171b..6ecb150 100644 --- a/pom.xml +++ b/pom.xml @@ -886,6 +886,16 @@ <version>${metrics.version}</version> </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + <version>${metrics.version}</version> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-ganglia</artifactId> + <version>${metrics.version}</version> + </dependency> + <dependency> <groupId>metrics-clojure</groupId> <artifactId>metrics-clojure</artifactId> <version>${metrics-clojure.version}</version> http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/pom.xml ---------------------------------------------------------------------- diff --git a/storm-core/pom.xml b/storm-core/pom.xml index 0497bdc..e10222a 100644 --- a/storm-core/pom.xml +++ b/storm-core/pom.xml @@ -281,6 +281,14 @@ <artifactId>metrics-core</artifactId> </dependency> <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-graphite</artifactId> + </dependency> + <dependency> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-ganglia</artifactId> + </dependency> + <dependency> <groupId>metrics-clojure</groupId> <artifactId>metrics-clojure</artifactId> </dependency> @@ -526,7 +534,6 @@ <include>org.clojure:tools.namespace</include> <include>cheshire:cheshire</include> <include>org.clojure:core.incubator</include> - <include>io.dropwizard.metrics:*</include> <include>metrics-clojure:*</include> </includes> </artifactSet> @@ -700,10 +707,10 @@ <pattern>org.eclipse.jetty</pattern> <shadedPattern>org.apache.storm.shade.org.eclipse.jetty</shadedPattern> </relocation> - <relocation> - <pattern>com.codahale.metrics</pattern> - <shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern> - </relocation> + <!--<relocation>--> + <!--<pattern>com.codahale.metrics</pattern>--> + <!--<shadedPattern>org.apache.storm.shade.com.codahale.metrics</shadedPattern>--> + <!--</relocation>--> <relocation> <pattern>metrics.core</pattern> <shadedPattern>org.apache.storm.shade.metrics.core</shadedPattern> http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 8126a80..3e5dd20 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -35,6 +35,8 @@ (:import [org.apache.storm.metric.api IMetric IMetricsConsumer$TaskInfo IMetricsConsumer$DataPoint StateMetric]) (:import [org.apache.storm Config Constants]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) + (:import [org.apache.storm.metrics2 StormMetricRegistry]) + (:import [com.codahale.metrics Meter]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) (:import [java.util.concurrent ConcurrentLinkedQueue]) (:require [org.apache.storm [thrift :as thrift] @@ -231,6 +233,8 @@ (str "executor" executor-id "-send-queue") (storm-conf TOPOLOGY-EXECUTOR-SEND-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) + (.getStormId worker-context) + (.getThisWorkerPort worker-context) :producer-type :single-threaded :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) @@ -275,7 +279,9 @@ (log-message "Got interrupted excpetion shutting thread down...") ((:suicide-fn <>)))) :sampler (mk-stats-sampler storm-conf) - :spout-throttling-metrics (if (= executor-type :spout) + :failed-meter (StormMetricRegistry/meter "failed" worker-context component-id) + :acked-meter (StormMetricRegistry/meter "acked" worker-context component-id) + :spout-throttling-metrics (if (= executor-type :spout) (builtin-metrics/make-spout-throttling-data) nil) ;; TODO: add in the executor-specific stuff in a :specific... or make a spout-data, bolt-data function? @@ -429,10 +435,12 @@ (defn- fail-spout-msg [executor-data task-data msg-id tuple-info time-delta reason id debug?] (let [^ISpout spout (:object task-data) storm-conf (:storm-conf executor-data) - task-id (:task-id task-data)] + task-id (:task-id task-data) + failed-meter (:failed-meter executor-data)] ;;TODO: need to throttle these when there's lots of failures (when debug? (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id)) + (.mark failed-meter) (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta @@ -440,8 +448,10 @@ (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] (let [^ISpout spout (:object task-data) - task-id (:task-id task-data)] + task-id (:task-id task-data) + acked-meter (:acked-meter executor-data)] (when debug? (log-message "SPOUT Acking message " id " " msg-id)) + (.mark acked-meter) (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta @@ -809,6 +819,7 @@ (let [delta (tuple-time-delta! tuple)] (when debug? (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) + (.mark ^Meter (:acked-meter (:executor-data task-data))) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when delta (stats/bolt-acked-tuple! executor-stats @@ -824,6 +835,7 @@ debug? (= true (storm-conf TOPOLOGY-DEBUG))] (when debug? (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) + (.mark ^Meter (:failed-meter (:executor-data task-data))) (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when delta (stats/bolt-failed-tuple! executor-stats http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index 1ae9b22..2e4df75 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -23,10 +23,12 @@ (:import [org.apache.storm.hooks.info SpoutAckInfo SpoutFailInfo EmitInfo BoltFailInfo BoltAckInfo]) (:import [org.apache.storm.task TopologyContext ShellBolt WorkerTopologyContext]) + (:import [org.apache.storm.metrics2 StormMetricRegistry]) (:import [org.apache.storm.utils Utils]) (:import [org.apache.storm.generated ShellComponent JavaObject]) (:import [org.apache.storm.spout ShellSpout]) (:import [java.util Collection List ArrayList]) + (:import [com.codahale.metrics Meter]) (:require [org.apache.storm [thrift :as thrift] [stats :as stats]]) @@ -128,9 +130,11 @@ stream->component->grouper (:stream->component->grouper executor-data) user-context (:user-context task-data) executor-stats (:stats executor-data) - debug? (= true (storm-conf TOPOLOGY-DEBUG))] + debug? (= true (storm-conf TOPOLOGY-DEBUG)) + ^Meter emitted-meter (StormMetricRegistry/meter "emitted" worker-context component-id)] (fn ([^Integer out-task-id ^String stream ^List values] + (.mark emitted-meter) (when debug? (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values)) (let [target-component (.getComponentId worker-context out-task-id) @@ -147,6 +151,7 @@ (if out-task-id [out-task-id]) )) ([^String stream ^List values] + (.mark emitted-meter) (when debug? (log-message "Emitting: " component-id " " stream " " values)) (let [out-tasks (ArrayList.)] http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/daemon/worker.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/worker.clj b/storm-core/src/clj/org/apache/storm/daemon/worker.clj index 6626272..b2810db 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/worker.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/worker.clj @@ -43,6 +43,7 @@ (:import [org.apache.logging.log4j Level]) (:import [org.apache.logging.log4j.core.config LoggerConfig]) (:import [org.apache.storm.generated LogConfig LogLevelAction]) + (:import [org.apache.storm.metrics2 StormMetricRegistry]) (:gen-class)) (defmulti mk-suicide-fn cluster-mode) @@ -204,17 +205,19 @@ (transfer-fn serializer tuple-batch))) transfer-fn))) -(defn- mk-receive-queue-map [storm-conf executors] +(defn- mk-receive-queue-map [storm-conf executors storm-id port] (->> executors ;; TODO: this depends on the type of executor (map (fn [e] [e (disruptor/disruptor-queue (str "receive-queue" e) (storm-conf TOPOLOGY-EXECUTOR-RECEIVE-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) + storm-id port :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS))])) (into {}) )) + (defn- stream->fields [^StormTopology topology component] (->> (ThriftTopologyUtils/getComponentCommon topology component) .get_streams @@ -253,9 +256,10 @@ executors (set (read-worker-executors storm-conf storm-cluster-state storm-id assignment-id port assignment-versions)) transfer-queue (disruptor/disruptor-queue "worker-transfer-queue" (storm-conf TOPOLOGY-TRANSFER-BUFFER-SIZE) (storm-conf TOPOLOGY-DISRUPTOR-WAIT-TIMEOUT-MILLIS) + storm-id port :batch-size (storm-conf TOPOLOGY-DISRUPTOR-BATCH-SIZE) :batch-timeout (storm-conf TOPOLOGY-DISRUPTOR-BATCH-TIMEOUT-MILLIS)) - executor-receive-queue-map (mk-receive-queue-map storm-conf executors) + executor-receive-queue-map (mk-receive-queue-map storm-conf executors storm-id port) receive-queue-map (->> executor-receive-queue-map (mapcat (fn [[e queue]] (for [t (executor-id->tasks e)] [t queue]))) @@ -595,7 +599,7 @@ (spit (worker-artifacts-pid-path conf storm-id port) pid))) (declare establish-log-setting-callback) - + (StormMetricRegistry/start conf DaemonType/WORKER) ;; start out with empty list of timeouts (def latest-log-config (atom {})) (def original-log-levels (atom {})) @@ -689,6 +693,8 @@ (close-resources worker) + (StormMetricRegistry/stop) + (log-message "Trigger any worker shutdown hooks") (run-worker-shutdown-hooks worker) http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/clj/org/apache/storm/disruptor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/disruptor.clj b/storm-core/src/clj/org/apache/storm/disruptor.clj index 1546b3f..73a9d84 100644 --- a/storm-core/src/clj/org/apache/storm/disruptor.clj +++ b/storm-core/src/clj/org/apache/storm/disruptor.clj @@ -16,6 +16,7 @@ (ns org.apache.storm.disruptor (:import [org.apache.storm.utils DisruptorQueue WorkerBackpressureCallback DisruptorBackpressureCallback]) + (:import [org.apache.storm.task WorkerTopologyContext]) (:import [com.lmax.disruptor.dsl ProducerType]) (:require [clojure [string :as str]]) (:require [clojure [set :as set]]) @@ -27,10 +28,10 @@ :single-threaded ProducerType/SINGLE}) (defnk disruptor-queue - [^String queue-name buffer-size timeout :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] + [^String queue-name buffer-size timeout ^String storm-id ^Integer worker-port :producer-type :multi-threaded :batch-size 100 :batch-timeout 1] (DisruptorQueue. queue-name (PRODUCER-TYPE producer-type) buffer-size - timeout batch-size batch-timeout)) + timeout batch-size batch-timeout storm-id worker-port)) (defn clojure-handler [afn] http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/Config.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/Config.java b/storm-core/src/jvm/org/apache/storm/Config.java index 43df951..c547530 100644 --- a/storm-core/src/jvm/org/apache/storm/Config.java +++ b/storm-core/src/jvm/org/apache/storm/Config.java @@ -139,6 +139,9 @@ public class Config extends HashMap<String, Object> { @isString public static final String STORM_META_SERIALIZATION_DELEGATE = "storm.meta.serialization.delegate"; + @isType(type=List.class) + public static final String STORM_METRICS_REPORTERS = "storm.metrics.reporters"; + /** * A list of daemon metrics reporter plugin class names. * These plugins must implement {@link org.apache.storm.daemon.metrics.reporters.PreparableReporter} interface. http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java new file mode 100644 index 0000000..994a965 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/DisruptorMetrics.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import org.apache.storm.utils.DisruptorQueue; + +public class DisruptorMetrics { + private SimpleGauge<Long> capacity; + private SimpleGauge<Long> population; + private SimpleGauge<Long> writePosition; + private SimpleGauge<Long> readPosition; + private SimpleGauge<Double> arrivalRate; // TODO: Change to meter + private SimpleGauge<Double> sojournTime; + private SimpleGauge<Long> overflow; + private SimpleGauge<Float> pctFull; + + + DisruptorMetrics(SimpleGauge<Long> capacity, + SimpleGauge<Long> population, + SimpleGauge<Long> writePosition, + SimpleGauge<Long> readPosition, + SimpleGauge<Double> arrivalRate, + SimpleGauge<Double> sojournTime, + SimpleGauge<Long> overflow, + SimpleGauge<Float> pctFull) { + this.capacity = capacity; + this.population = population; + this.writePosition = writePosition; + this.readPosition = readPosition; + this.arrivalRate = arrivalRate; + this.sojournTime = sojournTime; + this.overflow = overflow; + this.pctFull = pctFull; + } + + public void setCapacity(Long capacity) { + this.capacity.set(capacity); + } + + public void setPopulation(Long population) { + this.population.set(population); + } + + public void setWritePosition(Long writePosition) { + this.writePosition.set(writePosition); + } + + public void setReadPosition(Long readPosition) { + this.readPosition.set(readPosition); + } + + public void setArrivalRate(Double arrivalRate) { + this.arrivalRate.set(arrivalRate); + } + + public void setSojournTime(Double soujournTime) { + this.sojournTime.set(soujournTime); + } + + public void setOverflow(Long overflow) { + this.overflow.set(overflow); + } + + public void setPercentFull(Float pctFull){ + this.pctFull.set(pctFull); + } + + public void set(DisruptorQueue.QueueMetrics metrics){ + this.capacity.set(metrics.capacity()); + this.population.set(metrics.population()); + this.writePosition.set(metrics.writePos()); + this.readPosition.set(metrics.readPos()); + this.arrivalRate.set(metrics.arrivalRate()); + this.sojournTime.set(metrics.sojournTime()); + this.overflow.set(metrics.overflow()); + this.pctFull.set(metrics.pctFull()); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java new file mode 100644 index 0000000..5240f26 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/SimpleGauge.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + + +import com.codahale.metrics.Gauge; + +public class SimpleGauge<T> implements Gauge<T> { + private T value; + + public SimpleGauge(T value){ + this.value = value; + } + + @Override + public T getValue() { + return this.value; + } + + public void set(T value){ + this.value = value; + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java new file mode 100644 index 0000000..ced1233 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.Config; +import org.apache.storm.cluster.DaemonType; +import org.apache.storm.metrics2.reporters.StormReporter; +import org.apache.storm.task.WorkerTopologyContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class StormMetricRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(StormMetricRegistry.class); + + private static final MetricRegistry REGISTRY = new MetricRegistry(); + + private static final List<StormReporter> REPORTERS = new ArrayList<>(); + + private static String hostName = null; + + public static <T> SimpleGauge<T> gauge(T initialValue, String name, String topologyId, Integer port){ + SimpleGauge<T> gauge = new SimpleGauge<>(initialValue); + String metricName = String.format("storm.worker.%s.%s-%s", topologyId, port, name); + if(REGISTRY.getGauges().containsKey(metricName)){ + return (SimpleGauge)REGISTRY.getGauges().get(metricName); + } else { + return REGISTRY.register(metricName, gauge); + } + } + + public static DisruptorMetrics disruptorMetrics(String name, String topologyId, Integer port){ + return new DisruptorMetrics( + StormMetricRegistry.gauge(0L, name + "-capacity", topologyId, port), + StormMetricRegistry.gauge(0L, name + "-population", topologyId, port), + StormMetricRegistry.gauge(0L, name + "-write-position", topologyId, port), + StormMetricRegistry.gauge(0L, name + "-read-position", topologyId, port), + StormMetricRegistry.gauge(0.0, name + "-arrival-rate", topologyId, port), + StormMetricRegistry.gauge(0.0, name + "-sojourn-time-ms", topologyId, port), + StormMetricRegistry.gauge(0L, name + "-overflow", topologyId, port), + StormMetricRegistry.gauge(0.0F, name + "-percent-full", topologyId, port) + ); + } + + public static Meter meter(String name, WorkerTopologyContext context, String componentId){ + // storm.worker.{topology}.{host}.{port} + String metricName = String.format("storm.worker.%s.%s.%s.%s-%s", context.getStormId(), hostName, + componentId, context.getThisWorkerPort(), name); + return REGISTRY.meter(metricName); + } + + public static void start(Map<String, Object> stormConfig, DaemonType type){ + String localHost = (String)stormConfig.get(Config.STORM_LOCAL_HOSTNAME); + if(localHost != null){ + hostName = localHost; + } else { + try { + hostName = InetAddress.getLocalHost().getCanonicalHostName(); + } catch (UnknownHostException e) { + LOG.warn("Unable to determine hostname while starting the metrics system. Hostname ill be reported" + + " as 'localhost'."); + } + } + + LOG.info("Starting metrics reporters..."); + List<Map<String, Object>> reporterList = (List<Map<String, Object>>)stormConfig.get(Config.STORM_METRICS_REPORTERS); + for(Map<String, Object> reporterConfig : reporterList){ + // only start those requested + List<String> daemons = (List<String>)reporterConfig.get("daemons"); + for(String daemon : daemons){ + if(DaemonType.valueOf(daemon.toUpperCase()) == type){ + startReporter(stormConfig, reporterConfig); + } + } + } + } + + public static MetricRegistry registtry(){ + return REGISTRY; + } + + private static void startReporter(Map<String, Object> stormConfig, Map<String, Object> reporterConfig){ + String clazz = (String)reporterConfig.get("class"); + StormReporter reporter = null; + LOG.info("Attempting to instantiate reporter class: {}", clazz); + try{ + reporter = instantiate(clazz); + } catch(Exception e){ + LOG.warn("Unable to instantiate metrics reporter class: {}. Will skip this reporter.", clazz, e); + } + if(reporter != null){ + reporter.prepare(REGISTRY, stormConfig, reporterConfig); + reporter.start(); + REPORTERS.add(reporter); + } + + } + + private static StormReporter instantiate(String klass) throws ClassNotFoundException, IllegalAccessException, InstantiationException { + Class<?> c = Class.forName(klass); + return (StormReporter) c.newInstance(); + } + + public static void stop(){ + for(StormReporter sr : REPORTERS){ + sr.stop(); + } + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java new file mode 100644 index 0000000..5322bf8 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/ConsoleStormReporter.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ConsoleReporter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class ConsoleStormReporter extends SheduledStormReporter<ConsoleReporter> { + private final static Logger LOG = LoggerFactory.getLogger(ConsoleStormReporter.class); + + @Override + public void prepare(MetricRegistry registry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing ConsoleReporter"); + ConsoleReporter.Builder builder = ConsoleReporter.forRegistry(registry); + + builder.outputTo(System.out); + Locale locale = MetricsUtils.getMetricsReporterLocale(stormConf); + if (locale != null) { + builder.formattedFor(locale); + } + + TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(stormConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(stormConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + reporter = builder.build(); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java new file mode 100644 index 0000000..4225b7c --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/CsvStormReporter.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.CsvReporter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.utils.ConfigUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class CsvStormReporter extends SheduledStormReporter<CsvReporter> { + private final static Logger LOG = LoggerFactory.getLogger(CsvStormReporter.class); + + public static final String CSV_LOG_DIR = "csv.log.dir"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + CsvReporter.Builder builder = CsvReporter.forRegistry(metricsRegistry); + + Locale locale = MetricsUtils.getMetricsReporterLocale(reporterConf); + if (locale != null) { + builder.formatFor(locale); + } + + TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + //TODO: expose some simple MetricFilters + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + File csvMetricsDir = getCsvLogDir(stormConf, reporterConf); + reporter = builder.build(csvMetricsDir); + } + + + private static File getCsvLogDir(Map stormConf, Map reporterConf) { + String csvMetricsLogDirectory = Utils.getString(reporterConf.get(CSV_LOG_DIR), null); + if (csvMetricsLogDirectory == null) { + csvMetricsLogDirectory = ConfigUtils.absoluteStormLocalDir(stormConf); + csvMetricsLogDirectory = csvMetricsLogDirectory + ConfigUtils.FILE_SEPARATOR + "csvmetrics"; + } + File csvMetricsDir = new File(csvMetricsLogDirectory); + validateCreateOutputDir(csvMetricsDir); + return csvMetricsDir; + } + + private static void validateCreateOutputDir(File dir) { + if (!dir.exists()) { + dir.mkdirs(); + } + if (!dir.canWrite()) { + throw new IllegalStateException(dir.getName() + " does not have write permissions."); + } + if (!dir.isDirectory()) { + throw new IllegalStateException(dir.getName() + " is not a directory."); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java new file mode 100644 index 0000000..d8d0269 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GangliaStormReporter.java @@ -0,0 +1,133 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ganglia.GangliaReporter; +import com.codahale.metrics.MetricRegistry; +import info.ganglia.gmetric4j.gmetric.GMetric; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GangliaStormReporter extends SheduledStormReporter<GangliaReporter> { + private final static Logger LOG = LoggerFactory.getLogger(GangliaStormReporter.class); + + public static final String GANGLIA_HOST = "ganglia.host"; + public static final String GANGLIA_PORT = "ganglia.port"; + public static final String GANGLIA_PREFIXED_WITH = "ganglia.prefixed.with"; + public static final String GANGLIA_DMAX = "ganglia.dmax"; + public static final String GANGLIA_TMAX = "ganglia.tmax"; + public static final String GANGLIA_UDP_ADDRESSING_MODE = "ganglia.udp.addressing.mode"; + public static final String GANGLIA_RATE_UNIT = "ganglia.rate.unit"; + public static final String GANGLIA_DURATION_UNIT = "ganglia.duration.unit"; + public static final String GANGLIA_TTL = "ganglia.ttl"; + public static final String GANGLIA_UDP_GROUP = "ganglia.udp.group"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + GangliaReporter.Builder builder = GangliaReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + //TODO: expose some simple MetricFilters + String prefix = getMetricsPrefixedWith(reporterConf); + if (prefix != null) { + builder.prefixedWith(prefix); + } + + Integer dmax = getGangliaDMax(reporterConf); + if (prefix != null) { + builder.withDMax(dmax); + } + + Integer tmax = getGangliaTMax(reporterConf); + if (prefix != null) { + builder.withTMax(tmax); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + // Not exposed: + // * withClock(Clock) + + String group = getMetricsTargetUDPGroup(reporterConf); + Integer port = getMetricsTargetPort(reporterConf); + String udpAddressingMode = getMetricsTargetUDPAddressingMode(reporterConf); + Integer ttl = getMetricsTargetTtl(reporterConf); + + GMetric.UDPAddressingMode mode = udpAddressingMode.equalsIgnoreCase("multicast") ? + GMetric.UDPAddressingMode.MULTICAST : GMetric.UDPAddressingMode.UNICAST; + + try { + GMetric sender = new GMetric(group, port, mode, ttl); + reporter = builder.build(sender); + }catch (IOException ioe){ + //TODO + LOG.error("Exception in GangliaReporter config", ioe); + } + } + + + public static String getMetricsTargetUDPGroup(Map reporterConf) { + return Utils.getString(reporterConf.get(GANGLIA_UDP_GROUP), null); + } + + public static String getMetricsTargetUDPAddressingMode(Map reporterConf) { + return Utils.getString(reporterConf.get(GANGLIA_UDP_ADDRESSING_MODE), null); + } + + public static Integer getMetricsTargetTtl(Map reporterConf) { + return Utils.getInt(reporterConf.get(GANGLIA_TTL), null); + } + + public static Integer getGangliaDMax(Map reporterConf) { + return Utils.getInt(reporterConf.get(GANGLIA_DMAX), null); + } + + public static Integer getGangliaTMax(Map reporterConf) { + return Utils.getInt(reporterConf.get(GANGLIA_TMAX), null); + } + + + private static Integer getMetricsTargetPort(Map reporterConf) { + return Utils.getInt(reporterConf.get(GANGLIA_PORT), null); + } + + private static String getMetricsPrefixedWith(Map reporterConf) { + return Utils.getString(reporterConf.get(GANGLIA_PREFIXED_WITH), null); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java new file mode 100644 index 0000000..7a2b31b --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/GraphiteStormReporter.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.graphite.GraphiteReporter; +import com.codahale.metrics.graphite.GraphiteSender; +import com.codahale.metrics.graphite.GraphiteUDP; +import com.codahale.metrics.graphite.Graphite; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class GraphiteStormReporter extends SheduledStormReporter<GraphiteReporter> { + private final static Logger LOG = LoggerFactory.getLogger(GraphiteStormReporter.class); + + public static final String GRAPHITE_PREFIXED_WITH = "graphite.prefixed.with"; + public static final String GRAPHITE_HOST = "graphite.host"; + public static final String GRAPHITE_PORT = "graphite.port"; + public static final String GRAPHITE_TRANSPORT = "graphite.transport"; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map stormConf, Map reporterConf) { + LOG.debug("Preparing..."); + GraphiteReporter.Builder builder = GraphiteReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + //TODO: expose some simple MetricFilters + String prefix = getMetricsPrefixedWith(reporterConf); + if (prefix != null) { + builder.prefixedWith(prefix); + } + + //defaults to 10 + reportingPeriod = getReportPeriod(reporterConf); + + //defaults to seconds + reportingPeriodUnit = getReportPeriodUnit(reporterConf); + + // Not exposed: + // * withClock(Clock) + + String host = getMetricsTargetHost(reporterConf); + Integer port = getMetricsTargetPort(reporterConf); + String transport = getMetricsTargetTransport(reporterConf); + GraphiteSender sender = null; + //TODO: error checking + if (transport.equalsIgnoreCase("udp")) { + sender = new GraphiteUDP(host, port); + } else { + //TODO: pickled support + sender = new Graphite(host, port); + } + reporter = builder.build(sender); + } + + private static String getMetricsPrefixedWith(Map reporterConf) { + return Utils.getString(reporterConf.get(GRAPHITE_PREFIXED_WITH), null); + } + + private static String getMetricsTargetHost(Map reporterConf) { + return Utils.getString(reporterConf.get(GRAPHITE_HOST), null); + } + + private static Integer getMetricsTargetPort(Map reporterConf) { + return Utils.getInt(reporterConf.get(GRAPHITE_PORT), null); + } + + private static String getMetricsTargetTransport(Map reporterConf) { + return Utils.getString(reporterConf.get(GRAPHITE_TRANSPORT), "tcp"); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java new file mode 100644 index 0000000..7ac6cde --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/JmxStormReporter.java @@ -0,0 +1,88 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.JmxReporter; +import com.codahale.metrics.MetricRegistry; +import org.apache.storm.daemon.metrics.MetricsUtils; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public class JmxStormReporter implements StormReporter<JmxReporter> { + private final static Logger LOG = LoggerFactory.getLogger(JmxStormReporter.class); + public static final String JMX_DOMAIN = "jmx.domain"; + JmxReporter reporter = null; + + @Override + public void prepare(MetricRegistry metricsRegistry, Map<String, Object> stormConf, Map<String, Object> reporterConf) { + LOG.info("Preparing..."); + JmxReporter.Builder builder = JmxReporter.forRegistry(metricsRegistry); + + TimeUnit durationUnit = MetricsUtils.getMetricsDurationUnit(reporterConf); + if (durationUnit != null) { + builder.convertDurationsTo(durationUnit); + } + + TimeUnit rateUnit = MetricsUtils.getMetricsRateUnit(reporterConf); + if (rateUnit != null) { + builder.convertRatesTo(rateUnit); + } + + String domain = getMetricsJMXDomain(reporterConf); + if (domain != null) { + builder.inDomain(domain); + } + + // TODO: expose some simple MetricFilters + // other builder functions not exposed: + // * createsObjectNamesWith(ObjectNameFactory onFactory) + // * registerWith (MBeanServer) + // * specificDurationUnits (Map<String,TimeUnit> specificDurationUnits) + // * specificRateUnits(Map<String,TimeUnit> specificRateUnits) + + reporter = builder.build(); + } + + public static String getMetricsJMXDomain(Map reporterConf) { + return Utils.getString(reporterConf, JMX_DOMAIN); + } + + @Override + public void start() { + if (reporter != null) { + LOG.debug("Starting..."); + reporter.start(); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.debug("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java new file mode 100644 index 0000000..1b1e7a0 --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/SheduledStormReporter.java @@ -0,0 +1,71 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.ScheduledReporter; +import org.apache.storm.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.TimeUnit; + +public abstract class SheduledStormReporter<T extends ScheduledReporter> implements StormReporter{ + private static final Logger LOG = LoggerFactory.getLogger(SheduledStormReporter.class); + protected ScheduledReporter reporter; + long reportingPeriod; + TimeUnit reportingPeriodUnit; + + @Override + public void start() { + if (reporter != null) { + LOG.debug("Starting..."); + reporter.start(reportingPeriod, reportingPeriodUnit); + } else { + throw new IllegalStateException("Attempt to start without preparing " + getClass().getSimpleName()); + } + } + + @Override + public void stop() { + if (reporter != null) { + LOG.debug("Stopping..."); + reporter.stop(); + } else { + throw new IllegalStateException("Attempt to stop without preparing " + getClass().getSimpleName()); + } + } + + + static TimeUnit getReportPeriodUnit(Map<String, Object> reporterConf) { + TimeUnit unit = getTimeUnitForConfig(reporterConf, REPORT_PERIOD_UNITS); + return unit == null ? TimeUnit.SECONDS : unit; + } + + private static TimeUnit getTimeUnitForConfig(Map reporterConf, String configName) { + String rateUnitString = Utils.getString(reporterConf.get(configName), null); + if (rateUnitString != null) { + return TimeUnit.valueOf(rateUnitString); + } + return null; + } + + static long getReportPeriod(Map reporterConf) { + return Utils.getInt(reporterConf.get(REPORT_PERIOD), 10).longValue(); + } +} http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java new file mode 100644 index 0000000..c36e44e --- /dev/null +++ b/storm-core/src/jvm/org/apache/storm/metrics2/reporters/StormReporter.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.storm.metrics2.reporters; + +import com.codahale.metrics.MetricRegistry; +import com.codahale.metrics.Reporter; + +import java.util.Map; + +public interface StormReporter<T extends Reporter> { + String REPORT_PERIOD = "report.period"; + String REPORT_PERIOD_UNITS = "report.period.units"; + + void prepare(MetricRegistry metricsRegistry, Map<String, Object> conf, Map<String, Object> reporterConf); + void start(); + void stop(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java index 91cbee9..080eb9a 100644 --- a/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java +++ b/storm-core/src/jvm/org/apache/storm/task/TopologyContext.java @@ -17,6 +17,7 @@ */ package org.apache.storm.task; +import com.codahale.metrics.*; import org.apache.storm.generated.GlobalStreamId; import org.apache.storm.generated.Grouping; import org.apache.storm.generated.StormTopology; @@ -26,6 +27,7 @@ import org.apache.storm.metric.api.IReducer; import org.apache.storm.metric.api.ICombiner; import org.apache.storm.metric.api.ReducedMetric; import org.apache.storm.metric.api.CombinedMetric; +import org.apache.storm.metrics2.StormMetricRegistry; import org.apache.storm.state.ISubscribedState; import org.apache.storm.tuple.Fields; import org.apache.storm.utils.Utils; @@ -386,4 +388,28 @@ public class TopologyContext extends WorkerTopologyContext implements IMetricsCo public CombinedMetric registerMetric(String name, ICombiner combiner, int timeBucketSizeInSecs) { return registerMetric(name, new CombinedMetric(combiner), timeBucketSizeInSecs); } + + public Timer registerTimer(String name){ + return StormMetricRegistry.registtry().timer(metricName(name)); + } + + public Histogram registerHistogram(String name){ + return StormMetricRegistry.registtry().histogram(metricName(name)); + } + + public Meter registerMeter(String name){ + return StormMetricRegistry.registtry().meter(metricName(name)); + } + + public Counter registerCounter(String name){ + return StormMetricRegistry.registtry().counter(metricName(name)); + } + + public Gauge registerGauge(String name, Gauge gauge){ + return StormMetricRegistry.registtry().register(metricName(name), gauge); + } + + private String metricName(String name){ + return String.format("storm.topology.%s.%s.%s-%s", getStormId(), getThisComponentId(), getThisWorkerPort(), name); + } } http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java index fe90240..35bc83f 100644 --- a/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java +++ b/storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java @@ -34,6 +34,9 @@ import com.lmax.disruptor.dsl.ProducerType; import org.apache.storm.Config; import org.apache.storm.metric.api.IStatefulObject; import org.apache.storm.metric.internal.RateTracker; +import org.apache.storm.metrics2.DisruptorMetrics; +import org.apache.storm.metrics2.StormMetricRegistry; +import org.apache.storm.task.WorkerTopologyContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +67,7 @@ public class DisruptorQueue implements IStatefulObject { private static final String PREFIX = "disruptor-"; private static final FlusherPool FLUSHER = new FlusherPool(); + private static final Timer METRICS_TIMER = new Timer("disruptor-metrics-timer", true); private static int getNumFlusherPoolThreads() { int numThreads = 100; try { @@ -345,27 +349,31 @@ public class DisruptorQueue implements IStatefulObject { return (1.0F * population() / capacity()); } - public Object getState() { - Map state = new HashMap<String, Object>(); + public double arrivalRate(){ + return _rateTracker.reportRate(); + } + public double sojournTime(){ // get readPos then writePos so it's never an under-estimate long rp = readPos(); long wp = writePos(); - - final double arrivalRateInSecs = _rateTracker.reportRate(); + final double arrivalRateInSecs = arrivalRate(); //Assume the queue is stable, in which the arrival rate is equal to the consumption rate. // If this assumption does not hold, the calculation of sojourn time should also consider // departure rate according to Queuing Theory. - final double sojournTime = (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; + return (wp - rp) / Math.max(arrivalRateInSecs, 0.00001) * 1000.0; + } + public Object getState() { + Map state = new HashMap<String, Object>(); state.put("capacity", capacity()); - state.put("population", wp - rp); - state.put("write_pos", wp); - state.put("read_pos", rp); - state.put("arrival_rate_secs", arrivalRateInSecs); - state.put("sojourn_time_ms", sojournTime); //element sojourn time in milliseconds - state.put("overflow", _overflowCount.get()); + state.put("population", population()); + state.put("write_pos", writePos()); + state.put("read_pos", readPos()); + state.put("arrival_rate_secs", arrivalRate()); + state.put("sojourn_time_ms", sojournTime()); //element sojourn time in milliseconds + state.put("overflow", overflow()); return state; } @@ -385,7 +393,8 @@ public class DisruptorQueue implements IStatefulObject { private final int _inputBatchSize; private final ConcurrentHashMap<Long, ThreadLocalInserter> _batchers = new ConcurrentHashMap<Long, ThreadLocalInserter>(); private final Flusher _flusher; - private final QueueMetrics _metrics; + private final QueueMetrics _metrics; // old metrics API + private final DisruptorMetrics _disruptorMetrics; private String _queueName = ""; private DisruptorBackpressureCallback _cb = null; @@ -395,7 +404,7 @@ public class DisruptorQueue implements IStatefulObject { private final AtomicLong _overflowCount = new AtomicLong(0); private volatile boolean _throttleOn = false; - public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval) { + public DisruptorQueue(String queueName, ProducerType type, int size, long readTimeout, int inputBatchSize, long flushInterval, String topologyId, int port) { this._queueName = PREFIX + queueName; WaitStrategy wait; if (readTimeout <= 0) { @@ -409,12 +418,20 @@ public class DisruptorQueue implements IStatefulObject { _barrier = _buffer.newBarrier(); _buffer.addGatingSequences(_consumer); _metrics = new QueueMetrics(); + _disruptorMetrics = StormMetricRegistry.disruptorMetrics(_queueName, topologyId, port); //The batch size can be no larger than half the full queue size. //This is mostly to avoid contention issues. _inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2)); _flusher = new Flusher(Math.max(flushInterval, 1), _queueName); _flusher.start(); + + METRICS_TIMER.schedule(new TimerTask(){ + @Override + public void run() { + _disruptorMetrics.set(_metrics); + } + }, 15000, 15000); // TODO: Configurable interval } public String getName() { http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java index 7072e55..110fe88 100644 --- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java +++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueBackpressureTest.java @@ -105,6 +105,6 @@ public class DisruptorQueueBackpressureTest extends TestCase { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000); } } http://git-wip-us.apache.org/repos/asf/storm/blob/fa1e59f4/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java ---------------------------------------------------------------------- diff --git a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java index e7ac54e..c834cbb 100644 --- a/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java +++ b/storm-core/test/jvm/org/apache/storm/utils/DisruptorQueueTest.java @@ -178,10 +178,10 @@ public class DisruptorQueueTest extends TestCase { } private static DisruptorQueue createQueue(String name, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, 1, 1L, "test", 1000); } private static DisruptorQueue createQueue(String name, int batchSize, int queueSize) { - return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L); + return new DisruptorQueue(name, ProducerType.MULTI, queueSize, 0L, batchSize, 1L, "test", 1000); } }
