[ https://issues.apache.org/jira/browse/KAFKA-9048?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16992214#comment-16992214 ]
Pradeep Bansal commented on KAFKA-9048:
---------------------------------------

When is this change planned for a Kafka release?

> Improve scalability in number of partitions in replica fetcher
> --------------------------------------------------------------
>
>                 Key: KAFKA-9048
>                 URL: https://issues.apache.org/jira/browse/KAFKA-9048
>             Project: Kafka
>          Issue Type: Task
>          Components: core
>            Reporter: Lucas Bradstreet
>            Assignee: Guozhang Wang
>            Priority: Major
>
> https://issues.apache.org/jira/browse/KAFKA-9039 ([https://github.com/apache/kafka/pull/7443]) improves the performance of the replica fetcher (at both small and large numbers of partitions), but it does not improve its complexity or scalability in the number of partitions.
>
> I took a profile using async-profiler for the 1000-partition JMH replica fetcher benchmark. The big remaining culprits are:
>  * ~18% looking up logStartOffset
>  * ~45% FetchSessionHandler$Builder.add
>  * ~19% FetchSessionHandler$Builder.build
>
> *Suggestions*
>  # The logStartOffset is looked up for every partition on each doWork pass. This requires a hashmap lookup even though the logStartOffset changes rarely. If the replica fetcher could be notified of updates to the logStartOffset, we could reduce the overhead to a function of the number of updates to the logStartOffset instead of O(n) on each pass.
>  # The use of FetchSessionHandler means that we maintain a partitionStates hashmap in the replica fetcher and a sessionPartitions hashmap in the FetchSessionHandler. On each incremental fetch session pass, we need to reconcile these two hashmaps to determine which partitions were added/updated and which were removed. This reconciliation is especially expensive, requiring multiple passes over the fetching partitions and hashmap removes and puts for most partitions. The replica fetcher could be smarter by maintaining the fetch session *updated* hashmap of FetchRequest.PartitionData(s) directly, as well as a *removed* partitions list, so that these do not need to be regenerated by reconciliation on each fetch pass.
>  # maybeTruncate requires an O(n) pass over the elements in partitionStates even if there are no partitions in truncating state. If we maintained some additional state about whether truncating partitions exist in partitionStates, or separated these partitions into their own data structure, we would not need to iterate across all partitions on every doWork pass. I’ve seen clusters where this work takes about 0.5%-1% of CPU, which is minor but will become more substantial as the number of partitions increases.
>
> If we can achieve 1 and 2, the complexity improves from a function of the total number of partitions to a function of the number of partitions with updated fetch offsets or log start offsets between fetches. In general, only a minority of partitions change between fetches, so this should greatly improve the average-case complexity.
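
As a rough illustration of suggestion 1 (notify the fetcher of logStartOffset changes instead of looking it up per partition on every pass), a minimal Java sketch follows. The names LogStartOffsetCache and onLogStartOffsetUpdated are hypothetical, not Kafka's actual API; the point is only that the per-pass cost becomes proportional to the number of logStartOffset updates rather than to the number of partitions.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Hypothetical sketch: cache logStartOffset per partition and update it only
    // when the log layer notifies us, instead of looking it up for every
    // partition on every doWork() pass.
    public class LogStartOffsetCache {
        // partition -> last known logStartOffset (names are illustrative, not Kafka's)
        private final Map<String, Long> cached = new ConcurrentHashMap<>();

        // Called by the log layer only when a logStartOffset actually changes,
        // e.g. after retention deletes old segments.
        public void onLogStartOffsetUpdated(String topicPartition, long newLogStartOffset) {
            cached.put(topicPartition, newLogStartOffset);
        }

        // Called by the fetcher when building a fetch request; no per-pass
        // lookup into the log/partition map is needed, only into this cache.
        public long logStartOffset(String topicPartition) {
            return cached.getOrDefault(topicPartition, 0L);
        }
    }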
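
A similar sketch for suggestion 2, again with made-up names (IncrementalFetchState, markUpdated, drainUpdated) and a simplified stand-in for FetchRequest.PartitionData: the fetcher records deltas as they happen, so building the next incremental fetch request touches only the changed partitions instead of reconciling the partitionStates and sessionPartitions hashmaps.

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    // Hypothetical sketch of suggestion 2: the fetcher keeps the incremental
    // fetch session deltas itself rather than recomputing them by reconciling
    // two hashmaps on every fetch. All names are illustrative.
    public class IncrementalFetchState {
        // Simplified stand-in for FetchRequest.PartitionData
        public static class PartitionData {
            final long fetchOffset;
            final long logStartOffset;
            PartitionData(long fetchOffset, long logStartOffset) {
                this.fetchOffset = fetchOffset;
                this.logStartOffset = logStartOffset;
            }
        }

        // Partitions whose fetch offset / log start offset changed since the last request.
        private final Map<String, PartitionData> updated = new HashMap<>();
        // Partitions removed from the session since the last request.
        private final List<String> removed = new ArrayList<>();

        // Called when the fetcher advances a partition's fetch offset or learns of
        // a new logStartOffset; O(1) per change instead of O(n) per pass.
        public void markUpdated(String partition, long fetchOffset, long logStartOffset) {
            updated.put(partition, new PartitionData(fetchOffset, logStartOffset));
        }

        // Called when a partition is no longer fetched by this thread.
        public void markRemoved(String partition) {
            updated.remove(partition);
            removed.add(partition);
        }

        // Drained when building the next incremental fetch request; the request
        // then contains only the changed partitions, not all of them.
        public Map<String, PartitionData> drainUpdated() {
            Map<String, PartitionData> out = new HashMap<>(updated);
            updated.clear();
            return out;
        }

        public List<String> drainRemoved() {
            List<String> out = new ArrayList<>(removed);
            removed.clear();
            return out;
        }
    }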
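
And for suggestion 3, a sketch of keeping the partitions that are in truncating state in their own set (TruncationTracker is a hypothetical name, not part of Kafka) so that the maybeTruncate pass costs O(number of truncating partitions), which is commonly zero, rather than O(n) over all fetched partitions.

    import java.util.HashSet;
    import java.util.Set;

    // Hypothetical sketch of suggestion 3: track truncating partitions separately
    // so maybeTruncate() can skip the full scan when nothing needs truncation.
    public class TruncationTracker {
        private final Set<String> truncating = new HashSet<>();

        public void enterTruncating(String partition) {
            truncating.add(partition);
        }

        public void completeTruncation(String partition) {
            truncating.remove(partition);
        }

        // Called from doWork(): cost is proportional to the number of truncating
        // partitions (usually zero), not the total number of fetched partitions.
        public void maybeTruncate(java.util.function.Consumer<String> truncate) {
            if (truncating.isEmpty()) {
                return;
            }
            // Copy to avoid concurrent modification if the callback completes truncation.
            for (String partition : new HashSet<>(truncating)) {
                truncate.accept(partition);
            }
        }
    }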