GitHub user omkreddy commented on a diff in the pull request:
https://github.com/apache/storm/pull/2480#discussion_r159075477
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java ---
@@ -31,77 +32,75 @@
 public class KafkaOffsetMetric implements IMetric {

     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
-    private Map<TopicPartition, OffsetManager> offsetManagers;
-    private KafkaConsumer kafkaConsumer;
+    private Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    private Supplier<KafkaConsumer> consumerSupplier;

-    public KafkaOffsetMetric(Map<TopicPartition, OffsetManager> offsetManagers) {
-        this.offsetManagers = offsetManagers;
+    public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.consumerSupplier = consumerSupplier;
     }

     @Override
     public Object getValueAndReset() {
-        try {
-            HashMap<String, Long> ret = new HashMap<>();
-            if (offsetManagers != null && !offsetManagers.isEmpty() && kafkaConsumer != null) {
-                Map<String, TopicMetrics> topicMetricsMap = new HashMap<>();
-                Set<TopicPartition> topicPartitions = offsetManagers.keySet();
-
-                Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
-                Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
-
-                for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
-                    TopicPartition topicPartition = entry.getKey();
-                    OffsetManager offsetManager = entry.getValue();
-
-                    long latestTimeOffset = endOffsets.get(topicPartition);
-                    long earliestTimeOffset = beginningOffsets.get(topicPartition);
-
-                    long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
-                    long latestCompletedOffset = offsetManager.getCommittedOffset();
-                    long spoutLag = latestTimeOffset - latestCompletedOffset;
-
-                    String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
-                    ret.put(metricPath + "/" + "spoutLag", spoutLag);
-                    ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
-                    ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
-                    ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
-                    ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
-
-                    TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
-                    if (topicMetrics == null) {
-                        topicMetrics = new TopicMetrics();
-                        topicMetricsMap.put(topicPartition.topic(), topicMetrics);
-                    }
-
-                    topicMetrics.totalSpoutLag += spoutLag;
-                    topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
-                    topicMetrics.totalLatestTimeOffset += latestTimeOffset;
-                    topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
-                    topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
-                }
-
-                for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
-                    String topic = e.getKey();
-                    TopicMetrics topicMetrics = e.getValue();
-                    ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
-                    ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
-                    ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
-                    ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
-                    ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
-                }
-                LOG.debug("Metrics Tick: value : {}", ret);
-                return ret;
-            } else {
-                LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+
+        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
+        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+            LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+            return null;
+        }
+
+        Map<String, TopicMetrics> topicMetricsMap = new HashMap<>();
+        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+        Map<String, Long> result = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            OffsetManager offsetManager = entry.getValue();
+
+            long latestTimeOffset = endOffsets.get(topicPartition);
+            long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+            long latestCompletedOffset = offsetManager.getCommittedOffset();
+            long spoutLag = latestTimeOffset - latestCompletedOffset;
+
+            String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
+            result.put(metricPath + "/" + "spoutLag", spoutLag);
+            result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+            result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+            result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+            result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+
+            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
+            if (topicMetrics == null) {
+                topicMetrics = new TopicMetrics();
+                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
             }
-        } catch (Exception t) {
-            LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
+
+            topicMetrics.totalSpoutLag += spoutLag;
+            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
+        }
+
+        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+            String topic = e.getKey();
+            TopicMetrics topicMetrics = e.getValue();
+            result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+            result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
--- End diff --
@srdo I agree. totalEarliestTimeOffset alone is not very useful; I only added it to keep the metrics compatible with the old spout (storm-kafka) metrics. Adding totalRecordsInAssignedPartitions is a good idea, and I have included it in the metrics. Can you explain why we need to decrease the count by 1? I also don't understand what you mean by disregarding log compaction.
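
For reference, a minimal sketch of how totalRecordsInAssignedPartitions could be derived from the beginningOffsets/endOffsets maps that getValueAndReset() already fetches (names follow the diff above; the exact aggregation committed in the PR may differ):

```java
// Sketch only: totalRecordsInAssignedPartitions as the sum of
// (endOffset - beginningOffset) over all assigned partitions,
// reusing the beginningOffsets/endOffsets maps from the diff above.
long totalRecordsInAssignedPartitions = 0;
for (TopicPartition topicPartition : offsetManagers.keySet()) {
    // endOffsets() returns the offset of the next record to be written,
    // so the difference approximates the record count in the partition.
    totalRecordsInAssignedPartitions +=
        endOffsets.get(topicPartition) - beginningOffsets.get(topicPartition);
}
result.put("totalRecordsInAssignedPartitions", totalRecordsInAssignedPartitions);
```

If the log compaction point is that a compacted topic has offsets in [beginningOffset, endOffset) which no longer hold records, then I see this difference is only an upper bound on the real record count.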
---