[ 
https://issues.apache.org/jira/browse/KAFKA-7548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mayuresh Gharat updated KAFKA-7548:
-----------------------------------
    Description: 
In KafkaConsumer, when we do a poll(), we fetch data asynchronously from Kafka 
brokers and buffer it in the completedFetches queue. If we then pause a few 
partitions, the next call to poll() removes the completedFetches for those 
paused partitions. Normally, if an application calls pause on some 
topicPartitions, it is likely to return to those topicPartitions in the near 
future, and when it does, with the current design we have to re-fetch that data.
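The discard-on-poll behavior described above can be modeled with a minimal Python sketch (this is an illustration only, not the actual Java Fetcher code; the class and method names are made up):

```python
# Hypothetical model of the pre-fix behavior: prefetched data buffered
# for a partition is thrown away on the next poll() if that partition
# is paused, forcing a re-fetch after resume().
class FetchBuffer:
    def __init__(self):
        self.completed_fetches = {}  # partition -> list of buffered records
        self.paused = set()

    def add_fetch(self, partition, records):
        # Simulates an async fetch response landing in the buffer.
        self.completed_fetches.setdefault(partition, []).extend(records)

    def pause(self, partition):
        self.paused.add(partition)

    def poll(self):
        # Pre-fix: drop completedFetches for paused partitions entirely,
        # then return (and consume) records for the active partitions.
        for tp in self.paused:
            self.completed_fetches.pop(tp, None)
        out = {}
        for tp in list(self.completed_fetches):
            out[tp] = self.completed_fetches.pop(tp)
        return out
```

Once a partition is paused, any data already fetched for it is lost, which is exactly the wasted work the benchmark below measures.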

At LinkedIn, we made a hotfix to see if NOT throwing away the prefetched data 
would improve the performance of stream applications like Samza. We ran a 
benchmark where we compared throughput w.r.t. different values of 
maxPollRecords.

We had a consumer subscribed to 10 partitions of a high-volume topic and paused 
a predefined number of partitions on every poll() call. The partitions to pause 
were chosen randomly for each poll() call. We ran this benchmark multiple 
times, pausing a different number of partitions for each run. Here are the 
results:

 
 * Time to run Benchmark = 60 seconds.
 * MaxPollRecords = 1
 * Number of TopicPartitions subscribed = 10

*Before fix*

 
||Number Of Partitions Paused||Number of Records consumed||
|0|6424753|
|2|10495|
|4|5004|
|6|3152|
|8|2237|
|9|2087|

 
*After fix*
||Number Of Partitions Paused||Number of Records consumed||
|0|5846512|
|2|5269557|
|4|5213496|
|6|4519645|
|8|4383300|
|9|4884693|
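The fix, in the same hypothetical model as above, keeps the buffered data for paused partitions and serves it once resume() is called (again, illustrative names only, not the actual patch):

```python
# Hypothetical model of the post-fix behavior: prefetched data for
# paused partitions is retained in the buffer instead of being
# discarded, so no re-fetch is needed after resume().
class RetainingFetchBuffer:
    def __init__(self):
        self.completed_fetches = {}  # partition -> list of buffered records
        self.paused = set()

    def add_fetch(self, partition, records):
        # Simulates an async fetch response landing in the buffer.
        self.completed_fetches.setdefault(partition, []).extend(records)

    def pause(self, partition):
        self.paused.add(partition)

    def resume(self, partition):
        self.paused.discard(partition)

    def poll(self):
        # Post-fix: skip paused partitions but leave their buffered
        # data in place; return (and consume) records for active ones.
        out = {}
        for tp in list(self.completed_fetches):
            if tp not in self.paused:
                out[tp] = self.completed_fetches.pop(tp)
        return out
```

With the data retained, a consumer that resumes a partition can drain the already-buffered records immediately, which is consistent with the much flatter throughput curve in the "After fix" table.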

 
  


> KafkaConsumer should not throw away already fetched data for paused 
> partitions.
> -------------------------------------------------------------------------------
>
>                 Key: KAFKA-7548
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7548
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Mayuresh Gharat
>            Assignee: Mayuresh Gharat
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)