Github user omkreddy commented on a diff in the pull request: https://github.com/apache/storm/pull/2480#discussion_r159075477 --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java --- @@ -31,77 +32,75 @@ public class KafkaOffsetMetric implements IMetric { private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class); - private Map<TopicPartition, OffsetManager> offsetManagers; - private KafkaConsumer kafkaConsumer; + private Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier; + private Supplier<KafkaConsumer> consumerSupplier; - public KafkaOffsetMetric(Map<TopicPartition, OffsetManager> offsetManagers) { - this.offsetManagers = offsetManagers; + public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) { + this.offsetManagerSupplier = offsetManagerSupplier; + this.consumerSupplier = consumerSupplier; } @Override public Object getValueAndReset() { - try { - HashMap<String, Long> ret = new HashMap<>(); - if (offsetManagers != null && !offsetManagers.isEmpty() && kafkaConsumer != null) { - Map<String,TopicMetrics> topicMetricsMap = new HashMap<>(); - Set<TopicPartition> topicPartitions = offsetManagers.keySet(); - - Map<TopicPartition, Long> beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions); - Map<TopicPartition, Long> endOffsets= kafkaConsumer.endOffsets(topicPartitions); - - for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) { - TopicPartition topicPartition = entry.getKey(); - OffsetManager offsetManager = entry.getValue(); - - long latestTimeOffset = endOffsets.get(topicPartition); - long earliestTimeOffset = beginningOffsets.get(topicPartition); - - long latestEmittedOffset = offsetManager.getLatestEmittedOffset(); - long latestCompletedOffset = offsetManager.getCommittedOffset(); - long spoutLag = latestTimeOffset - latestCompletedOffset; - - String metricPath = topicPartition.topic() + 
"/partition_" + topicPartition.partition(); - ret.put(metricPath + "/" + "spoutLag", spoutLag); - ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); - ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); - ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); - ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); - - TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic()); - if(topicMetrics == null) { - topicMetrics = new TopicMetrics(); - topicMetricsMap.put(topicPartition.topic(), topicMetrics); - } - - topicMetrics.totalSpoutLag += spoutLag; - topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; - topicMetrics.totalLatestTimeOffset += latestTimeOffset; - topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; - topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; - } - - for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { - String topic = e.getKey(); - TopicMetrics topicMetrics = e.getValue(); - ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); - ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); - ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset); - ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset); - ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset); - } - LOG.debug("Metrics Tick: value : {}", ret); - return ret; - } else { - LOG.info("Metrics Tick: Not enough data to calculate spout lag."); + + Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get(); + KafkaConsumer kafkaConsumer = consumerSupplier.get(); + + if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) { + LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null."); + return null; + } + + Map<String,TopicMetrics> topicMetricsMap = new 
HashMap<>(); + Set<TopicPartition> topicPartitions = offsetManagers.keySet(); + + Map<TopicPartition, Long> beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions); + Map<TopicPartition, Long> endOffsets= kafkaConsumer.endOffsets(topicPartitions); + Map<String, Long> result = new HashMap<>(); + + for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) { + TopicPartition topicPartition = entry.getKey(); + OffsetManager offsetManager = entry.getValue(); + + long latestTimeOffset = endOffsets.get(topicPartition); + long earliestTimeOffset = beginningOffsets.get(topicPartition); + + long latestEmittedOffset = offsetManager.getLatestEmittedOffset(); + long latestCompletedOffset = offsetManager.getCommittedOffset(); + long spoutLag = latestTimeOffset - latestCompletedOffset; + + String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition(); + result.put(metricPath + "/" + "spoutLag", spoutLag); + result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset); + result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset); + result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset); + result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset); + + TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic()); + if(topicMetrics == null) { + topicMetrics = new TopicMetrics(); + topicMetricsMap.put(topicPartition.topic(), topicMetrics); } - } catch (Exception t) { - LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t); + + topicMetrics.totalSpoutLag += spoutLag; + topicMetrics.totalEarliestTimeOffset += earliestTimeOffset; + topicMetrics.totalLatestTimeOffset += latestTimeOffset; + topicMetrics.totalLatestEmittedOffset += latestEmittedOffset; + topicMetrics.totalLatestCompletedOffset += latestCompletedOffset; + } + + for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) { + String topic = e.getKey(); + TopicMetrics 
topicMetrics = e.getValue(); + result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag); + result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset); --- End diff -- @srdo I agree. totalEarliestTimeOffset alone is not very useful. I just added it to keep the metrics compatible with the old spout (storm-kafka) metrics. Adding totalRecordsInAssignedPartitions is a good idea. I have included totalRecordsInAssignedPartitions in the metrics. Can you explain why we need to decrease the count by 1? I did not understand what you mean by disregarding log compaction.
---