Github user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2480#discussion_r159075477
  
    --- Diff: 
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
 ---
    @@ -31,77 +32,75 @@
     public class KafkaOffsetMetric implements IMetric {
     
         private static final Logger LOG = 
LoggerFactory.getLogger(KafkaOffsetMetric.class);
    -    private Map<TopicPartition, OffsetManager> offsetManagers;
    -    private KafkaConsumer kafkaConsumer;
    +    private Supplier<Map<TopicPartition, OffsetManager>> 
offsetManagerSupplier;
    +    private Supplier<KafkaConsumer> consumerSupplier;
     
    -    public KafkaOffsetMetric(Map<TopicPartition, OffsetManager> 
offsetManagers) {
    -        this.offsetManagers = offsetManagers;
    +    public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier 
consumerSupplier) {
    +        this.offsetManagerSupplier = offsetManagerSupplier;
    +        this.consumerSupplier = consumerSupplier;
         }
     
         @Override
         public Object getValueAndReset() {
    -        try {
    -            HashMap<String, Long> ret = new HashMap<>();
    -            if (offsetManagers != null && !offsetManagers.isEmpty() && 
kafkaConsumer != null) {
    -                Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
    -                Set<TopicPartition> topicPartitions = 
offsetManagers.keySet();
    -
    -                Map<TopicPartition, Long> beginningOffsets= 
kafkaConsumer.beginningOffsets(topicPartitions);
    -                Map<TopicPartition, Long> endOffsets= 
kafkaConsumer.endOffsets(topicPartitions);
    -
    -                for (Map.Entry<TopicPartition, OffsetManager> entry : 
offsetManagers.entrySet()) {
    -                    TopicPartition topicPartition = entry.getKey();
    -                    OffsetManager offsetManager = entry.getValue();
    -
    -                    long latestTimeOffset = endOffsets.get(topicPartition);
    -                    long earliestTimeOffset = 
beginningOffsets.get(topicPartition);
    -
    -                    long latestEmittedOffset = 
offsetManager.getLatestEmittedOffset();
    -                    long latestCompletedOffset = 
offsetManager.getCommittedOffset();
    -                    long spoutLag = latestTimeOffset - 
latestCompletedOffset;
    -
    -                    String metricPath = topicPartition.topic()  + 
"/partition_" + topicPartition.partition();
    -                    ret.put(metricPath + "/" + "spoutLag", spoutLag);
    -                    ret.put(metricPath + "/" + "earliestTimeOffset", 
earliestTimeOffset);
    -                    ret.put(metricPath + "/" + "latestTimeOffset", 
latestTimeOffset);
    -                    ret.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset);
    -                    ret.put(metricPath + "/" + "latestCompletedOffset", 
latestCompletedOffset);
    -
    -                    TopicMetrics topicMetrics = 
topicMetricsMap.get(topicPartition.topic());
    -                    if(topicMetrics == null) {
    -                        topicMetrics = new TopicMetrics();
    -                        topicMetricsMap.put(topicPartition.topic(), 
topicMetrics);
    -                    }
    -
    -                    topicMetrics.totalSpoutLag += spoutLag;
    -                    topicMetrics.totalEarliestTimeOffset += 
earliestTimeOffset;
    -                    topicMetrics.totalLatestTimeOffset += latestTimeOffset;
    -                    topicMetrics.totalLatestEmittedOffset += 
latestEmittedOffset;
    -                    topicMetrics.totalLatestCompletedOffset += 
latestCompletedOffset;
    -                }
    -
    -                for(Map.Entry<String, TopicMetrics> e : 
topicMetricsMap.entrySet()) {
    -                    String topic = e.getKey();
    -                    TopicMetrics topicMetrics = e.getValue();
    -                    ret.put(topic + "/" + "totalSpoutLag", 
topicMetrics.totalSpoutLag);
    -                    ret.put(topic + "/" + "totalEarliestTimeOffset", 
topicMetrics.totalEarliestTimeOffset);
    -                    ret.put(topic + "/" + "totalLatestTimeOffset", 
topicMetrics.totalLatestTimeOffset);
    -                    ret.put(topic + "/" + "totalLatestEmittedOffset", 
topicMetrics.totalLatestEmittedOffset);
    -                    ret.put(topic + "/" + "totalLatestCompletedOffset", 
topicMetrics.totalLatestCompletedOffset);
    -                }
    -                LOG.debug("Metrics Tick: value : {}", ret);
    -                return ret;
    -            } else {
    -                LOG.info("Metrics Tick: Not enough data to calculate spout 
lag.");
    +
    +        Map<TopicPartition, OffsetManager> offsetManagers = 
offsetManagerSupplier.get();
    +        KafkaConsumer kafkaConsumer = consumerSupplier.get();
    +
    +        if (offsetManagers == null || offsetManagers.isEmpty() || 
kafkaConsumer == null) {
    +            LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is 
null.");
    +            return null;
    +        }
    +
    +        Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
    +        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
    +
    +        Map<TopicPartition, Long> beginningOffsets= 
kafkaConsumer.beginningOffsets(topicPartitions);
    +        Map<TopicPartition, Long> endOffsets= 
kafkaConsumer.endOffsets(topicPartitions);
    +        Map<String, Long> result = new HashMap<>();
    +
    +        for (Map.Entry<TopicPartition, OffsetManager> entry : 
offsetManagers.entrySet()) {
    +            TopicPartition topicPartition = entry.getKey();
    +            OffsetManager offsetManager = entry.getValue();
    +
    +            long latestTimeOffset = endOffsets.get(topicPartition);
    +            long earliestTimeOffset = beginningOffsets.get(topicPartition);
    +
    +            long latestEmittedOffset = 
offsetManager.getLatestEmittedOffset();
    +            long latestCompletedOffset = 
offsetManager.getCommittedOffset();
    +            long spoutLag = latestTimeOffset - latestCompletedOffset;
    +
    +            String metricPath = topicPartition.topic()  + "/partition_" + 
topicPartition.partition();
    +            result.put(metricPath + "/" + "spoutLag", spoutLag);
    +            result.put(metricPath + "/" + "earliestTimeOffset", 
earliestTimeOffset);
    +            result.put(metricPath + "/" + "latestTimeOffset", 
latestTimeOffset);
    +            result.put(metricPath + "/" + "latestEmittedOffset", 
latestEmittedOffset);
    +            result.put(metricPath + "/" + "latestCompletedOffset", 
latestCompletedOffset);
    +
    +            TopicMetrics topicMetrics = 
topicMetricsMap.get(topicPartition.topic());
    +            if(topicMetrics == null) {
    +                topicMetrics = new TopicMetrics();
    +                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
                 }
    -        } catch (Exception t) {
    -            LOG.warn("Metrics Tick: Exception when computing kafkaOffset 
metric.", t);
    +
    +            topicMetrics.totalSpoutLag += spoutLag;
    +            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
    +            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
    +            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
    +            topicMetrics.totalLatestCompletedOffset += 
latestCompletedOffset;
    +        }
    +
    +        for(Map.Entry<String, TopicMetrics> e : 
topicMetricsMap.entrySet()) {
    +            String topic = e.getKey();
    +            TopicMetrics topicMetrics = e.getValue();
    +            result.put(topic + "/" + "totalSpoutLag", 
topicMetrics.totalSpoutLag);
    +            result.put(topic + "/" + "totalEarliestTimeOffset", 
topicMetrics.totalEarliestTimeOffset);
    --- End diff --
    
    @srdo  I agree. totalEarliestTimeOffset alone is not very useful. I just 
added it to keep the metrics compatible with the old spout (storm-kafka) metrics. 
    
    Adding totalRecordsInAssignedPartitions is a good idea. I have included 
totalRecordsInAssignedPartitions in the metrics. Can you explain why we need to 
decrease the count by 1? I do not understand what you mean by disregarding 
log compaction.



---

Reply via email to