Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2480#discussion_r159076756
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java ---
    @@ -31,77 +32,75 @@
     public class KafkaOffsetMetric implements IMetric {
     
         private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
    -    private Map<TopicPartition, OffsetManager> offsetManagers;
    -    private KafkaConsumer kafkaConsumer;
    +    private Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
    +    private Supplier<KafkaConsumer> consumerSupplier;
     
    -    public KafkaOffsetMetric(Map<TopicPartition, OffsetManager> offsetManagers) {
    -        this.offsetManagers = offsetManagers;
    +    public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
    +        this.offsetManagerSupplier = offsetManagerSupplier;
    +        this.consumerSupplier = consumerSupplier;
         }
     
         @Override
         public Object getValueAndReset() {
    -        try {
    -            HashMap<String, Long> ret = new HashMap<>();
    -            if (offsetManagers != null && !offsetManagers.isEmpty() && kafkaConsumer != null) {
    -                Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
    -                Set<TopicPartition> topicPartitions = offsetManagers.keySet();
    -
    -                Map<TopicPartition, Long> beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
    -                Map<TopicPartition, Long> endOffsets= kafkaConsumer.endOffsets(topicPartitions);
    -
    -                for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
    -                    TopicPartition topicPartition = entry.getKey();
    -                    OffsetManager offsetManager = entry.getValue();
    -
    -                    long latestTimeOffset = endOffsets.get(topicPartition);
    -                    long earliestTimeOffset = beginningOffsets.get(topicPartition);
    -
    -                    long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
    -                    long latestCompletedOffset = offsetManager.getCommittedOffset();
    -                    long spoutLag = latestTimeOffset - latestCompletedOffset;
    -
    -                    String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
    -                    ret.put(metricPath + "/" + "spoutLag", spoutLag);
    -                    ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
    -                    ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
    -                    ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
    -                    ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
    -
    -                    TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
    -                    if(topicMetrics == null) {
    -                        topicMetrics = new TopicMetrics();
    -                        topicMetricsMap.put(topicPartition.topic(), topicMetrics);
    -                    }
    -
    -                    topicMetrics.totalSpoutLag += spoutLag;
    -                    topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
    -                    topicMetrics.totalLatestTimeOffset += latestTimeOffset;
    -                    topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
    -                    topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
    -                }
    -
    -                for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
    -                    String topic = e.getKey();
    -                    TopicMetrics topicMetrics = e.getValue();
    -                    ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
    -                    ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
    -                    ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
    -                    ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
    -                    ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
    -                }
    -                LOG.debug("Metrics Tick: value : {}", ret);
    -                return ret;
    -            } else {
    -                LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
    +
    +        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
    +        KafkaConsumer kafkaConsumer = consumerSupplier.get();
    +
    +        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
    +            LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
    +            return null;
    +        }
    +
    +        Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
    +        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
    +
    +        Map<TopicPartition, Long> beginningOffsets= kafkaConsumer.beginningOffsets(topicPartitions);
    +        Map<TopicPartition, Long> endOffsets= kafkaConsumer.endOffsets(topicPartitions);
    +        Map<String, Long> result = new HashMap<>();
    +
    +        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
    +            TopicPartition topicPartition = entry.getKey();
    +            OffsetManager offsetManager = entry.getValue();
    +
    +            long latestTimeOffset = endOffsets.get(topicPartition);
    +            long earliestTimeOffset = beginningOffsets.get(topicPartition);
    +
    +            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
    +            long latestCompletedOffset = offsetManager.getCommittedOffset();
    +            long spoutLag = latestTimeOffset - latestCompletedOffset;
    +
    +            String metricPath = topicPartition.topic()  + "/partition_" + topicPartition.partition();
    +            result.put(metricPath + "/" + "spoutLag", spoutLag);
    +            result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
    +            result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
    +            result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
    +            result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
    +
    +            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
    +            if(topicMetrics == null) {
    +                topicMetrics = new TopicMetrics();
    +                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
                 }
    -        } catch (Exception t) {
    -            LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
    +
    +            topicMetrics.totalSpoutLag += spoutLag;
    +            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
    +            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
    +            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
    +            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
    +        }
    +
    +        for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
    +            String topic = e.getKey();
    +            TopicMetrics topicMetrics = e.getValue();
    +            result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
    +            result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
    --- End diff --
    
    The reason I'm asking is that I'd rather only copy over the useful metrics from storm-kafka. It's easier not to add metrics we don't think are useful than it is to remove them later.
    
    Sorry, I made a mistake. We don't need to subtract 1. My bad.
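
    To spell out the arithmetic (my sketch, assuming the committed offset follows Kafka's usual convention of pointing at the next record to fetch, just as the log end offset points at the next offset to be written):

        // Partition holds messages at offsets 0..9, all fully processed:
        long latestTimeOffset = 10;       // endOffsets(): next offset to be written
        long latestCompletedOffset = 10;  // committed offset: next offset to fetch
        long spoutLag = latestTimeOffset - latestCompletedOffset;  // 0, no -1 needed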
    
    Regarding why we disregard log compaction: if a partition has a log end offset (latestTimeOffset) of 10 and a beginningOffset of 0, we'd expect it to contain 10 messages. When log compaction is enabled, some of the messages between 0 and 10 may have been compacted away (replaced by tombstones), so they're no longer there. The metric will still report 10 messages present, because we have no way to know whether any of the messages between the beginningOffset and the log end offset have been compacted away.
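
    In numbers (my sketch of the same example):

        long earliestTimeOffset = 0;  // beginningOffsets() for the partition
        long latestTimeOffset = 10;   // endOffsets() for the partition
        // Upper bound only: with compaction, some offsets in [0, 10) may no
        // longer hold a message, and the consumer API can't tell us which.
        long apparentMessageCount = latestTimeOffset - earliestTimeOffset;  // 10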
    
    Great, thanks for taking the time to verify that the reflection code works.
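
    One more note on the diff above, in case it helps: a minimal sketch (mine, not part of the change) of how the supplier-based constructor could be wired up, so the metric always reads the spout's current consumer and offset managers even if the spout recreates them. The field names and the 60-second time bucket are hypothetical:

        // Inside the spout's open(), with hypothetical field names:
        KafkaOffsetMetric kafkaOffsetMetric = new KafkaOffsetMetric(
            () -> this.offsetManagers,   // supplies Map<TopicPartition, OffsetManager>
            () -> this.kafkaConsumer);   // supplies KafkaConsumer
        context.registerMetric("kafkaOffset", kafkaOffsetMetric, 60);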

