mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429499839



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String 
nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long 
now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency 
but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && 
e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {

Review comment:
       Not sure about this... Why do we need/want to have a limit?
   
   Nit: double space




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to