STORM-2153: fix getFailed() looking up the acked map instead of the failed map, extract metric name strings to constants, make the per-stream counter maps private
Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7947a075 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7947a075 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7947a075 Branch: refs/heads/1.x-branch Commit: 7947a0755c6717de16a7755ec7f1f2dc83388d11 Parents: b257ba4 Author: Jungtaek Lim <[email protected]> Authored: Fri Jan 12 12:56:22 2018 +0900 Committer: Jungtaek Lim <[email protected]> Committed: Fri Jan 12 12:56:22 2018 +0900 ---------------------------------------------------------------------- .../org/apache/storm/metrics2/TaskMetrics.java | 23 ++++++++++++-------- 1 file changed, 14 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/7947a075/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java ---------------------------------------------------------------------- diff --git a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java index 05c62da..239c1a0 100644 --- a/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java +++ b/storm-core/src/jvm/org/apache/storm/metrics2/TaskMetrics.java @@ -26,10 +26,15 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; public class TaskMetrics { - ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>(); - ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>(); - ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>(); - ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>(); + private static final String METRIC_NAME_ACKED = "acked"; + private static final String METRIC_NAME_FAILED = "failed"; + private static final String METRIC_NAME_EMITTED = "emitted"; + private static final String METRIC_NAME_TRANSFERRED = "transferred"; + + private 
ConcurrentMap<String, Counter> ackedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap<String, Counter> failedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap<String, Counter> emittedByStream = new ConcurrentHashMap<>(); + private ConcurrentMap<String, Counter> transferredByStream = new ConcurrentHashMap<>(); private String topologyId; private String componentId; @@ -46,16 +51,16 @@ public class TaskMetrics { public Counter getAcked(String streamId) { Counter c = this.ackedByStream.get(streamId); if (c == null) { - c = StormMetricRegistry.counter("acked", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + c = StormMetricRegistry.counter(METRIC_NAME_ACKED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); this.ackedByStream.put(streamId, c); } return c; } public Counter getFailed(String streamId) { - Counter c = this.ackedByStream.get(streamId); + Counter c = this.failedByStream.get(streamId); if (c == null) { - c = StormMetricRegistry.counter("failed", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + c = StormMetricRegistry.counter(METRIC_NAME_FAILED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); this.failedByStream.put(streamId, c); } return c; @@ -64,7 +69,7 @@ public class TaskMetrics { public Counter getEmitted(String streamId) { Counter c = this.emittedByStream.get(streamId); if (c == null) { - c = StormMetricRegistry.counter("emitted", this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); + c = StormMetricRegistry.counter(METRIC_NAME_EMITTED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); this.emittedByStream.put(streamId, c); } return c; @@ -73,7 +78,7 @@ public class TaskMetrics { public Counter getTransferred(String streamId) { Counter c = this.transferredByStream.get(streamId); if (c == null) { - c = StormMetricRegistry.counter("transferred", this.topologyId, 
this.componentId, this.taskId, this.workerPort, streamId); + c = StormMetricRegistry.counter(METRIC_NAME_TRANSFERRED, this.topologyId, this.componentId, this.taskId, this.workerPort, streamId); this.transferredByStream.put(streamId, c); } return c;
