mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430753853
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########

```diff
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";

+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days
```

Review comment:
Thanks for the details. Avoiding a config for now sounds good to me. This leaves the path open to add a config later or, as John suggested, to maybe change the algorithm (which might not need a max). I am fine with a hard-coded max of 10 days.

Also +1 to John's suggestion to split percentiles and min/max, to avoid applying the hard-coded limit to the min/max metrics.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########

```diff
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }

+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                    nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {
```

Review comment:
I would not record a negative latency. That seems kinda weird. I am fine with skipping and warning, too. Just wanted to clarify.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########

```diff
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
         log.trace("Start processing one record [{}]", record);

         updateProcessorContext(record, currNode, wallClockTime);
+        maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());
```

Review comment:
> So whether we measure these nodes before or "after" their processing logic should make no practical difference at all.

I think it makes a big difference, and only recording _before_ processing is what we want (according to what the KIP says). Otherwise, the latency would include the processing time of one or more processors (in the worst case, even all processors).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
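For illustration, the bounds logic discussed in this review (a hard-coded 10-day maximum, and skipping rather than recording negative latencies) can be sketched as plain Java, independent of the Kafka Streams metrics classes. This is a minimal sketch, not the PR's actual code: the class `E2ELatencyBounds` and the method `maybeComputeE2ELatency` are hypothetical names, and the real `StreamTask#maybeRecordE2ELatency` records to a `Sensor` and logs a warning instead of returning a sentinel value.

```java
// Hypothetical sketch of the e2e-latency bounds check discussed in the
// review; not the actual StreamTask implementation.
class E2ELatencyBounds {

    // Hard-coded bounds from the discussion: max 10 days, min 0 ms.
    static final long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L;
    static final long MINIMUM_E2E_LATENCY = 0L;

    /**
     * Returns the e2e latency (now - recordTimestamp) if it lies within
     * [MINIMUM_E2E_LATENCY, MAXIMUM_E2E_LATENCY], or -1 to signal that
     * the measurement should be skipped (the real code would log a
     * warning here rather than record an out-of-range value).
     */
    static long maybeComputeE2ELatency(final long recordTimestamp, final long now) {
        final long e2eLatency = now - recordTimestamp;
        if (e2eLatency > MAXIMUM_E2E_LATENCY || e2eLatency < MINIMUM_E2E_LATENCY) {
            return -1L; // skipped: out of the allowed range
        }
        return e2eLatency;
    }

    public static void main(final String[] args) {
        // In-range latency is returned unchanged; a record timestamp in
        // the future (negative latency) is skipped.
        System.out.println(maybeComputeE2ELatency(0L, 5_000L));      // in range
        System.out.println(maybeComputeE2ELatency(10_000L, 5_000L)); // skipped
    }
}
```

The sentinel return value is only a device to keep the sketch self-contained; the point is that both out-of-range branches (above the max, below zero) skip recording, matching the "skip and warn" behavior agreed on in the review.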