[ 
https://issues.apache.org/jira/browse/KAFKA-14437?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andrew Grant updated KAFKA-14437:
---------------------------------
    Description: 
Currently, StripedReplicaPlacer does not take existing partition assignments 
into account when its place method is called. As a result, newly added 
partitions may get the same assignments as existing partitions. This differs 
from AdminUtils, which has some logic to shift the position in the broker list 
from which assignments for new partitions start.

For example, let’s say we had the following rack layout:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
{code}
CreateTopics might return the following assignment for two partitions:
{code:java}
P0: 6, 8, 2
P1: 9, 3, 7
{code}
If the user then calls CreatePartitions to increase the partition count to 4, 
StripedReplicaPlacer does not take P0 and P1 into account. It picks a random 
rack offset and a random broker offset, so it could easily create the same 
assignments for the new partitions P2 and P3 that it created for P0 and P1. 
This is easily reproduced in a unit test.

My suggestion is to enhance StripedReplicaPlacer to account for existing 
partition assignments. Intuitively, we’d like to make assignments for added 
partitions from “where we left off” when we were making the previous 
assignments. In practice, it’s not possible to know exactly what the state was 
during the previous partition assignments because, for example, brokers’ 
fencing state may have changed. But I do think we can make a best-effort 
attempt that is optimized for the common case where most brokers were and are 
unfenced.

Note that all the changes suggested below only affect StripedReplicaPlacer when 
place is called and there are existing partition assignments, which happens 
when it’s servicing CreatePartitions requests. If there are no existing 
partition assignments, which happens during CreateTopics, the logic is 
unchanged.

First, we need to update ClusterDescriber to:
{code:java}
public interface ClusterDescriber {
    /**
     * Get an iterator through the usable brokers.
     */
    Iterator<UsableBroker> usableBrokers();

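    /**
     * Get the existing partition assignments (one replica list per
     * partition) for the given topic.
     */
    List<List<Integer>> replicasForTopicName(String topicName);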
}
{code}
The new replicasForTopicName method returns the existing partition assignments 
for a topic. This lets StripedReplicaPlacer know about existing partition 
assignments if they exist.

When place is called, some initialization is done in both RackList and 
BrokerList. One thing that is initialized is the offset variable. It is used in 
both RackList and BrokerList and determines where in the list of racks or 
brokers, respectively, we should start from when making the next assignment. 
Currently, it is initialized to a random value bounded by the size of the 
list.
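
To make the role of the two offsets concrete, here is a minimal sketch of striped placement in which the starting offsets are passed in rather than drawn at random. It is a deliberately simplified model and not the actual StripedReplicaPlacer code: the class name StripedPlacementSketch and its fields are hypothetical, and fencing and brokers without a rack are ignored.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of striped placement (not Kafka's real class).
final class StripedPlacementSketch {
    private final List<List<Integer>> racks; // brokers of each rack, in rack order
    private final int[] brokerOffsets;       // index of the next broker to use, per rack
    private int rackOffset;                  // index of the rack that leads the next partition

    StripedPlacementSketch(List<List<Integer>> racks, int rackOffset, int[] brokerOffsets) {
        this.racks = racks;
        this.rackOffset = rackOffset;
        this.brokerOffsets = brokerOffsets.clone();
    }

    // Places one partition by taking one broker from each successive rack.
    // Assumes replicationFactor does not exceed the number of brokers.
    List<Integer> placeOne(int replicationFactor) {
        List<Integer> replicas = new ArrayList<>();
        for (int i = 0; i < replicationFactor; i++) {
            int rack = (rackOffset + i) % racks.size();
            List<Integer> brokers = racks.get(rack);
            replicas.add(brokers.get(brokerOffsets[rack]));
            brokerOffsets[rack] = (brokerOffsets[rack] + 1) % brokers.size();
        }
        // The next partition starts on the following rack.
        rackOffset = (rackOffset + 1) % racks.size();
        return replicas;
    }

    public static void main(String[] args) {
        List<List<Integer>> racks = List.of(
            List.of(0, 1, 2, 3), List.of(4, 5, 6, 7), List.of(8, 9, 10, 11));
        StripedPlacementSketch placer =
            new StripedPlacementSketch(racks, 1, new int[] {2, 2, 0});
        System.out.println(placer.placeOne(3)); // prints [6, 8, 2]
        System.out.println(placer.placeOne(3)); // prints [9, 3, 7]
    }
}
```

In this model, a rack offset of 1 and per-rack broker offsets of 2, 2 and 0 reproduce the P0 and P1 assignments from the earlier example. The starting offsets alone determine the result, which is why drawing them at random again can repeat earlier assignments.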

I suggest we add some logic during initialization that sets the offset for both 
RackList and BrokerList to a value derived from the previous assignments.

Consider again the following rack metadata and existing assignments:
{code:java}
Rack 1: 0, 1, 2, 3
Rack 2: 4, 5, 6, 7
Rack 3: 8, 9, 10, 11
 
P0: 6, 8, 2
P1: 9, 3, 7  
{code}
Let’s imagine a user wants to create a new partition, P2. 

First, we need to determine which rack to start from for P2: this corresponds 
to the initial offset in RackList. We can look at the leader of P1 (not P0, 
because P1 is the “last” partition we made an assignment for) and see it’s on 
rack 3. So the next rack we should start from is rack 1. This means we set 
offset in RackList to 0, instead of a random value, during initialization. 
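
The rack step could be sketched roughly as follows. This is only an illustration under assumptions: RackOffsetSketch, nextRackOffset and the brokerToRack map are hypothetical names, and a return value of -1 stands for "keep the current random initialization".

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka.
final class RackOffsetSketch {
    // Returns the index of the rack after the one hosting the leader (first
    // replica) of the last existing partition, or -1 if it cannot be inferred.
    static int nextRackOffset(List<String> racks,
                              Map<Integer, String> brokerToRack,
                              List<List<Integer>> existingAssignments) {
        if (racks.isEmpty() || existingAssignments.isEmpty()) {
            return -1;
        }
        List<Integer> last = existingAssignments.get(existingAssignments.size() - 1);
        if (last.isEmpty()) {
            return -1;
        }
        String leaderRack = brokerToRack.get(last.get(0));
        if (leaderRack == null) {
            return -1; // the leader is no longer a known broker
        }
        int index = racks.indexOf(leaderRack);
        if (index < 0) {
            return -1; // the leader's rack is no longer in the rack list
        }
        return (index + 1) % racks.size();
    }

    public static void main(String[] args) {
        List<String> racks = List.of("rack1", "rack2", "rack3");
        Map<Integer, String> brokerToRack = new HashMap<>();
        for (int broker = 0; broker < 12; broker++) {
            brokerToRack.put(broker, racks.get(broker / 4)); // four brokers per rack
        }
        List<List<Integer>> existing = List.of(List.of(6, 8, 2), List.of(9, 3, 7));
        System.out.println(nextRackOffset(racks, brokerToRack, existing)); // prints 0
    }
}
```

With the example data, the leader of P1 is broker 9 on rack 3 (index 2), so the sketch wraps around to index 0, which is rack 1.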

Second, we need to determine which broker to start from _per rack_: this 
corresponds to the initial offset in BrokerList. We can look at all the 
existing partition assignments, P0 and P1 in our example, and _per rack_ infer 
the last offset started from during previous assignments. For each rack, we do 
this by iterating through the partitions in reverse order, because we care 
about the most recent starting position, and finding the first broker in the 
assignment that belongs to the rack. This tells us where we last started from 
when making an assignment for that rack, which can be used to determine where 
to continue from within that rack.

So in our example, for rack 1 the last broker we started from was broker 3 in 
P1, so the next broker we should choose for that rack is 0, which means the 
initial offset is set to 0 in the BrokerList for rack 1 during initialization. 
For rack 2, the last broker we started with was broker 7 in P1, so the next 
broker should be 4, which means the offset is 0 in the BrokerList for rack 2. 
For rack 3, the last broker we started with was broker 9 in P1, so the next 
broker should be 10, which means the offset is 2 in the BrokerList for rack 3.
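
The per-rack inference could look roughly like the sketch below. It is an illustration under assumptions rather than the proposed patch: BrokerOffsetSketch and nextBrokerOffsets are hypothetical names, and racks that appear in no existing assignment are simply left out of the result so that the caller can keep the random initialization for them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka.
final class BrokerOffsetSketch {
    // For each rack, scans the existing assignments from newest to oldest,
    // takes the first replica that belongs to the rack, and returns the index
    // that follows it in the rack's broker list (wrapping around).
    static Map<String, Integer> nextBrokerOffsets(Map<String, List<Integer>> rackToBrokers,
                                                  List<List<Integer>> existingAssignments) {
        Map<String, Integer> offsets = new LinkedHashMap<>();
        for (Map.Entry<String, List<Integer>> rack : rackToBrokers.entrySet()) {
            List<Integer> brokers = rack.getValue();
            search:
            for (int p = existingAssignments.size() - 1; p >= 0; p--) {
                for (Integer replica : existingAssignments.get(p)) {
                    int index = brokers.indexOf(replica);
                    if (index >= 0) {
                        offsets.put(rack.getKey(), (index + 1) % brokers.size());
                        break search; // most recent starting position found
                    }
                }
            }
        }
        return offsets;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> racks = new LinkedHashMap<>();
        racks.put("rack1", List.of(0, 1, 2, 3));
        racks.put("rack2", List.of(4, 5, 6, 7));
        racks.put("rack3", List.of(8, 9, 10, 11));
        List<List<Integer>> existing = List.of(List.of(6, 8, 2), List.of(9, 3, 7));
        System.out.println(nextBrokerOffsets(racks, existing));
        // prints {rack1=0, rack2=0, rack3=2}
    }
}
```

The result matches the offsets worked out by hand above: brokers 3 and 7 are last in their racks, so those offsets wrap to 0, while broker 9 is followed by broker 10 at index 2.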



> Enhance StripedReplicaPlacer to account for existing partition assignments
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-14437
>                 URL: https://issues.apache.org/jira/browse/KAFKA-14437
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Andrew Grant
>            Assignee: Andrew Grant
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
