Repository: storm Updated Branches: refs/heads/master 3b6813838 -> 500ef20d5
STORM-971: Metric for messages lost due to kafka retention Project: http://git-wip-us.apache.org/repos/asf/storm/repo Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c761c5d Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c761c5d Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c761c5d Branch: refs/heads/master Commit: 8c761c5d6d01cdc50db27f144ca361067ffb3ba7 Parents: 6390d18 Author: Abhishek Agarwal <[email protected]> Authored: Sun Mar 13 00:37:14 2016 +0530 Committer: Abhishek Agarwal <[email protected]> Committed: Sun Mar 13 00:37:14 2016 +0530 ---------------------------------------------------------------------- .../jvm/org/apache/storm/kafka/PartitionManager.java | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/storm/blob/8c761c5d/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java ---------------------------------------------------------------------- diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java index 9d78fdc..5c8fda8 100644 --- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java +++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java @@ -44,6 +44,8 @@ public class PartitionManager { private final ReducedMetric _fetchAPILatencyMean; private final CountMetric _fetchAPICallCount; private final CountMetric _fetchAPIMessageCount; + // Count of messages which could not be emitted or retried because they were deleted from kafka + private final CountMetric _lostMessageCount; Long _emittedToOffset; // _pending key = Kafka offset, value = time at which the message was first submitted to the topology private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>(); @@ -117,6 +119,7 @@ public class PartitionManager { _fetchAPILatencyMean = new ReducedMetric(new MeanReducer()); _fetchAPICallCount = new CountMetric(); _fetchAPIMessageCount = new CountMetric(); + _lostMessageCount = new CountMetric(); } public Map getMetricsDataMap() { @@ -125,6 +128,7 @@ public class PartitionManager { ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset()); ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset()); ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset()); + ret.put(_partition + "/lostMessageCount", _lostMessageCount.getValueAndReset()); return ret; } @@ -185,7 +189,7 @@ public class PartitionManager { msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset); } catch (TopicOffsetOutOfRangeException e) { offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime()); - // fetch failed, so don't update the metrics + // fetch failed, so don't update the fetch metrics //fix bug [STORM-643] : remove outdated failed offsets if (!processingNewTuples) { @@ -194,11 +198,17 @@ public class PartitionManager { // offset, since they are anyway not there. // These calls to broker API will be then saved. Set<Long> omitted = this._failedMsgRetryManager.clearInvalidMessages(offset); + + // Omitted messages have not been acked and may be lost + if (null != omitted) { + _lostMessageCount.incrBy(omitted.size()); + } LOG.warn("Removing the failed offsets that are out of range: {}", omitted); } if (offset > _emittedToOffset) { + _lostMessageCount.incrBy(offset - _emittedToOffset); _emittedToOffset = offset; LOG.warn("{} Using new offset: {}", _partition.partition, _emittedToOffset); }
