[ 
https://issues.apache.org/jira/browse/KAFKA-14597?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703204#comment-17703204
 ] 

Atul Jain commented on KAFKA-14597:
-----------------------------------

[~talestonini] I have another issue with process-latency [-avg, -max]. Is it 
possible that the observed discrepancies in latencies are due to the reasons 
you mentioned above?

I will describe my topology here. I have two sources, and both of them write 
data to their respective state stores. I have added delays (using sleep) of 
3 sec and 1 sec respectively while writing to these state stores. I also have 
a PunctuateProcessor that runs a punctuation every 60 s of wall-clock time. 
The punctuation logic reads the data from both state stores and forwards it 
to the sink (topology diagram attached; a sketch of the setup follows below).

!image-2023-03-21-19-01-54-713.png!
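
To make the setup concrete, below is a minimal sketch of such a topology using 
the Processor API. The processor/store/topic names, the 3 s / 1 s sleeps and 
the exact wiring are illustrative assumptions, not my actual code:

{code:java}
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.processor.PunctuationType;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;

public class TwoSourceTopology {

    // Writes each incoming record to a state store after a fixed sleep (3 s or 1 s).
    static class DelayedWriter implements Processor<String, String, String, String> {
        private final String storeName;
        private final long sleepMs;
        private KeyValueStore<String, String> store;

        DelayedWriter(String storeName, long sleepMs) {
            this.storeName = storeName;
            this.sleepMs = sleepMs;
        }

        @Override
        public void init(ProcessorContext<String, String> context) {
            store = context.getStateStore(storeName);
        }

        @Override
        public void process(Record<String, String> record) {
            try {
                Thread.sleep(sleepMs); // simulated processing delay
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            store.put(record.key(), record.value());
        }
    }

    // Every 60 s of wall-clock time, forwards the contents of both stores downstream.
    static class PunctuateProcessor implements Processor<String, String, String, String> {
        @Override
        public void init(ProcessorContext<String, String> context) {
            context.schedule(Duration.ofSeconds(60), PunctuationType.WALL_CLOCK_TIME, timestamp -> {
                for (String storeName : new String[] {"store-1", "store-2"}) {
                    KeyValueStore<String, String> store = context.getStateStore(storeName);
                    try (KeyValueIterator<String, String> it = store.all()) {
                        it.forEachRemaining(kv ->
                                context.forward(new Record<>(kv.key, kv.value, timestamp)));
                    }
                }
            });
        }

        @Override
        public void process(Record<String, String> record) {
            // no per-record work; all output is produced by the punctuator
        }
    }

    static Topology buildTopology() {
        Topology t = new Topology();
        t.addSource("source-1", Serdes.String().deserializer(), Serdes.String().deserializer(), "input-1")
         .addSource("source-2", Serdes.String().deserializer(), Serdes.String().deserializer(), "input-2")
         .addProcessor("writer-1", () -> new DelayedWriter("store-1", 3000L), "source-1")
         .addProcessor("writer-2", () -> new DelayedWriter("store-2", 1000L), "source-2")
         .addProcessor("punctuator", PunctuateProcessor::new, "writer-1", "writer-2")
         .addStateStore(Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("store-1"), Serdes.String(), Serdes.String()),
                 "writer-1", "punctuator")
         .addStateStore(Stores.keyValueStoreBuilder(
                 Stores.persistentKeyValueStore("store-2"), Serdes.String(), Serdes.String()),
                 "writer-2", "punctuator")
         .addSink("sink", "output-topic",
                 Serdes.String().serializer(), Serdes.String().serializer(), "punctuator");
        return t;
    }
} {code}

The point is simply that all per-record work (and therefore all of the sleep 
time) happens in the two writer processors, while the punctuator only forwards 
store contents every 60 s.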

*Expected values of latencies*
process-latency-max = 3 sec
process-latency-avg = avg(3 sec, 1 sec) = 2 sec (assuming both sources process 
roughly the same number of records)

*Observed latencies on stream-thread-metrics*

process-latency-max = 2334 ms 
process-latency-avg = 1971.91 ms

!image-2023-03-21-19-03-07-525.png!


*Observed latencies on stream-task-metrics* (obtained by setting 
metrics.recording.level=DEBUG)
process-latency-max = 3000 ms
process-latency-avg = 2034 ms

!image-2023-03-21-19-03-28-625.png!
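
For anyone trying to reproduce, here is a rough sketch of how the same metric 
values can be read programmatically instead of from JMX screenshots. The 
application id, bootstrap servers and the buildTopology() call refer to the 
illustrative sketch above; note that the task-level latency metrics only show 
up with metrics.recording.level=DEBUG:

{code:java}
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

public class LatencyMetricsDump {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "latency-test");      // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        // task-level latency metrics are only recorded at DEBUG level
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");

        KafkaStreams streams = new KafkaStreams(TwoSourceTopology.buildTopology(), props);
        streams.start();

        // print thread- and task-level process-latency metrics once a minute
        while (true) {
            Thread.sleep(60_000L);
            for (Map.Entry<MetricName, ? extends Metric> e : streams.metrics().entrySet()) {
                MetricName m = e.getKey();
                boolean latency = m.name().equals("process-latency-avg")
                        || m.name().equals("process-latency-max");
                boolean relevantGroup = m.group().equals("stream-thread-metrics")
                        || m.group().equals("stream-task-metrics");
                if (latency && relevantGroup) {
                    System.out.printf("%s %s %s = %s%n",
                            m.group(), m.tags(), m.name(), e.getValue().metricValue());
                }
            }
        }
    }
} {code}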

 

Kindly let me know if this is a different issue that needs to be addressed 
separately. Thanks.

> [Streams] record-e2e-latency-max is not reporting correct metrics 
> ------------------------------------------------------------------
>
>                 Key: KAFKA-14597
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14597
>             Project: Kafka
>          Issue Type: Bug
>          Components: metrics, streams
>            Reporter: Atul Jain
>            Assignee: Tales Tonini
>            Priority: Major
>         Attachments: image-2023-03-21-15-07-24-352.png, 
> image-2023-03-21-19-01-54-713.png, image-2023-03-21-19-03-07-525.png, 
> image-2023-03-21-19-03-28-625.png, process-latency-max.jpg, 
> record-e2e-latency-max.jpg
>
>
> I was following the KIP documentation 
> ([https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams])
>  and the Kafka Streams documentation 
> ([https://kafka.apache.org/documentation/#kafka_streams_monitoring:~:text=node%2Did%3D(%5B%2D.%5Cw%5D%2B)-,record%2De2e%2Dlatency%2Dmax,-The%20maximum%20end]).
> Based on this documentation, *record-e2e-latency-max* should monitor the 
> full end-to-end latency, which includes both *consumption latency* and 
> *processing delays*.
> However, based on my observations, record-e2e-latency-max seems to measure 
> only the consumption latency; processing delays can be measured using 
> *process-latency-max*. I am checking all this with a simple topology 
> consisting of a source, processors and a sink (code added). I have added a 
> sleep of 3 seconds in one of the processors to introduce a delay in the 
> processing logic. This delay is not accounted for in record-e2e-latency-max 
> but is accounted for in process-latency-max: process-latency-max was 
> observed to be 3002 ms, which reflects the 3-second sleep, whereas 
> record-e2e-latency-max observed in jconsole is 422 ms, which does not.
>  
> Code describing my topology:
> {code:java}
> static Topology buildTopology(String inputTopic, String outputTopic) {
>     log.info("Input topic: " + inputTopic + " and output topic: " + outputTopic);
>     Serde<String> stringSerde = Serdes.String();
>     StreamsBuilder builder = new StreamsBuilder();
>     builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde))
>             .peek((k, v) -> log.info("Observed event: key" + k + " value: " + v))
>             .mapValues(s -> {
>                 // simulate a 3-second processing delay
>                 try {
>                     System.out.println("sleeping for 3 seconds");
>                     Thread.sleep(3000);
>                 } catch (InterruptedException e) {
>                     e.printStackTrace();
>                 }
>                 return s.toUpperCase();
>             })
>             .peek((k, v) -> log.info("Transformed event: key" + k + " value: " + v))
>             .to(outputTopic, Produced.with(stringSerde, stringSerde));
>     return builder.build();
> } {code}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
