cadonna commented on a change in pull request #8697:
URL: https://github.com/apache/kafka/pull/8697#discussion_r429814548



##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java
##########
@@ -260,6 +261,43 @@ public void shouldGetProcessAtSourceSensorOrForwardSensor() {
         }
     }
 
+    @Test
+    public void shouldGetRecordE2ELatencySensor() {
+        final String operation = "record-e2e-latency";
+        final String recordE2ELatencyMinDescription =
+            "The minimum end-to-end latency of a record, measured by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        final String recordE2ELatencyMaxDescription =
+            "The maximum end-to-end latency of a record, measured by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        final String recordE2ELatencyP99Description =
+            "The 99th percentile end-to-end latency of a record, measured by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        final String recordE2ELatencyP90Description =
+            "The 90th percentile end-to-end latency of a record, measured by comparing the record timestamp with the "
+                + "system time when it has been fully processed by the node";
+        expect(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, operation, RecordingLevel.INFO))
+            .andReturn(expectedSensor);
+        expect(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).andReturn(tagMap);
+        StreamsMetricsImpl.addMinAndMaxAndP99AndP90ToSensor(
+            expectedSensor,
+            PROCESSOR_NODE_LEVEL_GROUP,
+            tagMap,
+            operation,
+            recordE2ELatencyMinDescription,
+            recordE2ELatencyMaxDescription,
+            recordE2ELatencyP99Description,
+            recordE2ELatencyP90Description
+        );
+
+        replay(StreamsMetricsImpl.class, streamsMetrics);
+
+        final Sensor sensor = ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics);
+
+        verify(StreamsMetricsImpl.class, streamsMetrics);
+        assertThat(sensor, is(expectedSensor));

Review comment:
       req:
   ```suggestion
        verifySensor(() -> ProcessorNodeMetrics.recordE2ELatencySensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, RecordingLevel.INFO, streamsMetrics));
   ```
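The `verifySensor` helper that the suggestion relies on is not shown in this hunk. A minimal, self-contained sketch of what such a helper could look like; the `Sensor` stand-in class and the `AssertionError` are illustration-only assumptions, and in the real test the EasyMock `replay`/`verify` calls from the surrounding test would wrap the supplier invocation:

```java
import java.util.function.Supplier;

public class VerifySensorSketch {
    // stand-in for org.apache.kafka.common.metrics.Sensor, for illustration only
    static class Sensor { }

    static final Sensor expectedSensor = new Sensor();

    // sketch of the helper the suggestion assumes: invoke the sensor factory,
    // then assert that the returned sensor is the expected (mocked) one;
    // the real test would also replay() and verify() the mocks around the call
    static void verifySensor(final Supplier<Sensor> sensorSupplier) {
        final Sensor sensor = sensorSupplier.get();
        if (sensor != expectedSensor) {
            throw new AssertionError("factory did not return the expected sensor");
        }
    }

    public static void main(final String[] args) {
        verifySensor(() -> expectedSensor); // passes silently
    }
}
```

Centralizing the replay/call/verify/assert sequence this way keeps each test method down to a single line per sensor factory.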

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorTopology.java
##########
@@ -50,6 +51,13 @@ public ProcessorTopology(final List<ProcessorNode<?, ?>> processorNodes,
         this.globalStateStores = Collections.unmodifiableList(globalStateStores);
         this.storeToChangelogTopic = Collections.unmodifiableMap(storeToChangelogTopic);
         this.repartitionTopics = Collections.unmodifiableSet(repartitionTopics);
+
+        this.terminalNodes = new HashSet<>();
+        for (final ProcessorNode<?, ?> node : processorNodes) {
+            if (node.children().isEmpty()) {

Review comment:
       See my comment in `ProcessorContextImpl`.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {

Review comment:
       Q: Wouldn't it be better to count measurements beyond the maximum latency towards the highest bucket, as the `Percentiles` metric does?
    Admittedly, the measured value would be quite wrong if a lot of measurements exceeded the maximum latency. However, with bucket sizes that increase linearly, the reported values would be quite wrong anyway due to the increased approximation error. Furthermore, I guess users would put an alert on substantially smaller values.
    OTOH, not counting measurements beyond the maximum latency would falsify the metric a bit, because those measurements would not count towards the remaining 1% or 10% (for p99 and p90, respectively). Additionally, the max metric would also be falsified by not counting them.
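To make the trade-off concrete, here is a small sketch of the clamping alternative, where an over-limit measurement still counts towards the highest bucket instead of being dropped. The helper name is invented; only the 10-day constant mirrors `MAXIMUM_E2E_LATENCY` from `StreamsMetricsImpl`:

```java
public class ClampE2ELatencySketch {
    // mirrors MAXIMUM_E2E_LATENCY from StreamsMetricsImpl (10 days in ms)
    static final long MAXIMUM_E2E_LATENCY = 10L * 24 * 60 * 60 * 1000;

    // hypothetical alternative to skipping: clamp the measurement to the maximum,
    // so over-limit values still contribute to the max/p99/p90 statistics
    static long clampedE2ELatency(final long now, final long recordTimestamp) {
        final long e2eLatency = now - recordTimestamp;
        return Math.min(e2eLatency, MAXIMUM_E2E_LATENCY);
    }

    public static void main(final String[] args) {
        System.out.println(clampedE2ELatency(5_000L, 1_000L));              // within the limit: 4000
        System.out.println(clampedE2ELatency(MAXIMUM_E2E_LATENCY + 1, 0L)); // clamped to the maximum
    }
}
```

With clamping, the reported max and percentile values saturate at the limit rather than silently excluding outliers from the distribution.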

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
##########
@@ -223,6 +223,9 @@ public StateStore getStateStore(final String name) {
                                 final V value) {
         setCurrentNode(child);
         child.process(key, value);
+        if (child.children().isEmpty()) {

Review comment:
       prop: For the sake of readability, could you extract this check to a 
method named `isTerminalNode()`? Even better would be to add a method named 
`isTerminalNode()` to `ProcessorNode` and use it here and in 
`ProcessorTopology`.
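A sketch of the proposed extraction, assuming `ProcessorNode` keeps its children behind the existing `children()` accessor; the class skeleton is simplified for illustration:

```java
import java.util.ArrayList;
import java.util.List;

class ProcessorNodeSketch {
    private final List<ProcessorNodeSketch> children = new ArrayList<>();

    List<ProcessorNodeSketch> children() {
        return children;
    }

    void addChild(final ProcessorNodeSketch child) {
        children.add(child);
    }

    // proposed helper: a node with no downstream children is terminal
    boolean isTerminalNode() {
        return children.isEmpty();
    }

    public static void main(final String[] args) {
        final ProcessorNodeSketch source = new ProcessorNodeSketch();
        final ProcessorNodeSketch sink = new ProcessorNodeSketch();
        source.addChild(sink);
        System.out.println(source.isTerminalNode()); // false
        System.out.println(sink.isTerminalNode());   // true
    }
}
```

Call sites in `ProcessorContextImpl` and `ProcessorTopology` would then read `if (child.isTerminalNode())` rather than duplicating `children().isEmpty()`.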

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##########
@@ -547,6 +624,28 @@ private KafkaMetric getMetric(final String operation,
         ));
     }
 
+    private KafkaMetric getProcessorMetric(final String operation,
+                                           final String nameFormat,
+                                           final String taskId,
+                                           final String processorNodeId,
+                                           final String builtInMetricsVersion) {
+        final String descriptionIsNotVerified = "";
+        return metrics.metrics().get(metrics.metricName(
+            String.format(nameFormat, operation),
+            "stream-processor-node-metrics",
+            descriptionIsNotVerified,
+            mkMap(
+                mkEntry("task-id", taskId),
+                mkEntry("processor-node-id", processorNodeId),
+                mkEntry(
+                    StreamsConfig.METRICS_LATEST.equals(builtInMetricsVersion) ? THREAD_ID_TAG : THREAD_ID_TAG_0100_TO_24,
+                    Thread.currentThread().getName()
+                )
+            )
+        ));
+    }
+

Review comment:
       prop: Take a look into `StreamsTestUtils` and see what you can re-use 
there to retrieve specific metrics.

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java
##########
@@ -149,6 +154,10 @@ public int hashCode() {
     public static final String RATE_DESCRIPTION_PREFIX = "The average number of ";
     public static final String RATE_DESCRIPTION_SUFFIX = " per second";
 
+    public static final int PERCENTILES_SIZE_IN_BYTES = 1000 * 1000;    // 1 MB
+    public static long MAXIMUM_E2E_LATENCY = 10 * 24 * 60 * 60 * 1000L; // maximum latency is 10 days

Review comment:
       I understand that we need a maximum due to the way the percentiles are approximated. Since the e2e latency depends on user requirements, it would make sense to consider a config for the max latency. I see two reasons for such a config.

    1. We always think about near-realtime use cases, but there could also be use cases that are allowed to have a much higher latency, as long as it stays within a certain limit; for example, one where producers are not always online. Admittedly, 10 days is already quite high.

    2. OTOH, decreasing the max latency would also make the metric more accurate, AFAIU. That would also be a reason for a config that users can tweak.

    For both cases, we could leave it as it is for now and see if there is really a need for such a config. WDYT?
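If such a config were ever added, the lookup could look roughly like this. The config key and the override mechanism are purely hypothetical (no such `StreamsConfig` property exists); only the 10-day default mirrors the current hard-coded constant:

```java
import java.util.HashMap;
import java.util.Map;

public class MaxE2ELatencyConfigSketch {
    // hypothetical config key, invented for illustration
    static final String MAX_E2E_LATENCY_MS_CONFIG = "metrics.e2e.latency.max.ms";
    // default mirrors the current hard-coded MAXIMUM_E2E_LATENCY (10 days in ms)
    static final long DEFAULT_MAX_E2E_LATENCY_MS = 10L * 24 * 60 * 60 * 1000;

    // resolve the effective maximum: user override if present, otherwise the default
    static long maxE2ELatencyMs(final Map<String, Object> configs) {
        final Object value = configs.get(MAX_E2E_LATENCY_MS_CONFIG);
        return value == null ? DEFAULT_MAX_E2E_LATENCY_MS : ((Number) value).longValue();
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        System.out.println(maxE2ELatencyMs(configs)); // default: 864000000
        configs.put(MAX_E2E_LATENCY_MS_CONFIG, 60_000L);
        System.out.println(maxE2ELatencyMs(configs)); // overridden: 60000
    }
}
```

A smaller user-chosen maximum would tighten the percentile buckets and, as noted above, improve the approximation accuracy for that deployment.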

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
##########
@@ -899,6 +920,28 @@ public boolean maybePunctuateSystemTime() {
         return punctuated;
     }
 
+    void maybeRecordE2ELatency(final long recordTimestamp, final String nodeName) {
+        maybeRecordE2ELatency(recordTimestamp, time.milliseconds(), nodeName);
+    }
+
+    private void maybeRecordE2ELatency(final long recordTimestamp, final long now, final String nodeName) {
+        final Sensor e2eLatencySensor = e2eLatencySensors.get(nodeName);
+        if (e2eLatencySensor == null) {
+            throw new IllegalStateException("Requested to record e2e latency but could not find sensor for node " + nodeName);
+        } else if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {
+            final long e2eLatency = now - recordTimestamp;
+            if (e2eLatency > MAXIMUM_E2E_LATENCY) {
+                log.warn("Skipped recording e2e latency for node {} because {} is higher than maximum allowed latency {}",
+                         nodeName, e2eLatency, MAXIMUM_E2E_LATENCY);
+            } else if (e2eLatency < MINIMUM_E2E_LATENCY) {

Review comment:
       @ableegoldman I agree with your thinking here. IMO, we should just log the warning for now. If we see that there is demand for such a metric, we can add it later on.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

