Repository: storm Updated Branches: refs/heads/1.1.x-branch 43a842954 -> cb331d454
STORM-2775 Update kafkaPartition metrics to match the same format as kafkaOffset STORM-2775 Send kafkaPartition metrics in both the "old" and "new" format for backwards compatibility STORM-2775 Update kafkaPartition metrics to match the same format as kafkaOffset Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1504b18 Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1504b18 Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1504b18 Branch: refs/heads/1.1.x-branch Commit: c1504b18dd5337597ddf512da36050f35ce85930 Parents: 11bdbb0 Author: Kevin Conaway <kevin.cona...@walmart.com> Authored: Thu Oct 12 14:48:30 2017 -0400 Committer: Kevin Conaway <kevin.cona...@walmart.com> Committed: Sat Oct 14 10:16:27 2017 -0400 ---------------------------------------------------------------------- .../apache/storm/kafka/PartitionManager.java | 21 ++++++++++++++------ 1 file changed, 15 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/c1504b18/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index 5420887..c025e3b 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -166,13 +166,22 @@ public class PartitionManager { } public Map getMetricsDataMap() { + String[] metricPrefixes = new String[] { + _partition.getId(), // Correct metric prefix, see STORM-2775 + _partition.toString() // Old prefix, kept for backwards compatibility + }; + Map<String, Object> ret = new HashMap<>(); - ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset()); - ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset()); - ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset()); - ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset()); - ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset()); - ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset()); + + for (String metricPrefix : metricPrefixes) { + ret.put(metricPrefix + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset()); + ret.put(metricPrefix + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset()); + ret.put(metricPrefix + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset()); + ret.put(metricPrefix + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset()); + ret.put(metricPrefix + "/lostMessageCount", _lostMessageCount.getValueAndReset()); + ret.put(metricPrefix + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset()); + } + return ret; }