STORM-2153: add streamId and executorId to metrics names; replace '.' with '_' in metrics names
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c91da676 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c91da676 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c91da676 Branch: refs/heads/1.x-branch Commit: c91da676e7fc550ab44fd2d4d91dc95243059eb7 Parents: dd977e8 Author: P. Taylor Goetz <[email protected]> Authored: Wed Dec 13 15:25:56 2017 -0500 Committer: P. Taylor Goetz <[email protected]> Committed: Wed Dec 13 15:25:56 2017 -0500 ---------------------------------------------------------------------- .../clj/org/apache/storm/daemon/executor.clj | 6 ++-- .../src/clj/org/apache/storm/daemon/task.clj | 7 ++--- .../storm/metrics2/StormMetricRegistry.java | 32 +++++++++++++++----- 3 files changed, 30 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/clj/org/apache/storm/daemon/executor.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/executor.clj b/storm-core/src/clj/org/apache/storm/daemon/executor.clj index 0aca4bd..fa7d44c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/executor.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/executor.clj @@ -280,8 +280,6 @@ (log-message "Got interrupted excpetion shutting thread down...") ((:suicide-fn <>)))) :sampler (mk-stats-sampler storm-conf) - :failed-counter (StormMetricRegistry/counter "failed" worker-context component-id) - :acked-counter (StormMetricRegistry/counter "acked" worker-context component-id) :spout-throttling-metrics (if (= executor-type :spout) (builtin-metrics/make-spout-throttling-data) nil) @@ -444,7 +442,7 @@ (.fail spout msg-id) (task/apply-hooks (:user-context task-data) .spoutFail (SpoutFailInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-failed-tuple! (:stats executor-data) (:failed-counter executor-data) (:stream tuple-info) time-delta)))) + (stats/spout-failed-tuple! (:stats executor-data) (StormMetricRegistry/counter "failed" worker-context (:component-id executor-data) (:executor-id executor-data) (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn- ack-spout-msg [executor-data task-data msg-id tuple-info time-delta id debug?] (let [^ISpout spout (:object task-data) @@ -453,7 +451,7 @@ (.ack spout msg-id) (task/apply-hooks (:user-context task-data) .spoutAck (SpoutAckInfo. msg-id task-id time-delta)) (when time-delta - (stats/spout-acked-tuple! (:stats executor-data) (:acked-counter executor-data) (:stream tuple-info) time-delta)))) + (stats/spout-acked-tuple! (:stats executor-data) (StormMetricRegistry/counter "acked" worker-context (:component-id executor-data) (:executor-id executor-data) (:stream tuple-info)) (:stream tuple-info) time-delta)))) (defn mk-task-receiver [executor-data tuple-action-fn] (let [task-ids (:task-ids executor-data) http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/clj/org/apache/storm/daemon/task.clj ---------------------------------------------------------------------- diff --git a/storm-core/src/clj/org/apache/storm/daemon/task.clj b/storm-core/src/clj/org/apache/storm/daemon/task.clj index a2f6c54..edc144c 100644 --- a/storm-core/src/clj/org/apache/storm/daemon/task.clj +++ b/storm-core/src/clj/org/apache/storm/daemon/task.clj @@ -130,8 +130,7 @@ stream->component->grouper (:stream->component->grouper executor-data) user-context (:user-context task-data) executor-stats (:stats executor-data) - debug? (= true (storm-conf TOPOLOGY-DEBUG)) - ^Counter emitted-counter (StormMetricRegistry/counter "emitted" worker-context component-id)] + debug? (= true (storm-conf TOPOLOGY-DEBUG))] (fn ([^Integer out-task-id ^String stream ^List values] (when debug? @@ -144,7 +143,7 @@ (throw (IllegalArgumentException. "Cannot emitDirect to a task expecting a regular grouping"))) (apply-hooks user-context .emit (EmitInfo. values stream task-id [out-task-id])) (when (emit-sampler) - (stats/emitted-tuple! executor-stats emitted-counter stream) + (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (:executor-id executor-data) stream) stream) (if out-task-id (stats/transferred-tuples! executor-stats stream 1))) (if out-task-id [out-task-id]) @@ -164,7 +163,7 @@ ))) (apply-hooks user-context .emit (EmitInfo. values stream task-id out-tasks)) (when (emit-sampler) - (stats/emitted-tuple! executor-stats emitted-counter stream) + (stats/emitted-tuple! executor-stats (StormMetricRegistry/counter "emitted" worker-context component-id (:executor-id executor-data) stream) stream) (stats/transferred-tuples! executor-stats stream (count out-tasks))) out-tasks))) )) http://git-wip-us.apache.org/repos/asf/storm/blob/c91da676/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java index 200ddcf..2bab4e9 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/StormMetricRegistry.java @@ -68,20 +68,20 @@ public class StormMetricRegistry { ); } - public static Meter meter(String name, WorkerTopologyContext context, String componentId){ - String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); + public static Meter meter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ + String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); return REGISTRY.meter(metricName); } - public static Counter counter(String name, WorkerTopologyContext context, String componentId){ - String metricName = metricName(name, context.getStormId(), componentId, context.getThisWorkerPort()); + public static Counter counter(String name, WorkerTopologyContext context, String componentId, String executorId, String streamId){ + String metricName = metricName(name, context.getStormId(), componentId, streamId,executorId, context.getThisWorkerPort()); return REGISTRY.counter(metricName); } public static void start(Map<String, Object> stormConfig, DaemonType type){ String localHost = "localhost"; try { - hostName = Utils.localHostname(); + hostName = dotToUnderScore(Utils.localHostname()); } catch (UnknownHostException e) { LOG.warn("Unable to determine hostname while starting the metrics system. Hostname will be reported" + " as 'localhost'."); @@ -130,9 +130,27 @@ public class StormMetricRegistry { } } - public static String metricName(String name, String stormId, String componentId, Integer workerPort){ - return String.format("storm.worker.%s.%s.%s.%s-%s", stormId, hostName, componentId, workerPort, name); + public static String metricName(String name, String stormId, String componentId, String streamId, String executorId, Integer workerPort){ + return String.format("storm.worker.%s.%s.%s.%s.%s.%s-%s", + stormId, + hostName, + dotToUnderScore(componentId), + dotToUnderScore(streamId), + dotToUnderScore(executorId), + workerPort, + name); } + public static String metricName(String name, String stormId, String componentId, Integer workerPort){ + return String.format("storm.worker.%s.%s.%s.%s-%s", + stormId, + hostName, + dotToUnderScore(componentId), + workerPort, + name); + } + private static String dotToUnderScore(String str){ + return str.replace('.', '_'); + } }
