Lucas Bradstreet created KAFKA-9048:
---------------------------------------

             Summary: Improve partition scalability in replica fetcher
                 Key: KAFKA-9048
                 URL: https://issues.apache.org/jira/browse/KAFKA-9048
             Project: Kafka
          Issue Type: Task
          Components: core
            Reporter: Lucas Bradstreet


https://issues.apache.org/jira/browse/KAFKA-9039 
([https://github.com/apache/kafka/pull/7443]) improves the performance of the 
replica fetcher (at both small and large numbers of partitions), but it does 
not improve its complexity or scalability in the number of partitions.

I took a profile using async-profiler for the 1000 partition JMH replica 
fetcher benchmark. The big remaining culprits are:
 * ~18% looking up logStartOffset
 * ~45% FetchSessionHandler$Builder.add
 * ~19% FetchSessionHandler$Builder.build

*Suggestions*
 #  The logStartOffset is looked up for every partition on each doWork pass. 
This requires a hashmap lookup even though the logStartOffset changes rarely. 
If the replica fetcher could be notified of updates to the logStartOffset, then 
we could reduce the overhead to a function of the number of updates to the 
logStartOffset instead of O(n) on each pass.
 #  The use of FetchSessionHandler means that we maintain a partitionStates 
hashmap in the replica fetcher, and a sessionPartitions hashmap in the 
FetchSessionHandler. On each incremental fetch session pass, we need to 
reconcile these two hashmaps to determine which partitions were added/updated 
and which partitions were removed. This reconciliation process is especially 
expensive, requiring multiple passes over the fetching partitions, and hashmap 
removes and puts for most partitions. The replica fetcher could be smarter by 
maintaining the fetch session's *updated* hashmap containing 
FetchRequest.PartitionData(s) directly, as well as a *removed* partitions list, so 
that these do not need to be regenerated by reconciliation on each fetch pass.
 #  maybeTruncate requires an O(n) pass over the elements in partitionStates 
even if there are no partitions in truncating state. If we can maintain some 
additional state about whether truncating partitions exist in partitionStates, 
or if we could separate these states into a separate data structure, we would 
not need to iterate across all partitions on every doWork pass. I’ve seen 
clusters where this work takes about 0.5%-1% of CPU, which is minor but will 
become more substantial as the number of partitions increases.
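
The three suggestions share one idea: record changes when a partition is 
mutated, rather than rediscovering them with an O(n) scan on every doWork pass. 
A minimal sketch of that idea follows. It is not the actual replica fetcher 
code. The class IncrementalFetchState, its methods, and the simplified 
PartitionData type are hypothetical names invented for illustration, and 
partitions are keyed by plain strings instead of TopicPartition.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: none of these names exist in Kafka.
final class IncrementalFetchState {
    // Simplified stand-in for FetchRequest.PartitionData (assumption).
    static final class PartitionData {
        final long fetchOffset;
        final long logStartOffset;

        PartitionData(long fetchOffset, long logStartOffset) {
            this.fetchOffset = fetchOffset;
            this.logStartOffset = logStartOffset;
        }
    }

    private final Map<String, PartitionData> partitions = new HashMap<>();
    // Changes accumulated since the last fetch pass (suggestion 2).
    private final Map<String, PartitionData> updated = new HashMap<>();
    private final Set<String> removed = new HashSet<>();
    // Partitions currently in truncating state (suggestion 3).
    private final Set<String> truncating = new HashSet<>();

    void addOrUpdate(String tp, long fetchOffset, long logStartOffset, boolean isTruncating) {
        PartitionData data = new PartitionData(fetchOffset, logStartOffset);
        partitions.put(tp, data);
        updated.put(tp, data);
        removed.remove(tp);
        if (isTruncating) {
            truncating.add(tp);
        } else {
            truncating.remove(tp);
        }
    }

    // Suggestion 1: the log notifies the fetcher when logStartOffset changes,
    // so no per-partition lookup is needed on each pass.
    void onLogStartOffsetChange(String tp, long newLogStartOffset) {
        PartitionData old = partitions.get(tp);
        if (old == null || old.logStartOffset == newLogStartOffset) {
            return;
        }
        PartitionData data = new PartitionData(old.fetchOffset, newLogStartOffset);
        partitions.put(tp, data);
        updated.put(tp, data);
    }

    // Simplification: a partition that is added and then removed before any
    // fetch pass still ends up in the removed set.
    void remove(String tp) {
        if (partitions.remove(tp) != null) {
            updated.remove(tp);
            removed.add(tp);
            truncating.remove(tp);
        }
    }

    // O(1) check that lets a pass skip the truncation scan entirely.
    boolean hasTruncatingPartitions() {
        return !truncating.isEmpty();
    }

    // Called once per fetch pass; cost is proportional to the number of changes.
    Map<String, PartitionData> drainUpdated() {
        Map<String, PartitionData> result = new HashMap<>(updated);
        updated.clear();
        return result;
    }

    Set<String> drainRemoved() {
        Set<String> result = new HashSet<>(removed);
        removed.clear();
        return result;
    }
}
```

In this sketch a fetch pass would call drainUpdated() and drainRemoved() to 
build the incremental request, and would only enter the truncation path when 
hasTruncatingPartitions() returns true. The per-pass cost then depends on the 
number of changes since the previous pass rather than on the total number of 
partitions.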



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
