Repository: storm
Updated Branches:
  refs/heads/master 3b6813838 -> 500ef20d5


STORM-971: Metric for messages lost due to kafka retention


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8c761c5d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8c761c5d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8c761c5d

Branch: refs/heads/master
Commit: 8c761c5d6d01cdc50db27f144ca361067ffb3ba7
Parents: 6390d18
Author: Abhishek Agarwal <[email protected]>
Authored: Sun Mar 13 00:37:14 2016 +0530
Committer: Abhishek Agarwal <[email protected]>
Committed: Sun Mar 13 00:37:14 2016 +0530

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/PartitionManager.java    | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8c761c5d/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git 
a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java 
b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
index 9d78fdc..5c8fda8 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
@@ -44,6 +44,8 @@ public class PartitionManager {
     private final ReducedMetric _fetchAPILatencyMean;
     private final CountMetric _fetchAPICallCount;
     private final CountMetric _fetchAPIMessageCount;
+    // Count of messages which could not be emitted or retried because they 
were deleted from kafka
+    private final CountMetric _lostMessageCount;
     Long _emittedToOffset;
     // _pending key = Kafka offset, value = time at which the message was 
first submitted to the topology
     private SortedMap<Long,Long> _pending = new TreeMap<Long,Long>();
@@ -117,6 +119,7 @@ public class PartitionManager {
         _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
         _fetchAPICallCount = new CountMetric();
         _fetchAPIMessageCount = new CountMetric();
+        _lostMessageCount = new CountMetric();
     }
 
     public Map getMetricsDataMap() {
@@ -125,6 +128,7 @@ public class PartitionManager {
         ret.put(_partition + "/fetchAPILatencyMean", 
_fetchAPILatencyMean.getValueAndReset());
         ret.put(_partition + "/fetchAPICallCount", 
_fetchAPICallCount.getValueAndReset());
         ret.put(_partition + "/fetchAPIMessageCount", 
_fetchAPIMessageCount.getValueAndReset());
+        ret.put(_partition + "/lostMessageCount", 
_lostMessageCount.getValueAndReset());
         return ret;
     }
 
@@ -185,7 +189,7 @@ public class PartitionManager {
             msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, 
_partition, offset);
         } catch (TopicOffsetOutOfRangeException e) {
             offset = KafkaUtils.getOffset(_consumer, _partition.topic, 
_partition.partition, kafka.api.OffsetRequest.EarliestTime());
-            // fetch failed, so don't update the metrics
+            // fetch failed, so don't update the fetch metrics
             
             //fix bug [STORM-643] : remove outdated failed offsets
             if (!processingNewTuples) {
@@ -194,11 +198,17 @@ public class PartitionManager {
                 // offset, since they are anyway not there.
                 // These calls to broker API will be then saved.
                 Set<Long> omitted = 
this._failedMsgRetryManager.clearInvalidMessages(offset);
+
+                // Omitted messages have not been acked and may be lost
+                if (null != omitted) {
+                    _lostMessageCount.incrBy(omitted.size());
+                }
                 
                 LOG.warn("Removing the failed offsets that are out of range: 
{}", omitted);
             }
 
             if (offset > _emittedToOffset) {
+                _lostMessageCount.incrBy(offset - _emittedToOffset);
                 _emittedToOffset = offset;
                 LOG.warn("{} Using new offset: {}", _partition.partition, 
_emittedToOffset);
             }

Reply via email to