[ https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160 ]
Ganesh Sadanala edited comment on KAFKA-16811 at 6/1/24 7:26 PM:
-----------------------------------------------------------------
[~sebviale] [~rohanpd] I have completed the implementation using the SlidingWindow approach, with x=30 seconds for testing. Here are the changes: https://github.com/apache/kafka/pull/16162

I followed these steps to test the changes, but the punctuate-ratio is still zero for every instance of the example demo class:
1. Start ZooKeeper and the Kafka broker.
2. Create the input and output topics with 3 partitions, so that active tasks are distributed across multiple instances of the WordCountProcessorDemo stream class:
{code:bash}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1
{code}
3. Run 3 instances of the Kafka Streams demo application in different terminals/processes:
{code:bash}
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
{code}
4. Produce and consume data:
{code:bash}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning
{code}
5. Open jconsole and watch the metrics.

I can see all the metrics being calculated. When I run the debugger, I see that tasks.activeTasks() in the code below is an empty list. Because of that, the punctuated count is zero, and so is the punctuate ratio.
TaskExecutor.java
{code:java}
int punctuate() {
    int punctuated = 0;
    // Only active tasks are punctuated; an empty list means this method always returns 0.
    for (final Task task : tasks.activeTasks()) {
        try {
            if (executionMetadata.canPunctuateTask(task)) {
                // Count each stream-time and each wall-clock (system-time) punctuation that fired.
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            }
        } catch (final TaskMigratedException e) {
            log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                    "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
            throw e;
        } catch (final StreamsException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            e.setTaskId(task.id());
            throw e;
        } catch (final KafkaException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            throw new StreamsException(e, task.id());
        }
    }
    return punctuated;
}
{code}
Is there a way to make the active tasks list non-empty, so that I can test the changes and write some unit tests? Is this behaviour normal in a local environment?
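The comment describes the fix only as a SlidingWindow approach with x=30 seconds. As a rough illustration of that idea (not the code in the linked PR), the sketch below keeps one sample per poll-loop iteration and reports the share of loop time spent punctuating over the last 30 seconds. The class SlidingWindowRatio and its record/ratio methods are hypothetical names, and the sketch assumes the caller supplies each iteration's end time, punctuate duration, and total loop duration in milliseconds.

{code:java}
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch, not the code in PR 16162: the fraction of poll-loop time
// spent punctuating, computed over the last windowMs milliseconds.
final class SlidingWindowRatio {

    // One poll-loop iteration: when it ended and how long it (and its punctuation) took.
    private static final class Sample {
        final long endMs;
        final long punctuateMs;
        final long loopMs;

        Sample(final long endMs, final long punctuateMs, final long loopMs) {
            this.endMs = endMs;
            this.punctuateMs = punctuateMs;
            this.loopMs = loopMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long punctuateSumMs = 0L;
    private long loopSumMs = 0L;

    SlidingWindowRatio(final long windowMs) {
        this.windowMs = windowMs;
    }

    // Assumed to be called once at the end of every poll-loop iteration.
    void record(final long nowMs, final long punctuateMs, final long loopMs) {
        samples.addLast(new Sample(nowMs, punctuateMs, loopMs));
        punctuateSumMs += punctuateMs;
        loopSumMs += loopMs;
        evict(nowMs);
    }

    // Assumed to be called whenever the metric is read, e.g. by a JMX reporter.
    double ratio(final long nowMs) {
        evict(nowMs);
        return loopSumMs == 0L ? 0.0 : (double) punctuateSumMs / loopSumMs;
    }

    // Drops iterations that ended windowMs or more before nowMs, keeping the sums in step.
    private void evict(final long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst().endMs <= nowMs - windowMs) {
            final Sample old = samples.removeFirst();
            punctuateSumMs -= old.punctuateMs;
            loopSumMs -= old.loopMs;
        }
    }

    public static void main(final String[] args) {
        final SlidingWindowRatio ratio = new SlidingWindowRatio(30_000L);
        ratio.record(1_000L, 0L, 100L);   // iteration without punctuation
        ratio.record(1_100L, 90L, 100L);  // iteration that punctuated
        ratio.record(1_200L, 0L, 100L);   // iteration without punctuation
        System.out.println(ratio.ratio(1_200L));   // 90 / 300 -> prints 0.3
        System.out.println(ratio.ratio(31_050L));  // first sample expired: 90 / 200 -> prints 0.45
        System.out.println(ratio.ratio(40_000L));  // all samples expired -> prints 0.0
    }
}
{code}

Summing durations before dividing weights each iteration by its length, whereas averaging per-iteration ratios would weight short and long iterations equally. An iteration is kept or dropped as a whole, so the window boundary is approximate. Inside Kafka itself, the existing windowed stats in org.apache.kafka.common.metrics (for example Avg, whose window is controlled through MetricConfig's samples and timeWindow settings) may be a more natural building block than a hand-rolled deque.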
> Punctuate Ratio metric almost impossible to track
> -------------------------------------------------
>
>                 Key: KAFKA-16811
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16811
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Sebastien Viale
>            Assignee: Ganesh Sadanala
>            Priority: Minor
>          Time Spent: 48h
>  Remaining Estimate: 0h
>
> The Punctuate ratio metric is returned after the last record of the poll
> loop. It is recomputed in every poll loop.
> After a punctuate, the value is close to 1, but there is little chance that
> the metric is sampled at that moment.
> So its value is almost always 0.
> A solution could be to apply a kind of "sliding window" to it and report the
> value for the last x seconds.
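The effect described in the issue can be reproduced with a small simulation. Assuming, purely for illustration, that every poll-loop iteration takes 10 ms and that one iteration in 100 spends 9 ms punctuating, a gauge that keeps only the latest iteration's ratio reads 0.9 right after a punctuation but 0 almost every other time, while the same data aggregated over the run gives a small but non-zero ratio. LastValueSampling and its methods are hypothetical; none of this is Kafka's StreamThread code.

{code:java}
import java.util.Random;

// Hypothetical simulation (all numbers are assumptions) of a gauge that holds
// only the punctuate ratio of the most recent poll-loop iteration.
final class LastValueSampling {

    // Per-iteration ratio: one iteration in punctuateEvery spends punctuateMs of
    // its loopMs punctuating; all other iterations spend nothing.
    static double[] perIterationRatios(final int iterations, final int punctuateEvery,
                                       final long punctuateMs, final long loopMs) {
        final double[] ratios = new double[iterations];
        for (int i = 0; i < iterations; i++) {
            final long spentMs = (i % punctuateEvery == 0) ? punctuateMs : 0L;
            ratios[i] = (double) spentMs / loopMs;
        }
        return ratios;
    }

    // A reporter reading at a random moment sees one iteration's value; with equal
    // iteration lengths, each iteration is equally likely to be the one it sees.
    static double fractionOfNonZeroReads(final double[] ratios, final int reads, final long seed) {
        final Random random = new Random(seed);
        int nonZero = 0;
        for (int r = 0; r < reads; r++) {
            if (ratios[random.nextInt(ratios.length)] > 0.0) {
                nonZero++;
            }
        }
        return (double) nonZero / reads;
    }

    // The same workload aggregated over the whole run, which is what a windowed metric approximates.
    static double windowedRatio(final int iterations, final int punctuateEvery,
                                final long punctuateMs, final long loopMs) {
        long punctuateSumMs = 0L;
        long loopSumMs = 0L;
        for (int i = 0; i < iterations; i++) {
            punctuateSumMs += (i % punctuateEvery == 0) ? punctuateMs : 0L;
            loopSumMs += loopMs;
        }
        return (double) punctuateSumMs / loopSumMs;
    }

    public static void main(final String[] args) {
        final double[] ratios = perIterationRatios(10_000, 100, 9L, 10L);
        System.out.println("ratio right after a punctuation: " + ratios[0]);  // prints 0.9
        System.out.println("share of reads that are non-zero: "
                + fractionOfNonZeroReads(ratios, 10_000, 42L));
        System.out.println("ratio over the whole run: "
                + windowedRatio(10_000, 100, 9L, 10L));                       // prints 0.009
    }
}
{code}

With these assumed numbers, only about one read in a hundred is expected to see a non-zero value, which matches the report that the metric is almost always 0 when observed.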