Repository: storm
Updated Branches:
  refs/heads/1.1.x-branch 43a842954 -> cb331d454


STORM-2775 Update kafkaPartition metrics to match the same format as kafkaOffset

STORM-2775 Send kafkaPartition metrics in both the "old" and "new" format for 
backwards compatibility

STORM-2775 Update kafkaPartition metrics to match the same format as kafkaOffset


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/c1504b18
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/c1504b18
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/c1504b18

Branch: refs/heads/1.1.x-branch
Commit: c1504b18dd5337597ddf512da36050f35ce85930
Parents: 11bdbb0
Author: Kevin Conaway <kevin.cona...@walmart.com>
Authored: Thu Oct 12 14:48:30 2017 -0400
Committer: Kevin Conaway <kevin.cona...@walmart.com>
Committed: Sat Oct 14 10:16:27 2017 -0400

----------------------------------------------------------------------
 .../apache/storm/kafka/PartitionManager.java    | 21 ++++++++++++++------
 1 file changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/c1504b18/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 5420887..c025e3b 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -166,13 +166,22 @@ public class PartitionManager {
     }
 
     public Map getMetricsDataMap() {
+        String[] metricPrefixes = new String[] {
+            _partition.getId(),     // Correct metric prefix, see STORM-2775
+            _partition.toString()   // Old prefix, kept for backwards compatibility
+        };
+
         Map<String, Object> ret = new HashMap<>();
-        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
-        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
-        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
-        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
-        ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset());
-        ret.put(_partition + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
+
+        for (String metricPrefix : metricPrefixes) {
+            ret.put(metricPrefix + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
+            ret.put(metricPrefix + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
+            ret.put(metricPrefix + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
+            ret.put(metricPrefix + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
+            ret.put(metricPrefix + "/lostMessageCount", _lostMessageCount.getValueAndReset());
+            ret.put(metricPrefix + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
+        }
+
         return ret;
     }
 

Reply via email to