[ https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17762735#comment-17762735 ]

Atul Jain commented on KAFKA-14597:
-----------------------------------

Hi [~talestonini] , [~cadonna],

Please ignore my last comment; it is definitely a different issue.

Further to this issue, I have gone through the discussions on 
[KIP-613|https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams], 
as well as the PRs implementing it. I would like to add my thoughts here.

As per 
[KIP-613|https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams], 
INFO-level metrics were agreed for record-e2e-latency on source and terminal 
operators, and TRACE-level metrics for stateful operators. This was 
implemented in the implementation 
[PR|https://github.com/apache/kafka/pull/8697/files#]. In particular, this line of 
[code|https://github.com/apache/kafka/pull/8697/files#diff-569d57b69279a7325ec0916f2c942de7ee46185d065c8361d55f28135d8bfbf1R227] 
computes the record-e2e-latency of terminal nodes. 
Up to this point, the code behaved as expected.
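For reference, the computation itself is simple: the end-to-end latency of a record at a given operator is the gap between the record's event timestamp and the wall-clock time at which that operator handles it. A minimal sketch (the class and method names here are my own, not the actual Kafka Streams code):

```java
// Hypothetical sketch of the record-e2e-latency computation at a terminal node:
// the latency is the wall-clock time at emission minus the record's event timestamp.
public final class E2ELatencySketch {

    // recordTimestampMs: the event timestamp carried by the record
    // nowMs: wall-clock time when the terminal operator handles the record
    static long e2eLatencyMs(long recordTimestampMs, long nowMs) {
        return nowMs - recordTimestampMs;
    }

    public static void main(String[] args) {
        // Simulate a record whose event timestamp is 3 seconds in the past.
        long recordTs = System.currentTimeMillis() - 3_000;
        long latency = e2eLatencyMs(recordTs, System.currentTimeMillis());
        // The measured latency must cover the full 3 seconds that have elapsed.
        System.out.println(latency >= 3_000);
    }
}
```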

Checking the git history, I found that this logic was later changed in this 
[commit|https://github.com/apache/kafka/pull/8882/files#diff-569d57b69279a7325ec0916f2c942de7ee46185d065c8361d55f28135d8bfbf1R238], 
which addresses performance issues caused by the percentile metrics leaking 
memory.

As per the current implementation in Kafka Streams, the source and terminal 
operators both use the cached system time, so the metrics recorded for the 
source and terminal operators are exactly the same. Using the same cached 
time to compute the metric on both the source and terminal operators does not 
make sense.

Using the actual current system time on the terminal node should not cause 
performance issues (the overhead should be negligible). I would prefer 
record-e2e-latency to be measured with the actual current system time on the 
terminal operator, as it is a useful metric for monitoring the health of the 
whole Streams application.
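The cached-time behaviour described above can be illustrated with a small standalone sketch (this is my own illustration of the effect, not the actual Streams code): when both operators read the same cached timestamp, any processing delay between them disappears from the terminal operator's e2e metric.

```java
// Hypothetical illustration: with a cached timestamp, the terminal operator's
// e2e latency equals the source operator's, no matter how long processing took.
public final class CachedTimeSketch {

    public static void main(String[] args) throws InterruptedException {
        long recordTs = System.currentTimeMillis();
        long cachedNow = System.currentTimeMillis();    // time cached once, up front

        long sourceE2e = cachedNow - recordTs;          // measured at the source operator

        Thread.sleep(200);                              // simulated processing delay

        long terminalCached = cachedNow - recordTs;     // current behaviour: cached time
        long terminalActual = System.currentTimeMillis() - recordTs; // proposed behaviour

        // With cached time, the terminal metric is identical to the source metric.
        System.out.println(terminalCached == sourceE2e);
        // With actual time, the terminal metric includes the processing delay.
        System.out.println(terminalActual - sourceE2e >= 200);
    }
}
```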

Kindly let me know if it is desirable to revert the change made 
[here|https://github.com/apache/kafka/pull/8882/files#diff-569d57b69279a7325ec0916f2c942de7ee46185d065c8361d55f28135d8bfbf1R238] 
and use the actual current system time instead of the cached time. If so, I 
would be glad to raise a PR and contribute.

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> ------------------------------------------------------------------
>
>                 Key: KAFKA-14597
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14597
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>            Reporter: Atul Jain
>            Assignee: Tales Tonini
>            Priority: Major
>         Attachments: image-2023-03-21-15-07-24-352.png, 
> image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, 
> image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, 
> record-e2e-latency-max.jpg
>
>
> I was following this KIP documentation 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
>  and kafka streams documentation 
> ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end])
> . Based on this documentation, *record-e2e-latency-max* should monitor the 
> full end-to-end latency, which includes both *consumption latencies* and 
> {*}processing delays{*}.
> However, based on my observations, record-e2e-latency-max seems to measure 
> only the consumption latencies; processing delays can be measured using 
> *process-latency-max*. I am checking all this with a simple topology 
> consisting of a source, processors, and a sink (code added). I have added a 
> sleep time (of 3 seconds) in one of the processors to introduce a delay in 
> the processing logic. This delay is not accounted for in 
> record-e2e-latency-max but is accounted for in process-latency-max. 
> process-latency-max was observed to be 3002 ms, which accounts for the sleep 
> time of 3 seconds. However, the record-e2e-latency-max observed in JConsole 
> is 422 ms, which does not account for the 3 seconds of sleep.
>  
> Code describing my topology:
> {code:java}
>     static Topology buildTopology(String inputTopic, String outputTopic) {
>         log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>         Serde<String> stringSerde = Serdes.String();
>         StreamsBuilder builder = new StreamsBuilder();
>         builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>                 .peek((k, v) -> log.info("Observed event: key: " + k + " value: " + v))
>                 .mapValues(s -> {
>                     try {
>                         // Artificial 3-second delay to simulate processing time
>                         System.out.println("sleeping for 3 seconds");
>                         Thread.sleep(3000);
>                     } catch (InterruptedException e) {
>                         e.printStackTrace();
>                     }
>                     return s.toUpperCase();
>                 })
>                 .peek((k, v) -> log.info("Transformed event: key: " + k + " value: " + v))
>                 .to(outputTopic, Produced.with(stringSerde, stringSerde));
>         return builder.build();
>     } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
