Repository: storm
Updated Branches:
  refs/heads/1.x-branch 90e138526 -> 39e12aa22

STORM-2775 Update kafkaPartition metrics to match the same format as kafkaOffset

STORM-2775 Send kafkaPartition metrics in both the "old" and "new" format for backwards compatibility

STORM-2775 Update kafkaPartition metrics to match the same format as kafkaOffset


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/39e12aa2
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/39e12aa2
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/39e12aa2

Branch: refs/heads/1.x-branch
Commit: 39e12aa221edc3e815c52622258f8d3ceb3a2891
Parents: 90e1385
Author: Kevin Conaway <kevin.cona...@walmart.com>
Authored: Thu Oct 12 14:48:30 2017 -0400
Committer: Stig Rohde Døssing <s...@apache.org>
Committed: Sat Oct 14 16:22:35 2017 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/PartitionManager.java | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/39e12aa2/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 5420887..c025e3b 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -166,13 +166,22 @@ public class PartitionManager {
     }
 
     public Map getMetricsDataMap() {
+        String[] metricPrefixes = new String[] {
+            _partition.getId(), // Correct metric prefix, see STORM-2775
+            _partition.toString() // Old prefix, kept for backwards compatibility
+        };
+
         Map<String, Object> ret = new HashMap<>();
-        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
-        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
-        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
-        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
-        ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
-        ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
+
+        for (String metricPrefix : metricPrefixes) {
+            ret.put(metricPrefix + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
+            ret.put(metricPrefix + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
+            ret.put(metricPrefix + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
+            ret.put(metricPrefix + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
+            ret.put(metricPrefix + "/lostMessageCount", _lostMessageCount.getValueAndReset());
+            ret.put(metricPrefix + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
+        }
+
         return ret;
     }