[ 
https://issues.apache.org/jira/browse/KAFKA-16811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851160#comment-17851160
 ] 

Ganesh Sadanala edited comment on KAFKA-16811 at 6/1/24 7:26 PM:
-----------------------------------------------------------------

[~sebviale] [~rohanpd] I have completed the implementation using the 
SlidingWindow approach with x=30 seconds for testing. Here are the changes: 
https://github.com/apache/kafka/pull/16162
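
To make the sliding-window idea concrete, here is a minimal sketch of how a ratio over the last x milliseconds could be tracked. It is not the code from the PR: the class name SlidingWindowRatio, its methods, and the per-iteration inputs (end time, punctuate time, total time) are assumptions made for illustration, and it assumes iterations are recorded in non-decreasing time order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical helper (not part of Kafka): punctuate ratio over the last windowMs milliseconds. */
class SlidingWindowRatio {
    /** One poll-loop iteration: when it ended, time spent punctuating, total iteration time. */
    private static final class Sample {
        final long endMs;
        final long punctuateMs;
        final long totalMs;

        Sample(long endMs, long punctuateMs, long totalMs) {
            this.endMs = endMs;
            this.punctuateMs = punctuateMs;
            this.totalMs = totalMs;
        }
    }

    private final long windowMs;
    private final Deque<Sample> samples = new ArrayDeque<>();
    private long punctuateSum;
    private long totalSum;

    SlidingWindowRatio(long windowMs) {
        if (windowMs <= 0) {
            throw new IllegalArgumentException("windowMs must be positive");
        }
        this.windowMs = windowMs;
    }

    /** Record one finished iteration and drop samples that fell out of the window. */
    void record(long endMs, long punctuateMs, long totalMs) {
        samples.addLast(new Sample(endMs, punctuateMs, totalMs));
        punctuateSum += punctuateMs;
        totalSum += totalMs;
        evict(endMs);
    }

    /** Punctuate time divided by total time for iterations that ended within the window. */
    double ratio(long nowMs) {
        evict(nowMs);
        return totalSum == 0 ? 0.0 : (double) punctuateSum / totalSum;
    }

    /** Remove samples that ended windowMs or more before nowMs. */
    private void evict(long nowMs) {
        while (!samples.isEmpty() && samples.peekFirst().endMs <= nowMs - windowMs) {
            Sample old = samples.removeFirst();
            punctuateSum -= old.punctuateMs;
            totalSum -= old.totalMs;
        }
    }

    public static void main(String[] args) {
        SlidingWindowRatio ratio = new SlidingWindowRatio(30_000L);
        ratio.record(1_000L, 0L, 100L);
        ratio.record(2_000L, 90L, 100L); // the only iteration that punctuated
        ratio.record(3_000L, 0L, 100L);
        // 90 ms of punctuation out of 300 ms total is still visible after the punctuating iteration.
        System.out.println(ratio.ratio(3_000L));
    }
}
```

With a per-iteration ratio, a sample taken after the third iteration would read 0; the windowed value keeps reflecting the punctuation until that iteration ages out of the window.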

 

I followed these steps to test the changes, but punctuate-ratio still shows as 
zero for every instance of the example demo class. 
 # Start ZooKeeper and the Kafka broker.
 # Create the input and output topics with 3 partitions (so that active tasks 
are distributed across multiple instances of the WordCountProcessorDemo streams 
class):
 # 
{code:java}
bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-plaintext-input --partitions 3 --replication-factor 1

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic streams-wordcount-output --partitions 3 --replication-factor 1
{code}
4. Run 3 instances of the Kafka Streams demo application in separate 
terminals (one process each):

 # 
{code:java}
bin/kafka-run-class.sh org.apache.kafka.streams.examples.wordcount.WordCountProcessorDemo
{code}
5. Produce and consume data

 # 
{code:java}
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic streams-plaintext-input

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic streams-wordcount-output --from-beginning
{code}

6. Open JConsole and watch the metrics.
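
As an alternative to watching the value by eye, the metric can also be read programmatically over JMX. The sketch below is an assumption-laden illustration, not part of the change: the class name PunctuateRatioReader is hypothetical, and it relies on the thread-level MBeans being registered as kafka.streams:type=stream-thread-metrics with a punctuate-ratio attribute. It queries the JVM it runs in; reading from a separately started demo instance would need a remote JMX connection instead.

```java
import java.lang.management.ManagementFactory;
import java.util.Map;
import java.util.TreeMap;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;

/** Hypothetical helper: collects punctuate-ratio from every stream-thread MBean it can find. */
class PunctuateRatioReader {
    static Map<String, Object> read(MBeanServerConnection connection) {
        Map<String, Object> ratios = new TreeMap<>();
        try {
            // Assumed MBean naming; the trailing wildcard matches any thread-id.
            ObjectName pattern = new ObjectName("kafka.streams:type=stream-thread-metrics,*");
            for (ObjectName name : connection.queryNames(pattern, null)) {
                ratios.put(name.getCanonicalName(), connection.getAttribute(name, "punctuate-ratio"));
            }
        } catch (Exception e) {
            throw new IllegalStateException("Could not read punctuate-ratio over JMX", e);
        }
        return ratios;
    }

    public static void main(String[] args) {
        // Prints an empty map unless Kafka Streams is running inside this same JVM.
        System.out.println(read(ManagementFactory.getPlatformMBeanServer()));
    }
}
```

Polling this in a loop and logging the values gives a record of what a sampler actually sees over time.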

 

I see that all the metrics are being calculated. When I run the debugger, 
however, tasks.activeTasks() in the code below is an empty list. Because of 
that, punctuated stays at zero, and so does the punctuate ratio.

TaskExecutor.java
{code:java}
int punctuate() {
    int punctuated = 0;

    for (final Task task : tasks.activeTasks()) {
        try {
            if (executionMetadata.canPunctuateTask(task)) {
                if (task.maybePunctuateStreamTime()) {
                    punctuated++;
                }
                if (task.maybePunctuateSystemTime()) {
                    punctuated++;
                }
            }
        } catch (final TaskMigratedException e) {
            log.info("Failed to punctuate stream task {} since it got migrated to another thread already. " +
                "Will trigger a new rebalance and close all tasks as zombies together.", task.id());
            throw e;
        } catch (final StreamsException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            e.setTaskId(task.id());
            throw e;
        } catch (final KafkaException e) {
            log.error("Failed to punctuate stream task {} due to the following error:", task.id(), e);
            throw new StreamsException(e, task.id());
        }
    }

    return punctuated;
}
{code}
Is there a way to make the active task list non-empty, so that I can test the 
changes and write some unit tests?

 

Is this behaviour normal in the local environment?


> Punctuate Ratio metric almost impossible to track
> -------------------------------------------------
>
>                 Key: KAFKA-16811
>                 URL: https://issues.apache.org/jira/browse/KAFKA-16811
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 3.7.0
>            Reporter: Sebastien Viale
>            Assignee: Ganesh Sadanala
>            Priority: Minor
>          Time Spent: 48h
>  Remaining Estimate: 0h
>
> The punctuate-ratio metric is returned after the last record of the poll 
> loop. It is recomputed in every poll loop.
> After a punctuate, the value is close to 1, but there is little chance that 
> the metric is sampled at that moment, 
> so its value is almost always 0.
> A solution could be to apply a kind of "sliding window" to it and report the 
> value for the last x seconds.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
