Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2480#discussion_r159076756
--- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java ---
@@ -31,77 +32,75 @@
 public class KafkaOffsetMetric implements IMetric {

     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
-    private Map<TopicPartition, OffsetManager> offsetManagers;
-    private KafkaConsumer kafkaConsumer;
+    private Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    private Supplier<KafkaConsumer> consumerSupplier;

-    public KafkaOffsetMetric(Map<TopicPartition, OffsetManager> offsetManagers) {
-        this.offsetManagers = offsetManagers;
+    public KafkaOffsetMetric(Supplier offsetManagerSupplier, Supplier consumerSupplier) {
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.consumerSupplier = consumerSupplier;
     }

     @Override
     public Object getValueAndReset() {
-        try {
-            HashMap<String, Long> ret = new HashMap<>();
-            if (offsetManagers != null && !offsetManagers.isEmpty() && kafkaConsumer != null) {
-                Map<String, TopicMetrics> topicMetricsMap = new HashMap<>();
-                Set<TopicPartition> topicPartitions = offsetManagers.keySet();
-
-                Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
-                Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
-
-                for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
-                    TopicPartition topicPartition = entry.getKey();
-                    OffsetManager offsetManager = entry.getValue();
-
-                    long latestTimeOffset = endOffsets.get(topicPartition);
-                    long earliestTimeOffset = beginningOffsets.get(topicPartition);
-
-                    long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
-                    long latestCompletedOffset = offsetManager.getCommittedOffset();
-                    long spoutLag = latestTimeOffset - latestCompletedOffset;
-
-                    String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
-                    ret.put(metricPath + "/" + "spoutLag", spoutLag);
-                    ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
-                    ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
-                    ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
-                    ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
-
-                    TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
-                    if (topicMetrics == null) {
-                        topicMetrics = new TopicMetrics();
-                        topicMetricsMap.put(topicPartition.topic(), topicMetrics);
-                    }
-
-                    topicMetrics.totalSpoutLag += spoutLag;
-                    topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
-                    topicMetrics.totalLatestTimeOffset += latestTimeOffset;
-                    topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
-                    topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
-                }
-
-                for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
-                    String topic = e.getKey();
-                    TopicMetrics topicMetrics = e.getValue();
-                    ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
-                    ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
-                    ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
-                    ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
-                    ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
-                }
-                LOG.debug("Metrics Tick: value : {}", ret);
-                return ret;
-            } else {
-                LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+
+        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
+        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+            LOG.info("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+            return null;
+        }
+
+        Map<String, TopicMetrics> topicMetricsMap = new HashMap<>();
+        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+        Map<String, Long> result = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            OffsetManager offsetManager = entry.getValue();
+
+            long latestTimeOffset = endOffsets.get(topicPartition);
+            long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+            long latestCompletedOffset = offsetManager.getCommittedOffset();
+            long spoutLag = latestTimeOffset - latestCompletedOffset;
+
+            String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
+            result.put(metricPath + "/" + "spoutLag", spoutLag);
+            result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+            result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+            result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+            result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+
+            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
+            if (topicMetrics == null) {
+                topicMetrics = new TopicMetrics();
+                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
             }
-        } catch (Exception t) {
-            LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
+
+            topicMetrics.totalSpoutLag += spoutLag;
+            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
+        }
+
+        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+            String topic = e.getKey();
+            TopicMetrics topicMetrics = e.getValue();
+            result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+            result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
--- End diff ---
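(As an aside for anyone reading along: a rough sketch of how the new supplier-based constructor might be wired up from the spout. The class, method, and field names and the 60-second metrics bucket below are illustrative, not the actual spout code; the imports assume the usual storm-kafka-client package layout.)

    import java.util.Map;
    import java.util.function.Supplier;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.internal.OffsetManager;
    import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
    import org.apache.storm.task.TopologyContext;

    public class MetricWiringSketch {

        // Illustrative stand-ins for the spout's real state, which can be
        // reassigned over the spout's lifetime (hence the suppliers).
        private Map<TopicPartition, OffsetManager> offsetManagers;
        private KafkaConsumer<?, ?> kafkaConsumer;

        void registerOffsetMetric(TopologyContext context) {
            // The lambdas re-read the fields on every metrics tick, so the
            // metric always sees the current consumer and offset managers.
            Supplier<Map<TopicPartition, OffsetManager>> managerSupplier = () -> offsetManagers;
            Supplier<KafkaConsumer> consumerSupplier = () -> kafkaConsumer;
            context.registerMetric("kafkaOffset",
                    new KafkaOffsetMetric(managerSupplier, consumerSupplier), 60);
        }
    }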
I'm asking because I'd rather copy over only the useful metrics from storm-kafka. It's easier not to add metrics we don't think are useful than to remove them later.
Sorry, I made a mistake. We don't need to subtract 1. My bad.
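To spell out the arithmetic (toy numbers and a hypothetical class name, not the PR code): both values are "next offset" positions in Kafka, so the plain difference already counts the outstanding messages.

    public class SpoutLagExample {
        public static void main(String[] args) {
            long latestTimeOffset = 10;     // next offset the broker would assign
            long latestCompletedOffset = 7; // next offset the spout would consume
            // The outstanding messages are offsets 7, 8 and 9, so the plain
            // difference is already 3; subtracting 1 would undercount by one.
            System.out.println(latestTimeOffset - latestCompletedOffset); // prints 3
        }
    }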
Regarding why we disregard log compaction, if there's a partition with a
log end offset (latestTimeOffset) of 10, and a beginningOffset of 0, we'd
expect there to be 10 messages. When log compaction is enabled, some of the
messages between 0 and 10 may have been replaced by tombstones, so they're not
there anymore. The metric will still show 10 messages present, because we have
no way to know if some of the messages between the beginningOffset and log end
offset have been compacted away.
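Concretely, with the numbers above (a toy illustration with a hypothetical class name):

    public class CompactionExample {
        public static void main(String[] args) {
            long beginningOffset = 0; // earliest offset the broker reports
            long logEndOffset = 10;   // log end offset (latestTimeOffset)
            // From offset metadata alone the partition appears to hold 10 messages.
            long apparentMessages = logEndOffset - beginningOffset;
            // With compaction, records inside [0, 10) can disappear without
            // shifting either offset, so the true count may be smaller and is
            // not recoverable from the offsets the metric can see.
            System.out.println("apparent message count: " + apparentMessages); // 10
        }
    }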
Great, thanks for taking the time to verify that the reflection code works.
---