[ 
https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant reassigned KAFKA-14437:
------------------------------------

    Assignee: Andrew Grant

> Enhance StripedReplicaPlacer to account for existing partition assignments
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-14437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14437
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Andrew Grant
>            Assignee: Andrew Grant
>            Priority: Major
>
> Currently, StripedReplicaPlacer does not take existing partition 
> assignments into consideration when the place method is called. This means 
> newly added partitions may get the same assignments as existing 
> partitions. This differs from AdminUtils, which has some logic that tries to 
> shift the position in the broker list from which assignments for newly added 
> partitions start.
> For example, let's say we had the following rack layout:
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
> {code}
> CreateTopics might return the following assignment for two partitions:
> {code:java}
> P0: 6, 8, 2
> P1: 9, 3, 7
> {code}
> If the user then calls CreatePartitions increasing the partition count to 4, 
> StripedReplicaPlacer does not take P0 and P1 into account. It picks a 
> random rack offset and a random broker offset, so it could easily create the 
> same assignments for P2 and P3 that it created for P0 and P1. This is easily 
> reproduced in a unit test.
> My suggestion is to enhance StripedReplicaPlacer to account for existing 
> partition assignments. Intuitively, we’d like to make assignments for added 
> partitions from “where we left off” when we were making the previous 
> assignments. In practice, it’s not possible to know exactly what the state was 
> during the previous partition assignments because, for example, broker 
> fencing state may have changed. But I do think we can make a best-effort 
> attempt that is optimized for the common case where most brokers are 
> unfenced. Note that all the changes suggested below will only affect 
> StripedReplicaPlacer when place is called and there are existing partition 
> assignments, which happens when it’s servicing CreatePartitions requests. If 
> there are no existing partition assignments, which happens during 
> CreateTopics, the logic is unchanged.
> First, we need to update ClusterDescriber to:
> {code:java}
> public interface ClusterDescriber {
>     /**
>      * Get an iterator through the usable brokers.
>      */
>     Iterator<UsableBroker> usableBrokers();
>     /**
>      * Get the existing partition assignments (replica lists) for the topic.
>      */
>     List<List<Integer>> replicasForTopicName(String topicName);
> }
> {code}
> The replicasForTopicName method returns the existing partition assignments 
> for a topic. This lets StripedReplicaPlacer see existing partition 
> assignments when there are any.
> When place is called, some initialization is done in both RackList and 
> BrokerList. One thing that is initialized is the offset variable. It is 
> used in both RackList and BrokerList and determines where in the list of 
> racks or brokers, respectively, we should start from when making the next 
> assignment. Currently, it is initialized to a random value based on the 
> size of the list.
> I suggest we add some logic during initialization that sets the offset for 
> both RackList and BrokerList to a value based on the previous assignments.
> Consider again the following rack metadata and existing assignments:
> {code:java}
> Rack 1: 0, 1, 2, 3
> Rack 2: 4, 5, 6, 7
> Rack 3: 8, 9, 10, 11
>  
> P0: 6, 8, 2
> P1: 9, 3, 7  
> {code}
> Let’s imagine a user wants to create a new partition, P2. 
> First, we need to determine which rack to start from for P2: this corresponds 
> to the initial offset in RackList. We can look at the leader of P1 (not P0, 
> because P1 is the “last” partition we made an assignment for) and see it’s on 
> rack 3. So the next rack we should start from is rack 1. This means 
> we set offset in RackList to 0, instead of a random value, during 
> initialization. 
> Second, we need to determine which broker to start from _per rack_: this 
> corresponds to the initial offset in BrokerList. We can look at all the 
> existing partition assignments, P0 and P1 in our example, and _per rack_ 
> infer the last offset started from during previous assignments. For each 
> rack, we do this by iterating through the partitions in reverse order, 
> because we care about the most recent starting position, and finding the 
> first broker in the assignment that belongs to the rack. This tells us where 
> we last started from when making an assignment for that rack, which can be 
> used to determine where to continue from within that rack.
> So in our example, for rack 1 the last broker we started from was broker 3 
> in P1, so the next broker we should choose for that rack is 0, which means 
> the initial offset in the BrokerList for rack 1 is set to 0 during 
> initialization. For rack 2 the last broker we started with was broker 7 in 
> P1, so the next broker should be 4, which means the offset is 0 in the 
> BrokerList for rack 2. For rack 3 the last broker we started with was 
> broker 9 in P1, so the next broker should be 10, which means the offset is 
> 2 in the BrokerList for rack 3.
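
The two inference steps described in the issue (rack offset from the leader of the last partition, broker offset per rack from a reverse scan of the assignments) can be sketched roughly as below. This is only an illustration under stated assumptions, not code from Kafka: the class OffsetInference, the methods rackOffset and brokerOffsets, the map-of-racks representation, and the -1 "nothing to infer" sentinel are all hypothetical, and broker fencing is ignored entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: infers starting offsets from
// existing assignments, following the steps described in the issue text.
final class OffsetInference {

    // Rack offset: the rack after the one holding the leader (first replica)
    // of the last existing partition. Returns -1 when nothing can be
    // inferred, in which case a caller would presumably keep the random offset.
    static int rackOffset(Map<String, List<Integer>> racks,
                          List<List<Integer>> assignments) {
        if (assignments.isEmpty()) {
            return -1;
        }
        int leader = assignments.get(assignments.size() - 1).get(0);
        int rackIndex = 0;
        for (List<Integer> brokers : racks.values()) {
            if (brokers.contains(leader)) {
                return (rackIndex + 1) % racks.size();
            }
            rackIndex++;
        }
        return -1; // leader is not in any known rack
    }

    // Broker offsets, one per rack in iteration order: scan partitions from
    // newest to oldest, take the first replica that lives on the rack, and
    // continue from the position after it (wrapping around).
    static List<Integer> brokerOffsets(Map<String, List<Integer>> racks,
                                       List<List<Integer>> assignments) {
        List<Integer> offsets = new ArrayList<>();
        for (List<Integer> rackBrokers : racks.values()) {
            int offset = -1; // -1: no existing replica found on this rack
            for (int p = assignments.size() - 1; p >= 0 && offset < 0; p--) {
                for (int replica : assignments.get(p)) {
                    int pos = rackBrokers.indexOf(replica);
                    if (pos >= 0) {
                        offset = (pos + 1) % rackBrokers.size();
                        break;
                    }
                }
            }
            offsets.add(offset);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Rack layout and assignments from the example in the issue.
        Map<String, List<Integer>> racks = new LinkedHashMap<>();
        racks.put("rack1", List.of(0, 1, 2, 3));
        racks.put("rack2", List.of(4, 5, 6, 7));
        racks.put("rack3", List.of(8, 9, 10, 11));
        List<List<Integer>> existing =
            List.of(List.of(6, 8, 2), List.of(9, 3, 7)); // P0, P1

        System.out.println(rackOffset(racks, existing));    // prints 0
        System.out.println(brokerOffsets(racks, existing)); // prints [0, 0, 2]
    }
}
```

For the example data this yields a rack offset of 0 and per-rack broker offsets of 0, 0 and 2, matching the values worked out in the description. A real change would also have to decide what to do when brokers have been fenced or removed since the earlier assignments were made.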



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
