ableegoldman commented on a change in pull request #8697: URL: https://github.com/apache/kafka/pull/8697#discussion_r429505441
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                    nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
I was debating this. My thinking was that a negative value probably means either that you're processing some records with "future" timestamps, for whatever reason, or that your clocks are out of sync. In both cases the e2e latency isn't meaningful, so those records shouldn't affect the statistics. I suppose we could add a separate metric that counts the number of records with negative e2e latency?
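
To make the "separate metric" idea concrete, here is a rough standalone sketch. It is not the StreamTask code from this PR and does not use the Kafka metrics API: the class `E2ELatencyRecorder`, its method names, and the bounds passed to the constructor are all hypothetical, and plain `long` counters stand in for real sensors. It only shows the shape of the logic, where out-of-range latencies are left out of the statistics but are still counted.

```java
// Illustrative sketch only. Every name here is hypothetical; the real PR
// records through Kafka Sensors, which this class does not attempt to model.
// Not thread-safe: plain long fields are used to keep the example small.
final class E2ELatencyRecorder {
    private final long minLatencyMs; // assumed lower bound, e.g. 0
    private final long maxLatencyMs; // assumed upper bound, chosen by the caller

    private long recordedCount = 0;
    private long recordedSumMs = 0;
    private long belowMinCount = 0; // e.g. "future" timestamps or clock skew
    private long aboveMaxCount = 0;

    E2ELatencyRecorder(final long minLatencyMs, final long maxLatencyMs) {
        if (minLatencyMs > maxLatencyMs) {
            throw new IllegalArgumentException("minLatencyMs must not exceed maxLatencyMs");
        }
        this.minLatencyMs = minLatencyMs;
        this.maxLatencyMs = maxLatencyMs;
    }

    // Returns true if the latency was folded into the statistics,
    // false if it was skipped and only counted as out of range.
    boolean maybeRecord(final long recordTimestampMs, final long nowMs) {
        final long latencyMs = nowMs - recordTimestampMs;
        if (latencyMs > maxLatencyMs) {
            aboveMaxCount++;
            return false;
        }
        if (latencyMs < minLatencyMs) {
            belowMinCount++;
            return false;
        }
        recordedCount++;
        recordedSumMs += latencyMs;
        return true;
    }

    long recordedCount() {
        return recordedCount;
    }

    long belowMinCount() {
        return belowMinCount;
    }

    long aboveMaxCount() {
        return aboveMaxCount;
    }

    // Average over recorded latencies only; NaN when nothing was recorded.
    double averageLatencyMs() {
        return recordedCount == 0 ? Double.NaN : (double) recordedSumMs / recordedCount;
    }
}
```

With a lower bound of 0, `belowMinCount()` plays the role of the "number of records with negative e2e latency" metric, while the average stays unaffected by those records.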