ableegoldman commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429505441



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String 
nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long 
now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency 
but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && 
e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} 
is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       I was debating this...my thinking here was that a negative value 
probably means you're processing some records with "future" timestamps, for 
whatever reason, in which case the e2e latency isn't meaningful and they 
shouldn't affect the statistics. 
   Or, your clocks are out of sync. I suppose we could add a separate metric 
that counts the number of records with negative e2e latency?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to