mjsax commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r430753853



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       Thanks for the details. Avoiding a config for now sounds good to me. This leaves the path open to add a config later or, as John suggested, to maybe change the algorithm (which might not need a max). I am fine with a hard-coded max of 10 days.
   
   Also +1 to John's suggestion to split percentiles and min/max into separate sensors, to avoid applying the hard-coded limit to the min/max metrics.
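   A rough sketch of what such a split could look like with the common metrics API (sensor and metric names here are hypothetical, not the PR's actual code):
   
      import org.apache.kafka.common.metrics.Metrics;
      import org.apache.kafka.common.metrics.Sensor;
      import org.apache.kafka.common.metrics.stats.Max;
      import org.apache.kafka.common.metrics.stats.Min;
      import org.apache.kafka.common.metrics.stats.Percentile;
      import org.apache.kafka.common.metrics.stats.Percentiles;
      import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;
   
      public class E2ELatencySensorSketch {
   
          static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;          // 1 MB histogram
          static final long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // 10 days
   
          public static void main(final String[] args) {
              final Metrics metrics = new Metrics();
   
              // Sensor 1: percentiles only -- the histogram needs an upper bound,
              // so the hard-coded maximum applies only here.
              final Sensor percentileSensor = metrics.sensor("e2e-latency-percentiles");
              percentileSensor.add(new Percentiles(
                  PERCENTILES_SIZE_IN_BYTES,
                  MAXIMUM_E2E_LATENCY,
                  BucketSizing.LINEAR,
                  new Percentile(metrics.metricName("e2e-latency-p99", "stream-metrics"), 99),
                  new Percentile(metrics.metricName("e2e-latency-p90", "stream-metrics"), 90)
              ));
   
              // Sensor 2: min/max only -- these stats need no bound, so values above
              // the percentile cap could still be recorded on this sensor.
              final Sensor minMaxSensor = metrics.sensor("e2e-latency-min-max");
              minMaxSensor.add(metrics.metricName("e2e-latency-min", "stream-metrics"), new Min());
              minMaxSensor.add(metrics.metricName("e2e-latency-max", "stream-metrics"), new Max());
   
              // Callers would record the same latency value on both sensors.
              percentileSensor.record(42.0);
              minMaxSensor.record(42.0);
          }
      }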

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency >  MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       I would not record a negative latency; that seems kind of weird to me. I am fine with skipping and warning, too. Just wanted to clarify.
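   A minimal, self-contained sketch of the skip-and-warn behavior discussed here (the constant names mirror the diff above; the sensor and logger wiring are assumed, this is not the PR's actual code):
   
      import org.apache.kafka.common.metrics.Sensor;
      import org.slf4j.Logger;
   
      final class E2ELatencyRecordingSketch {
   
          static final long MINIMUM_E2E_LATENCY = 0L;
          static final long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // 10 days
   
          private E2ELatencyRecordingSketch() {}
   
          static void maybeRecord(final Sensor sensor, final Logger log,
                                  final long recordTimestamp, final long now, final String nodeName) {
              final long e2eLatency = now - recordTimestamp;
              if (e2eLatency > MAXIMUM_E2E_LATENCY) {
                  log.warn("Skipped recording e2e latency for node {} because {} exceeds the maximum allowed latency {}",
                           nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
              } else if (e2eLatency < MINIMUM_E2E_LATENCY) {
                  // A negative latency means the record timestamp lies in the future,
                  // e.g. due to clock skew; skip and warn instead of recording it.
                  log.warn("Skipped recording e2e latency for node {} because {} is negative",
                           nodeName, e2eLatency);
              } else {
                  sensor.record(e2eLatency);
              }
          }
      }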

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -586,6 +606,7 @@ public boolean process(final long wallClockTime) {
             log.trace("Start processing one record [{}]", record);
 
             updateProcessorContext(record, currNode, wallClockTime);
+            maybeRecordE2ELatency(record.timestamp, wallClockTime, currNode.name());

Review comment:
       > So whether we measure these nodes before or "after" their processing logic should make no practical difference at all.
   
   I think it makes a big difference, and only recording _before_ processing is what we want (according to what the KIP says). Otherwise, the latency includes the processing time of one or more processors (in the worst case, even all processors).
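   A tiny self-contained illustration of why the ordering matters (all numbers hypothetical): recording after process() would fold the processing time into the reported latency.
   
      public class E2EOrderingExample {
          public static void main(final String[] args) {
              final long recordTimestamp = 1_000L;  // when the record was created
              final long arrivalTime = 1_500L;      // wall-clock time when it reaches the node
              final long processingTimeMs = 200L;   // time spent in process()
   
              // Recording before processing measures only how long the record
              // took to reach this node.
              final long latencyRecordedBefore = arrivalTime - recordTimestamp;                     // 500 ms
   
              // Recording after processing additionally includes this node's
              // (and, transitively, downstream nodes') processing time.
              final long latencyRecordedAfter = (arrivalTime + processingTimeMs) - recordTimestamp; // 700 ms
   
              System.out.println("recorded before processing: " + latencyRecordedBefore + " ms");
              System.out.println("recorded after processing:  " + latencyRecordedAfter + " ms");
          }
      }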



