WIP: replace Meter to Counter
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e9a9f507 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e9a9f507 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e9a9f507 Branch: refs/heads/1.x-branch Commit: e9a9f507eaeb1022615066a98b7822e829f58e0a Parents: 99bcf68 Author: Jungtaek Lim <[email protected]> Authored: Mon Nov 27 17:50:04 2017 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Mon Nov 27 17:50:04 2017 +0900 ---------------------------------------------------------------------- .../src/clj/org/apache/storm/daemon/executor.clj | 14 +++++++------- storm-core/src/clj/org/apache/storm/daemon/task.clj | 8 ++++---- .../apache/storm/metrics2/StormMetricRegistry.java | 6 ++++++ 3 files changed, 17 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index c6f206e..94bd7af 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -36,7 +36,7 @@ (:import [org.apache.storm Config Constants]) (:import [org.apache.storm.cluster ClusterStateContext DaemonType]) (:import [org.apache.storm.metrics2 StormMetricRegistry]) - (:import [com.codahale.metrics Meter]) + (:import [com.codahale.metrics Meter Counter]) (:import [org.apache.storm.grouping LoadAwareCustomStreamGrouping LoadAwareShuffleGrouping LoadMapping ShuffleGrouping]) (:import [java.util.concurrent ConcurrentLinkedQueue]) (:require [org.apache.storm [thrift :as thrift] @@ -280,8 +280,8 @@ (log-message "Got interrupted excpetion shutting thread down...") ((:suicide-fn <>)))) :sampler (mk-stats-sampler storm-conf) - :failed-meter (StormMetricRegistry/meter "failed" worker-context component-id) - :acked-meter (StormMetricRegistry/meter "acked" worker-context component-id) + :failed-meter (StormMetricRegistry/counter "failed" worker-context component-id) + :acked-meter (StormMetricRegistry/counter "acked" worker-context component-id) :spout-throttling-metrics (if (= executor-type :spout) (builtin-metrics/make-spout-throttling-data) nil) @@ -442,7 +442,7 @@ ;;TODO: need to throttle these when there's lots of failures (when debug? (log-message "SPOUT Failing " id ": " tuple-info " REASON: " reason " MSG-ID: " msg-id)) - (.mark failed-meter) + (.inc ^Counter failed-meter) (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta @@ -453,7 +453,7 @@ task-id (:task-id task-data) acked-meter (:acked-meter executor-data)] (when debug? (log-message "SPOUT Acking message " id " " msg-id)) - (.mark acked-meter) + (.inc ^Counter acked-meter) (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta @@ -823,7 +823,7 @@ (let [delta (tuple-time-delta! tuple)] (when debug? (log-message "BOLT ack TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (.mark ^Meter (:acked-meter (:executor-data task-data))) + (.inc ^Counter (:acked-meter (:executor-data task-data))) (task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-acked-tuple! executor-stats @@ -839,7 +839,7 @@ debug? (= true (storm-conf TOPOLOGY-DEBUG))] (when debug? (log-message "BOLT fail TASK: " task-id " TIME: " delta " TUPLE: " tuple)) - (.mark ^Meter (:failed-meter (:executor-data task-data))) + (.inc ^Counter (:failed-meter (:executor-data task-data))) (task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta)) (when (<= 0 delta) (stats/bolt-failed-tuple! executor-stats http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index 2e4df75..7162f7f 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -28,7 +28,7 @@ (:import [org.apache.storm.generated ShellComponent JavaObject]) (:import [org.apache.storm.spout ShellSpout]) (:import [java.util Collection List ArrayList]) - (:import [com.codahale.metrics Meter]) + (:import [com.codahale.metrics Meter Counter]) (:require [org.apache.storm [thrift :as thrift] [stats :as stats]]) @@ -131,10 +131,10 @@ user-context (:user-context task-data) executor-stats (:stats executor-data) debug? (= true (storm-conf TOPOLOGY-DEBUG)) - ^Meter emitted-meter (StormMetricRegistry/meter "emitted" worker-context component-id)] + ^Counter emitted-meter (StormMetricRegistry/counter "emitted" worker-context component-id)] (fn ([^Integer out-task-id ^String stream ^List values] - (.mark emitted-meter) + (.inc ^Counter emitted-meter) (when debug? (log-message "Emitting direct: " out-task-id "; " component-id " " stream " " values)) (let [target-component (.getComponentId worker-context out-task-id) @@ -151,7 +151,7 @@ (if out-task-id [out-task-id]) )) ([^String stream ^List values] - (.mark emitted-meter) + (.inc ^Counter emitted-meter) (when debug? (log-message "Emitting: " component-id " " stream " " values)) (let [out-tasks (ArrayList.)] http://git-wip-us.apache.org/repos/asf/storm/blob/e9a9f507/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index 60d4191..912d888 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -17,6 +17,7 @@ */ package org.apache.storm.metrics2; +import com.codahale.metrics.Counter; import com.codahale.metrics.Meter; import com.codahale.metrics.MetricRegistry; import org.apache.storm.Config; @@ -72,6 +73,11 @@ public class StormMetricRegistry { return REGISTRY.meter(metricName); } + public static Counter counter(String name, WorkerTopologyContext context, String componentId){ + String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); + return REGISTRY.counter(metricName); + } + public static void start(Map<String, Object> stormConfig, DaemonType type){ String localHost = "localhost"; try {
