cadonna commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429814548
##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
##########

```diff
@@ -260,6 +261,43 @@ public void shouldGetProcessAtSourceSensorOrForwardSensor() {
         }
     }
 
+    @Test
+    public void shouldGetRecordE2ELatencySensor() {
+        final String operation = "record-e2e-latency";
+        final String recordE2ELatencyMinDescription =
+            "The minimum end-to-end latency of a record, measuring by comparing the record timestamp with the " +
+                "system time when it has been fully processed by the node";
+        final String recordE2ELatencyMaxDescription =
+            "The maximum end-to-end latency of a record, measuring by comparing the record timestamp with the " +
+                "system time when it has been fully processed by the node";
+        final String recordE2ELatencyP99Description =
+            "The 99th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the " +
+                "system time when it has been fully processed by the node";
+        final String recordE2ELatencyP90Description =
+            "The 90th percentile end-to-end latency of a record, measuring by comparing the record timestamp with the " +
+                "system time when it has been fully processed by the node";
+        expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, operation, RecordingLevel.INFO))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
+        StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor(
+            expectedSensor,
+            PROCESSOR_NODE_LEVEL_GROUP,
+            tagMap,
+            operation,
+            recordE2ELatencyMinDescription,
+            recordE2ELatencyMaxDescription,
+            recordE2ELatencyP99Description,
+            recordE2ELatencyP90Description
+        );
+
+        replay(StreamsMetricsImpl.class, streamsMetrics);
+
+        final Sensor sensor = ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics);
+
+        verify(StreamsMetricsImpl.class, streamsMetrics);
+        assertThat(sensor, is(expectedSensor));
```

Review comment:
req:
```suggestion
        verifySensor(() -> ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics));
```


##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########

```diff
@@ -50,6 +51,13 @@ public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes,
         this.globalStateStores = Collections.unmodifiableList(globalStateStores);
         this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
         this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
+
+        this.terminalNodes = new HashSet<>();
+        for (final ProcessorNode<?, ?> node : processorNodes) {
+            if (node.children().isEmpty()) {
```

Review comment:
See my comment in `ProcessorContextImpl`.
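For context on the `verifySensor(...)` suggestion above: a minimal sketch of what such a helper in `ProcessorNodeMetricsTest` could look like, assuming it simply wraps the replay/verify/assert boilerplate that the suggestion removes from the individual test. The helper name and its call site come from the suggestion; the `Supplier` parameter and the body are assumptions, not the actual implementation.

```java
// goes at the top of the test file
import java.util.function.Supplier;

// Hypothetical helper inside ProcessorNodeMetricsTest (sketch only). It relies on the test
// class's existing fields (streamsMetrics, expectedSensor) and its static imports for
// PowerMock's replay/verify and Hamcrest's assertThat/is, as used in the diff above.
private void verifySensor(final Supplier<Sensor> sensorSupplier) {
    // switch the mocks from record mode to replay mode
    replay(StreamsMetricsImpl.class, streamsMetrics);

    // build the sensor under test via the lambda passed in by the individual test
    final Sensor sensor = sensorSupplier.get();

    // verify that all expected interactions happened and that the mocked sensor was returned
    verify(StreamsMetricsImpl.class, streamsMetrics);
    assertThat(sensor, is(expectedSensor));
}
```

With a helper like this, each sensor test only needs to set up its expectations and pass the factory call as a lambda, keeping the replay/verify choreography in one place.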
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########

```diff
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {
```

Review comment:
Q: Wouldn't it be better to count measurements beyond the maximum latency towards the highest bucket, as the `Percentiles` metric does? Admittedly, the measured value would be quite wrong if many measurements exceed the maximum latency. However, with bucket sizes that increase linearly, the reported values would be quite wrong anyway due to the increased approximation error. Furthermore, I guess users would put an alert on substantially smaller values. OTOH, not counting measurements beyond the maximum latency would distort the metric a bit, because they would not count towards the remaining 1% or 10% (for p99 and p90, respectively). Additionally, the max metric would also be distorted by not counting those measurements.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########

```diff
@@ -223,6 +223,9 @@ public StateStore getStateStore(final String name) {
                          final V value) {
         setCurrentNode(child);
         child.process(key, value);
+        if (child.children().isEmpty()) {
```

Review comment:
prop: For the sake of readability, could you extract this check into a method named `isTerminalNode()`? Even better would be to add a method named `isTerminalNode()` to `ProcessorNode` and use it both here and in `ProcessorTopology`.

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########

```diff
@@ -547,6 +624,28 @@ private KafkaMetric getMetric(final String operation,
         ));
     }
 
+    private KafkaMetric getProcessorMetric(final String operation,
+                                           final String nameFormat,
+                                           final String taskId,
+                                           final String processorNodeId,
+                                           final String builtInMetricsVersion) {
+        final String descriptionIsNotVerified = "";
+        return metrics.metrics().get(metrics.metricName(
+            String.format(nameFormat, operation),
+            "stream-processor-node-metrics",
+            descriptionIsNotVerified,
+            mkMap(
+                mkEntry("task-id", taskId),
+                mkEntry("processor-node-id", processorNodeId),
+                mkEntry(
+                    StreamsConfig.METRICS_LATEST.equals(builtInMetricsVersion) ? THREAD_ID_TAG
+                        : THREAD_ID_TAG_0100_TO_24,
+                    Thread.currentThread().getName()
+                )
+            )
+        ));
+    }
+
```

Review comment:
prop: Take a look into `StreamsTestUtils` and see what you can re-use there to retrieve specific metrics.
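To make the `isTerminalNode()` proposal above concrete, here is a rough sketch of the refactoring. The `children()` accessor and the two call sites are taken from the diffs in this review; the elided bodies are indicated by comments rather than guessed.

```java
// In ProcessorNode<K, V>: a node without children is a terminal node of the topology.
public boolean isTerminalNode() {
    return children().isEmpty();
}

// In the forward path of ProcessorContextImpl (the code shown in the diff above),
// replacing the inline children().isEmpty() check:
setCurrentNode(child);
child.process(key, value);
if (child.isTerminalNode()) {
    // ... existing e2e-latency recording for terminal nodes ...
}

// In the ProcessorTopology constructor, collecting the terminal nodes once up front:
this.terminalNodes = new HashSet<>();
for (final ProcessorNode<?, ?> node : processorNodes) {
    if (node.isTerminalNode()) {
        // ... existing code that registers the node as terminal ...
    }
}
```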
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########

```diff
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000; // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days
```

Review comment:
I understand that we need a maximum due to the way the percentiles are approximated. Since the e2e latency depends on user requirements, it would make sense to consider a config for the max latency. I see two reasons for such a config:

1. We always think about near-real-time use cases, but there could also be use cases that tolerate a much higher latency while still needing to stay within a certain limit. For example, one where producers are not always online. Admittedly, 10 days is already quite high.
2. OTOH, decreasing the max latency would also make the metric more accurate, AFAIU. That would also be a reason for a config that users can tweak.

For both cases, we could leave it as it is for now and see whether there is really a need for such a config. WDYT?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########

```diff
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                    nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {
```

Review comment:
@ableegoldman I agree with your thinking here. IMO, we should just log the warning for now. If we see that there is demand for such a metric, we can add it later on.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
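Returning to the `MAXIMUM_E2E_LATENCY` discussion above: the reason a fixed maximum is needed at all is that the `Percentiles` stat approximates the distribution with a histogram over a bounded value range, sized by `PERCENTILES_SIZE_IN_BYTES`. The sketch below illustrates how min/max/p99/p90 stats could be attached to a sensor with that bound using the public `org.apache.kafka.common.metrics` API; it is not the actual `StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor` implementation, and the metric names and the helper class are assumptions made for illustration.

```java
import java.util.Map;

import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Min;
import org.apache.kafka.common.metrics.stats.Percentile;
import org.apache.kafka.common.metrics.stats.Percentiles;
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing;

// Illustrative helper, not Kafka Streams code: attaches min, max, p99 and p90 stats to an
// e2e-latency sensor. The Percentiles stat sizes its buckets over [0, max], which is why a
// MAXIMUM_E2E_LATENCY bound (and the 1 MB histogram size) appears in StreamsMetricsImpl.
public final class E2ELatencyMetricsSketch {

    private static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;          // 1 MB histogram
    private static final long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // 10 days in ms

    public static void addE2ELatencyStats(final Metrics metrics,
                                          final Sensor sensor,
                                          final String group,
                                          final Map<String, String> tags) {
        sensor.add(metrics.metricName("record-e2e-latency-min", group, tags), new Min());
        sensor.add(metrics.metricName("record-e2e-latency-max", group, tags), new Max());
        // as noted in the review above, the Percentiles stat itself counts values beyond the
        // maximum towards its highest bucket instead of dropping them
        sensor.add(new Percentiles(
            PERCENTILES_SIZE_IN_BYTES,
            MAXIMUM_E2E_LATENCY,
            BucketSizing.LINEAR,
            new Percentile(metrics.metricName("record-e2e-latency-p99", group, tags), 99),
            new Percentile(metrics.metricName("record-e2e-latency-p90", group, tags), 90)
        ));
    }
}
```

Whether a smaller, user-configurable maximum is worth the extra config is exactly the trade-off discussed above: a tighter bound shrinks the linear buckets and improves the approximation, at the cost of another knob to document.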