[ https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Mayuresh Gharat updated KAFKA-7548:
-----------------------------------
    Description:
Today, when we call KafkaConsumer.poll(), it fetches data from Kafka asynchronously and puts it into a local buffer (completedFetches).

If we then pause some TopicPartitions and call KafkaConsumer.poll(), we might throw away any data already held in the local buffer for these TopicPartitions. Generally, if an application calls pause on some TopicPartitions, it is likely to resume those TopicPartitions in the near future, which would require KafkaConsumer to re-issue a fetch for the same data that it had buffered earlier for these TopicPartitions. This is wasted effort from the application's point of view.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data would improve the performance for stream applications like Samza. We ran a benchmark to compare the "before-fix" and "after-fix" versions.

We had a consumer subscribed to 10 partitions of a high volume topic and paused a predefined number of partitions on every poll call. The partitions to pause were chosen randomly for each poll() call.
 * Time to run Benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartition subscribed = 10

||Number Of Partitions Paused||Number of Records consumed (Before fix)||Number of Records consumed (After fix)||
|9|2087|4884693|

  was:
In KafkaConsumer, when we do a poll, we fetch data asynchronously from Kafka brokers, and that data is buffered in the completedFetch queue. If we then pause a few partitions, the next call to poll removes the completedFetches for those paused partitions. Normally, if an application calls pause on topicPartitions, it is likely to return to those topicPartitions in the near future, and when it does, with the current design we would have to re-fetch that data.

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data would improve the performance for stream applications like Samza.
We ran a benchmark where we compared the throughput with respect to different values of maxPollRecords.

We had a consumer subscribed to 10 partitions of a high volume topic and paused a predefined number of partitions on every poll call. The partitions to pause were chosen randomly for each poll() call.
 * Time to run Benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartition subscribed = 10

||Number Of Partitions Paused||Number of Records consumed (Before fix)||Number of Records consumed (After fix)||
|9|2087|4884693|


> KafkaConsumer should not throw away already fetched data for paused
> partitions.
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-7548
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7548
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Mayuresh Gharat
>            Assignee: Mayuresh Gharat
>            Priority: Major
>
> Today, when we call KafkaConsumer.poll(), it fetches data from Kafka
> asynchronously and puts it into a local buffer (completedFetches).
> If we then pause some TopicPartitions and call KafkaConsumer.poll(), we might
> throw away any data already held in the local buffer for these
> TopicPartitions. Generally, if an application calls pause on some
> TopicPartitions, it is likely to resume those TopicPartitions in the near
> future, which would require KafkaConsumer to re-issue a fetch for the same
> data that it had buffered earlier for these TopicPartitions. This is wasted
> effort from the application's point of view.
> At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data
> would improve the performance for stream applications like Samza. We ran a
> benchmark to compare the "before-fix" and "after-fix" versions.
> We had a consumer subscribed to 10 partitions of a high volume topic and
> paused a predefined number of partitions on every poll call. The partitions
> to pause were chosen randomly for each poll() call.
> * Time to run Benchmark = 60 seconds.
> * MaxPollRecords = 1
> * Number of TopicPartition subscribed = 10
> ||Number Of Partitions Paused||Number of Records consumed (Before fix)||Number of Records consumed (After fix)||
> |9|2087|4884693|



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
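
The effect described in the issue can be illustrated with a small toy model. The sketch below is not the KafkaConsumer or Fetcher implementation; the function `simulate`, its parameters, and the batch size of 500 are hypothetical and chosen only for illustration. It counts how many simulated fetch requests are needed when buffered data for paused partitions is discarded versus retained, using the benchmark's shape (10 partitions, 9 paused at random per poll, at most 1 record returned per poll).

```python
import random


def simulate(retain_paused, polls=1000, partitions=10, paused_per_poll=9,
             batch_size=500, max_poll_records=1, seed=0):
    """Toy model of a prefetch buffer; NOT the real Kafka client logic.

    retain_paused=False models the "before-fix" behavior (buffered data for
    paused partitions is dropped); True models the "after-fix" behavior.
    """
    rng = random.Random(seed)
    buffered = {p: 0 for p in range(partitions)}  # records buffered per partition
    fetches = discarded = consumed = 0

    for _ in range(polls):
        # Pause a random subset of partitions for this poll.
        paused = set(rng.sample(range(partitions), paused_per_poll))

        if not retain_paused:
            # Before-fix: drop whatever was prefetched for paused partitions.
            for p in paused:
                discarded += buffered[p]
                buffered[p] = 0

        budget = max_poll_records
        for p in range(partitions):
            if p in paused:
                continue
            if buffered[p] == 0:
                # Nothing buffered for an active partition: one simulated fetch.
                buffered[p] = batch_size
                fetches += 1
            take = min(budget, buffered[p])
            buffered[p] -= take
            consumed += take
            budget -= take

    return {"fetches": fetches, "discarded": discarded, "consumed": consumed}


if __name__ == "__main__":
    print("discard:", simulate(retain_paused=False))
    print("retain: ", simulate(retain_paused=True))
```

In this model both policies return the same number of records per poll, so the difference shows up only in the number of fetches and discarded records. In the real benchmark each extra fetch costs a network round trip, which is presumably where the throughput gap in the table comes from.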