[ https://issues.apache.org/jira/browse/KAFKA-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992214#comment-16992214 ]

Pradeep Bansal commented on KAFKA-9048:
---------------------------------------

When is this change planned for a Kafka release?

> Improve scalability in number of partitions in replica fetcher
> --------------------------------------------------------------
>
>                 Key: KAFKA-9048
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9048
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Lucas Bradstreet
>            Assignee: Guozhang Wang
>            Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-9039 
> ([https://github.com/apache/kafka/pull/7443]) improves the performance of the 
> replica fetcher (at both small and large numbers of partitions), but it does 
> not improve its complexity or scalability in the number of partitions.
> I took a profile using async-profiler for the 1000 partition JMH replica 
> fetcher benchmark. The big remaining culprits are:
>  * ~18% looking up logStartOffset
>  * ~45% FetchSessionHandler$Builder.add
>  * ~19% FetchSessionHandler$Builder.build
> *Suggestions*
>  # The logStartOffset is looked up for every partition on each doWork pass. 
> This requires a hashmap lookup per partition even though the logStartOffset 
> changes rarely. If the replica fetcher were notified of updates to the 
> logStartOffset instead, the overhead would become a function of the number of 
> logStartOffset updates rather than O(n) on each pass (a sketch of this idea 
> follows the description).
>  # The use of FetchSessionHandler means that we maintain a partitionStates 
> hashmap in the replica fetcher and a sessionPartitions hashmap in the 
> FetchSessionHandler. On each incremental fetch session pass, we need to 
> reconcile these two hashmaps to determine which partitions were added or 
> updated and which were removed. This reconciliation is especially expensive, 
> requiring multiple passes over the fetching partitions and hashmap removes 
> and puts for most partitions. The replica fetcher could be smarter by 
> maintaining the fetch session's *updated* hashmap of 
> FetchRequest.PartitionData directly, as well as a *removed* partitions list, 
> so that these do not need to be regenerated by reconciliation on each fetch 
> pass (a sketch of this idea follows the description).
>  # maybeTruncate requires an O(n) pass over the elements in partitionStates 
> even when no partitions are in truncating state. If we maintained some 
> additional state about whether truncating partitions exist in 
> partitionStates, or kept those partitions in a separate data structure, we 
> would not need to iterate over all partitions on every doWork pass (a sketch 
> of this idea follows the description). I’ve seen clusters where this work 
> takes about 0.5%-1% of CPU, which is minor but will become more substantial 
> as the number of partitions increases.
> If we can achieve 1 and 2, the complexity will improve from a function of 
> the total number of partitions to a function of the number of partitions 
> whose fetch offsets or log start offsets changed between fetches. In general, 
> only a minority of partitions will have such changes between fetches, so this 
> should greatly improve the average-case complexity.
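>
> A minimal sketch of suggestion 1, assuming a hypothetical notification hook 
> from the log layer (none of these class or method names are Kafka's real 
> API): the log enqueues logStartOffset changes and the fetcher drains only 
> those changes at the start of each doWork pass, so the cost is proportional 
> to the number of updates rather than to the number of partitions.
> {code:java}
> import java.util.HashMap;
> import java.util.Map;
> import java.util.Queue;
> import java.util.concurrent.ConcurrentLinkedQueue;
> 
> import org.apache.kafka.common.TopicPartition;
> 
> // Hypothetical sketch, not Kafka's actual classes.
> final class LogStartOffsetCache {
> 
>     private static final class Update {
>         final TopicPartition partition;
>         final long logStartOffset;
> 
>         Update(TopicPartition partition, long logStartOffset) {
>             this.partition = partition;
>             this.logStartOffset = logStartOffset;
>         }
>     }
> 
>     // Written by the log layer, drained by the fetcher thread.
>     private final Queue<Update> pendingUpdates = new ConcurrentLinkedQueue<>();
>     // Owned by the fetcher thread; read when building fetch requests.
>     private final Map<TopicPartition, Long> cachedOffsets = new HashMap<>();
> 
>     // Called by the log layer only when retention or deletion actually moves
>     // the start offset, which is rare compared to fetch passes.
>     void onLogStartOffsetChanged(TopicPartition partition, long newStartOffset) {
>         pendingUpdates.add(new Update(partition, newStartOffset));
>     }
> 
>     // Called once at the top of each doWork pass. Cost is proportional to
>     // the number of changes since the last pass, not O(n) in partitions.
>     void applyPendingUpdates() {
>         Update update;
>         while ((update = pendingUpdates.poll()) != null) {
>             cachedOffsets.put(update.partition, update.logStartOffset);
>         }
>     }
> 
>     long logStartOffset(TopicPartition partition) {
>         return cachedOffsets.getOrDefault(partition, 0L);
>     }
> }
> {code}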
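>
> A minimal sketch of suggestion 2, again with made-up names (only 
> TopicPartition and FetchRequest.PartitionData are real Kafka types): the 
> fetcher records updates and removals as they happen, so the incremental 
> fetch request can be built from this delta directly instead of reconciling 
> partitionStates against sessionPartitions on every pass.
> {code:java}
> import java.util.HashMap;
> import java.util.HashSet;
> import java.util.Map;
> import java.util.Set;
> 
> import org.apache.kafka.common.TopicPartition;
> import org.apache.kafka.common.requests.FetchRequest;
> 
> // Hypothetical sketch, not the real FetchSessionHandler.
> final class IncrementalFetchDelta {
> 
>     // Partitions whose fetch position or log start offset changed since the
>     // last request, mapped to the PartitionData that should be sent next.
>     private final Map<TopicPartition, FetchRequest.PartitionData> updated = new HashMap<>();
>     // Partitions removed from this fetcher since the last request.
>     private final Set<TopicPartition> removed = new HashSet<>();
> 
>     void markUpdated(TopicPartition partition, FetchRequest.PartitionData data) {
>         removed.remove(partition);
>         updated.put(partition, data);
>     }
> 
>     void markRemoved(TopicPartition partition) {
>         updated.remove(partition);
>         removed.add(partition);
>     }
> 
>     // Consumed when building the next incremental fetch request. Clearing
>     // the delta means the next pass only sees changes made after this point.
>     Map<TopicPartition, FetchRequest.PartitionData> drainUpdated() {
>         Map<TopicPartition, FetchRequest.PartitionData> result = new HashMap<>(updated);
>         updated.clear();
>         return result;
>     }
> 
>     Set<TopicPartition> drainRemoved() {
>         Set<TopicPartition> result = new HashSet<>(removed);
>         removed.clear();
>         return result;
>     }
> }
> {code}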
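>
> A minimal sketch of suggestion 3, with made-up names: the set of truncating 
> partitions is tracked separately, so doWork can skip the maybeTruncate scan 
> entirely in the common case where nothing needs truncation.
> {code:java}
> import java.util.Collections;
> import java.util.HashSet;
> import java.util.Set;
> 
> import org.apache.kafka.common.TopicPartition;
> 
> // Hypothetical sketch, not Kafka's actual partition state machine.
> final class TruncationTracker {
> 
>     private final Set<TopicPartition> truncating = new HashSet<>();
> 
>     // Called when a partition transitions into truncating state, e.g. after
>     // the follower learns of a new leader epoch.
>     void markTruncating(TopicPartition partition) {
>         truncating.add(partition);
>     }
> 
>     // Called when truncation finishes and the partition resumes fetching.
>     void markFetching(TopicPartition partition) {
>         truncating.remove(partition);
>     }
> 
>     // doWork can check this first and skip the O(n) maybeTruncate scan
>     // whenever no partition is truncating, which is the common case.
>     boolean hasTruncatingPartitions() {
>         return !truncating.isEmpty();
>     }
> 
>     Set<TopicPartition> truncatingPartitions() {
>         return Collections.unmodifiableSet(truncating);
>     }
> }
> {code}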



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
