[
https://issues.apache.org/jira/browse/KAFKA-19048?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jialun Peng updated KAFKA-19048:
--------------------------------
Component/s: tools
> Minimal Movement Replica Balancing algorithm
> --------------------------------------------
>
> Key: KAFKA-19048
> URL: https://issues.apache.org/jira/browse/KAFKA-19048
> Project: Kafka
> Issue Type: Improvement
> Components: generator, tools
> Reporter: Jialun Peng
> Assignee: Jialun Peng
> Priority: Major
> Labels: pull-request-available
>
> h2. Motivation
> Kafka clusters typically require rebalancing of topic replicas after
> horizontal scaling to evenly distribute the load across new and existing
> brokers. The current rebalancing approach does not consider the existing
> replica distribution, often resulting in excessive and unnecessary replica
> movements. These unnecessary movements increase rebalance duration, consume
> significant bandwidth and CPU resources, and potentially disrupt ongoing
> production and consumption operations. Thus, a replica rebalancing strategy
> that minimizes movements while achieving an even distribution of replicas is
> necessary.
> h2. Goals
> The proposed approach prioritizes the following objectives:
> # {*}Minimal Movement{*}: Minimize the number of replica relocations during
> rebalancing.
> # {*}Replica Balancing{*}: Ensure that replicas are evenly distributed
> across brokers.
> # {*}Anti-Affinity Support{*}: Support rack-aware allocation when enabled.
> # {*}Leader Balancing{*}: Distribute leader replicas evenly across brokers.
> # {*}ISR Order Optimization{*}: Optimize adjacency relationships to prevent
> failover traffic concentration in case of broker failures.
> # {*}Leader Stability{*}: Where possible, retain existing leader assignments
> to reduce leadership churn, provided this does not compromise the first five
> objectives.
> h2. Proposed Changes
> h3. Rack-Level Replica Distribution
> The following rules ensure balanced replica allocation at the rack level:
> # *When* {{rackCount = replicationFactor}}:
> * Each rack receives exactly {{partitionCount}} replicas.
> # *When* {{rackCount > replicationFactor}}:
> * If the weighted allocation {{(rackBrokers / totalBrokers × totalReplicas)}} is
> ≥ {{partitionCount}}: that rack receives exactly {{partitionCount}} replicas.
> * If the weighted allocation is < {{partitionCount}}: distribute the remaining
> replicas using a weighted remainder allocation (see the sketch below).
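> The capped, weighted-remainder step above can be illustrated with a small
> sketch. The names ({{rackBrokerCounts}}, {{totalReplicas}}, {{partitionCount}})
> are illustrative assumptions, not the actual implementation:
> {code:java}
> import java.util.*;
> 
> public class RackQuotaSketch {
>     // Computes how many replicas each rack should hold: a rack is capped at
>     // partitionCount (at most one replica of every partition), and any
>     // remainder is handed out by largest fractional weighted share.
>     static Map<String, Integer> rackQuotas(Map<String, Integer> rackBrokerCounts,
>                                            int totalReplicas, int partitionCount) {
>         int totalBrokers = rackBrokerCounts.values().stream().mapToInt(Integer::intValue).sum();
>         Map<String, Integer> quotas = new HashMap<>();
>         Map<String, Double> remainders = new HashMap<>();
>         int assigned = 0;
>         for (Map.Entry<String, Integer> e : rackBrokerCounts.entrySet()) {
>             double weighted = (double) e.getValue() / totalBrokers * totalReplicas;
>             int base = (int) Math.min(Math.floor(weighted), partitionCount);
>             quotas.put(e.getKey(), base);
>             remainders.put(e.getKey(), weighted - base);
>             assigned += base;
>         }
>         // Racks with the largest remainder that are still under the cap get one more replica.
>         List<String> order = new ArrayList<>(quotas.keySet());
>         order.sort((a, b) -> Double.compare(remainders.get(b), remainders.get(a)));
>         int remaining = totalReplicas - assigned;
>         while (remaining > 0) {
>             boolean progressed = false;
>             for (String rack : order) {
>                 if (remaining == 0) break;
>                 if (quotas.get(rack) < partitionCount) {
>                     quotas.merge(rack, 1, Integer::sum);
>                     remaining--;
>                     progressed = true;
>                 }
>             }
>             if (!progressed) break; // every rack already holds partitionCount replicas
>         }
>         return quotas;
>     }
> }
> {code}
> For example, with racks of 3, 3 and 2 brokers, 8 partitions and replication
> factor 3 (24 replicas in total), every rack ends up with exactly 8 replicas.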
> h3. Node-Level Replica Distribution
> # If the number of replicas assigned to a rack is not a multiple of the
> number of nodes in that rack, some nodes will host one additional replica
> compared to others.
> # *When* {{rackCount = replicationFactor}}:
> * If all racks have an equal number of nodes, each node will host an equal
> number of replicas.
> * If rack sizes vary, nodes in larger racks will host fewer replicas on
> average.
> # *When* {{rackCount > replicationFactor}}:
> * If no rack has a significantly higher node weight, replicas will be evenly
> distributed.
> * If a rack has disproportionately high node weight, those nodes will
> receive fewer replicas.
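> A rack's quota is then split across its brokers. A minimal sketch of that
> split, with an illustrative method name:
> {code:java}
> public class NodeQuotaSketch {
>     // Splits a rack's replica quota across its brokers: every broker gets the
>     // floor share, and the first (quota % brokerCount) brokers get one extra.
>     static int[] splitAcrossBrokers(int rackQuota, int brokerCount) {
>         int[] perBroker = new int[brokerCount];
>         int base = rackQuota / brokerCount;
>         int extra = rackQuota % brokerCount;
>         for (int i = 0; i < brokerCount; i++) {
>             perBroker[i] = base + (i < extra ? 1 : 0);
>         }
>         return perBroker;
>     }
> }
> {code}
> Continuing the example above, a rack of 2 brokers with a quota of 8 yields 4
> replicas per broker, while a rack of 3 brokers yields 3, 3 and 2, so brokers
> in larger racks host fewer replicas on average.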
> h3. Anti-Affinity Support
> When anti-affinity is enabled, the rebalance algorithm ensures that replicas
> of the same partition do not colocate on the same rack. Brokers without rack
> configuration are excluded from anti-affinity checks.
> In this way the rack-aware and non-rack-aware cases share a single
> implementation path.
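> A sketch of the check this implies (the helper name is hypothetical): a
> candidate broker is rejected only when it has a rack and that rack already
> hosts another replica of the partition, so brokers without a rack flow
> through the same code path.
> {code:java}
> import java.util.List;
> 
> public class AntiAffinitySketch {
>     // Returns true when placing a replica on candidateRack would put two
>     // replicas of the same partition on one rack. Brokers with no rack
>     // configured (null) never count as a violation.
>     static boolean violatesAntiAffinity(String candidateRack,
>                                         List<String> racksOfExistingReplicas) {
>         if (candidateRack == null) {
>             return false;
>         }
>         return racksOfExistingReplicas.contains(candidateRack);
>     }
> }
> {code}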
>
> h3. Replica Balancing Algorithm
> Through the above steps, we can calculate the ideal replica count for each
> node and rack.
> Based on the initial replica distribution of topics, we obtain the current
> replica partition allocation across nodes and racks, allowing us to identify
> which nodes violate anti-affinity rules.
> We iterate through nodes with the following priority:
> # First process nodes that violate anti-affinity rules
> # Then process nodes whose current replica count exceeds the desired replica
> count (prioritizing those with the largest discrepancy)
> For these identified nodes, we relocate their replicas to target nodes that:
> * Satisfy all anti-affinity constraints
> * Have a current replica count below their ideal allocation
> This process continues iteratively until:
> * No nodes violate anti-affinity rules
> * All nodes' current replica counts match their desired replica counts
> Upon satisfying these conditions, we achieve balanced replica distribution
> across nodes.
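> A simplified sketch of this movement loop follows. It only shows the
> surplus-driven pass (the pass that first fixes existing anti-affinity
> violations is omitted), although anti-affinity is enforced when choosing a
> target. All names ({{replicasByBroker}}, {{idealCountByBroker}}, ...) are
> illustrative rather than actual Kafka APIs:
> {code:java}
> import java.util.*;
> 
> // Illustrative only: brokers and partitions are plain ints, racks are strings
> // (null = no rack), and the maps stand in for the real assignment state.
> public class ReplicaMoveSketch {
> 
>     // Repeatedly take the broker with the largest surplus over its ideal count
>     // and move one of its replicas to an eligible under-loaded broker.
>     static void rebalance(Map<Integer, Set<Integer>> replicasByBroker,
>                           Map<Integer, String> rackByBroker,
>                           Map<Integer, Integer> idealCountByBroker) {
>         boolean moved = true;
>         while (moved) {
>             moved = false;
>             Integer source = replicasByBroker.keySet().stream()
>                     .max(Comparator.comparingInt(
>                             b -> replicasByBroker.get(b).size() - idealCountByBroker.get(b)))
>                     .orElse(null);
>             if (source == null
>                     || replicasByBroker.get(source).size() <= idealCountByBroker.get(source)) {
>                 break; // no broker is above its quota: node-level balance reached
>             }
>             for (Integer partition : new ArrayList<>(replicasByBroker.get(source))) {
>                 Integer target = findTarget(partition, source,
>                         replicasByBroker, rackByBroker, idealCountByBroker);
>                 if (target != null) {
>                     replicasByBroker.get(source).remove(partition);
>                     replicasByBroker.get(target).add(partition);
>                     moved = true;
>                     break;
>                 }
>             }
>         }
>     }
> 
>     // A target must be below its quota, must not already hold the partition and
>     // must not put a second replica of the partition on the same rack.
>     static Integer findTarget(int partition, int source,
>                               Map<Integer, Set<Integer>> replicasByBroker,
>                               Map<Integer, String> rackByBroker,
>                               Map<Integer, Integer> idealCountByBroker) {
>         Set<String> racksWithPartition = new HashSet<>();
>         replicasByBroker.forEach((b, parts) -> {
>             if (b != source && parts.contains(partition) && rackByBroker.get(b) != null) {
>                 racksWithPartition.add(rackByBroker.get(b));
>             }
>         });
>         for (Integer b : replicasByBroker.keySet()) {
>             if (b.equals(source) || replicasByBroker.get(b).contains(partition)) continue;
>             if (replicasByBroker.get(b).size() >= idealCountByBroker.get(b)) continue;
>             String rack = rackByBroker.get(b);
>             if (rack != null && racksWithPartition.contains(rack)) continue;
>             return b;
>         }
>         return null;
>     }
> }
> {code}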
>
> h3. Leader Balancing Algorithm
> *Target Leader Calculation:*
> Compute baseline average: {{leader_avg = total_partitions / total_nodes}}
> Identify brokers where {{replica_count ≤ leader_avg}}:
> * Designate all replicas as leaders on these brokers
> * Subtract allocated leaders: {{remaining_partitions -= assigned_leaders}}
> * Exclude nodes: {{remaining_brokers -= processed_brokers}}
> Iteratively recalculate {{leader_avg}} until every remaining broker satisfies
> {{replica_count ≥ leader_avg}}.
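> A sketch of this "waterfall" calculation, with illustrative names; brokers
> holding fewer replicas than the running average take all of their replicas as
> leaders and drop out, and the average is recomputed over the rest:
> {code:java}
> import java.util.*;
> 
> public class LeaderTargetSketch {
>     // replicaCountByBroker: broker id -> number of replicas it hosts.
>     static Map<Integer, Integer> targetLeaders(Map<Integer, Integer> replicaCountByBroker,
>                                                int totalPartitions) {
>         Map<Integer, Integer> targets = new HashMap<>();
>         Map<Integer, Integer> remaining = new HashMap<>(replicaCountByBroker);
>         int remainingPartitions = totalPartitions;
> 
>         while (!remaining.isEmpty()) {
>             double leaderAvg = (double) remainingPartitions / remaining.size();
>             // "Light" brokers hold no more replicas than the average: every replica leads.
>             List<Integer> light = new ArrayList<>();
>             for (Map.Entry<Integer, Integer> e : remaining.entrySet()) {
>                 if (e.getValue() <= leaderAvg) {
>                     light.add(e.getKey());
>                 }
>             }
>             if (light.isEmpty()) {
>                 // Every remaining broker can carry the average leader load;
>                 // spread any integer remainder one leader at a time.
>                 int base = (int) Math.floor(leaderAvg);
>                 int extra = remainingPartitions - base * remaining.size();
>                 for (Integer b : remaining.keySet()) {
>                     targets.put(b, base + (extra-- > 0 ? 1 : 0));
>                 }
>                 break;
>             }
>             for (Integer b : light) {
>                 int leaders = remaining.remove(b);
>                 targets.put(b, leaders);
>                 remainingPartitions -= leaders;
>             }
>         }
>         return targets;
>     }
> }
> {code}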
> *Leader Assignment Constraints:*
> Final targets:
> * Light brokers: {{target_leaders = replica_count}}
> * Normal brokers: {{target_leaders = leader_avg}}
>
> For each partition, select the broker with the largest difference between
> its {{target_leaders}} and current leader count to become that partition's
> leader. Upon completing this traversal, we achieve uniform leader
> distribution across all brokers.
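> A sketch of that assignment pass, with illustrative names:
> {code:java}
> import java.util.*;
> 
> public class LeaderAssignSketch {
>     // replicasByPartition: partition -> broker ids hosting a replica of it.
>     // targetLeaders: broker -> desired leader count from the previous step.
>     static Map<Integer, Integer> assignLeaders(Map<Integer, List<Integer>> replicasByPartition,
>                                                Map<Integer, Integer> targetLeaders) {
>         Map<Integer, Integer> leaderByPartition = new HashMap<>();
>         Map<Integer, Integer> currentLeaders = new HashMap<>();
>         for (Map.Entry<Integer, List<Integer>> e : replicasByPartition.entrySet()) {
>             Integer best = null;
>             int bestGap = Integer.MIN_VALUE;
>             // Pick the replica on the broker furthest below its leader target.
>             for (Integer broker : e.getValue()) {
>                 int gap = targetLeaders.getOrDefault(broker, 0)
>                         - currentLeaders.getOrDefault(broker, 0);
>                 if (gap > bestGap) {
>                     bestGap = gap;
>                     best = broker;
>                 }
>             }
>             leaderByPartition.put(e.getKey(), best);
>             currentLeaders.merge(best, 1, Integer::sum);
>         }
>         return leaderByPartition;
>     }
> }
> {code}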
>
> h3. Optimizing ISR Order
> The leader of each partition is fixed by the leader balancing step and does
> not change during ISR order optimization.
> {*}Tracking Node Pair Frequencies{*}:
> * Iterate through all partitions and record the first replica (which is the
> leader).
> * Track the occurrences of the broker pairs formed by the first and second
> replicas of each partition.
> {*}Optimized Selection of Subsequent Replicas{*}:
> * For each partition, when selecting the second replica, choose a broker
> that forms the least frequent node pair with the first replica.
> * Continue this process iteratively for all replicas in the partition.
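> A sketch of this re-ordering is below. It assumes a pair counter shared
> across partitions and updated as each partition is processed, and it keys
> each pair on the immediately preceding replica in the list, which is one
> reading of "continue this process iteratively"; all names are illustrative.
> {code:java}
> import java.util.*;
> 
> public class IsrOrderSketch {
>     // Re-orders the followers of one partition so that each consecutive
>     // (previous broker -> next broker) pair is as infrequent as possible,
>     // spreading failover traffic if the leader's broker fails.
>     static List<Integer> orderFollowers(int leader, Set<Integer> followers,
>                                         Map<String, Integer> pairCounts) {
>         List<Integer> ordered = new ArrayList<>();
>         ordered.add(leader);
>         Set<Integer> remaining = new HashSet<>(followers);
>         int previous = leader;
>         while (!remaining.isEmpty()) {
>             Integer best = null;
>             int bestCount = Integer.MAX_VALUE;
>             for (Integer candidate : remaining) {
>                 int count = pairCounts.getOrDefault(previous + "->" + candidate, 0);
>                 if (count < bestCount) {
>                     bestCount = count;
>                     best = candidate;
>                 }
>             }
>             pairCounts.merge(previous + "->" + best, 1, Integer::sum);
>             ordered.add(best);
>             remaining.remove(best);
>             previous = best;
>         }
>         return ordered;
>     }
> }
> {code}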
--
This message was sent by Atlassian Jira
(v8.20.10#820010)