[ https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Andrew Grant reassigned KAFKA-14437: ------------------------------------ Assignee: Andrew Grant > Enhance StripedReplicaPlacer to account for existing partition assignments > -------------------------------------------------------------------------- > > Key: KAFKA-14437 > URL: https://issues.apache.org/jira/browse/KAFKA-14437 > Project: Kafka > Issue Type: Improvement > Reporter: Andrew Grant > Assignee: Andrew Grant > Priority: Major > > Currently, in StripedReplicaPlacer we don’t take existing partition > assignments into consideration when the place method is called. This means > for new partitions added, they may get the same assignments as existing > partitions. This differs from AdminUtils, which has some logic to try and > shift where in the list of brokers we start making assignments from for new > partitions added. > For example, lets say we had the following > {code:java} > Rack 1: 0, 1, 2, 3 > Rack 2: 4, 5, 6, 7 > Rack 3: 8, 9, 10, 11 > {code} > CreateTopics might return the following assignment for two partitions: > {code:java} > P0: 6, 8, 2 > P1: 9, 3, 7 > {code} > If the user then calls CreatePartitions increasing the partition count to 4, > StripedReplicaPlacer does not take into account P0 and P1. It creates a > random rack offset and a random broker offset. So it could easily create the > same assignment for P3 and P4 that it created for P0 and P1. This is easily > reproduced in a unit test. > My suggestion is to enhance StripedReplicaPlacer to account for existing > partition assignments. Intuitively, we’d like to make assignments for added > partitions from “where we left off” when we were making the previous > assignments. In practice, its not possible to know exactly what the state was > during the previous partition assignments because, for example, brokers > fencing state may have changed. But I do think we can make a best effort > attempt to do so that is optimized for the common case where most brokers are > unfenced. Note, all the changes suggested below only will affect > StripedReplicaPlacer when place is called and there are existing partition > assignments, which happens when its servicing CreatePartitions requests. If > there are no existing partition assignments, which happens during > CreateTopics, the logic is unchanged. > First, we need to update ClusterDescriber to: > {code:java} > public interface ClusterDescriber { > /** > * Get an iterator through the usable brokers. > */ > Iterator<UsableBroker> usableBrokers(); > List<List<Integer>> replicasForTopicName(String topicName); > } > {code} > The replicasForTopicName returns the existing partition assignments. This > will enable StripedReplicaPlacer to know about existing partition assignments > when they exist. > When place is called, some initialization is done in both RackList and > BrokerList. One thing that is initialized is the offset variable - this is a > variable used in both RackList and BrokerList that determines where in the > list of either racks or brokers respectively we should start from when making > the next assignment. Currently, it is initialized to a random value, based > off the size of the list. > I suggest we add some logic during initialization that sets the offset for > both RackList and BrokerList to a value based off the previous assignments. > Consider again the following rack metadata and existing assignments: > {code:java} > Rack 1: 0, 1, 2, 3 > Rack 2: 4, 5, 6, 7 > Rack 3: 8, 9, 10, 11 > > P0: 6, 8, 2 > P1: 9, 3, 7 > {code} > Lets imagine a user wants to create a new partition, P3. > First, we need to determine which rack to start from for P3: this corresponds > to the initial offset in RackList. We can look at the leader of P1 (not P0 > because P1 is the “last” partition we made an assignment for) and see its on > rack 3. So, the next rack we should start from should be rack 1. This means > we set offset in RackList to 0, instead of a random value, during > initialization. > Second, we need to determine which broker to start from {_}per rack{_}: this > corresponds to the initial offset in BrokerList. We can look at all the > existing partition assignments, P0 and P1 in our example, and _per rack_ > infer the last offset started from during previous assignments. For each > rack, we do this by iterating through each partition, in reverse order > because we care about the most recent starting position, and try to find the > first broker in the assignment from the rack. This enables us to know where > we last started from when making an assignment for that rack, which can be > used to determine where to continue on from within that rack. > So in our example, for rack 1 we can see the last broker we started from was > broker 3 in P1: so the next broker we should choose for that rack should be 0 > which means the initial offset is set to 0 in the BrokerList for rack 1 > during initialization. For rack 2 we can see the last broker we started with > was broker 7 in P1: so the next broker should be 4 which means the offset is > 0 in the BrokerList for rack 2. For rack 3 we can see the last broker we > started with was broker 9 in P1: so the next broker should be 10 which means > the offset is 2 in the BrokerList for rack 3. -- This message was sent by Atlassian Jira (v8.20.10#820010)