[ 
https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12464:
----------------------------------
    Component/s: streams

> Enhance constrained sticky Assign algorithm
> -------------------------------------------
>
>                 Key: KAFKA-12464
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12464
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> In KAFKA-9987, we greatly improved the assignment for the case where all 
> consumers are subscribed to the same set of topics. The algorithm has 4 phases:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota
>  # Fill remaining members up to minQuota
>  # If we ran out of unassigned partitions before filling all consumers, we 
> need to start stealing partitions from the over-full consumers at max capacity
>  # Otherwise we may have run out of unfilled consumers before assigning all 
> partitions, in which case we should just distribute one partition each to all 
> consumers at min capacity
>  
> Here is an example to illustrate:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, 
> ... t1p9
> Suppose, current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
>  _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now, new consumer added: C2, so we'll do:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota
>  After this phase, the assignment will be: (maxQuota will be 4)
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  # Fill remaining members up to minQuota
>  After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  _C2: t1p4, t1p9_
>  # If we ran out of unassigned partitions before filling all consumers, we 
> need to start stealing partitions from the over-full consumers at max capacity
>  After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2_ 
>  _C1: t1p5, t1p6, t1p7, t1p8_
>  _C2: t1p4, t1p9,_ _t1p3_
>  # Otherwise we may have run out of unfilled consumers before assigning all 
> partitions, in which case we should just distribute one partition each to all 
> consumers at min capacity
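The four phases above can be sketched in Java roughly as follows. This is a minimal sketch under stated assumptions, not the Kafka implementation: the class and method names (`FourPhaseStickySketch`, `assign`, `example`) are hypothetical, partitions are plain strings instead of `TopicPartition`, all consumers are assumed to subscribe to the same topics, and the donor in phase 3 gives up its last partition only because that matches the example in the issue.

```java
import java.util.*;

class FourPhaseStickySketch {
    // Hypothetical helper: owned maps each consumer to its previously owned partitions.
    static Map<String, List<String>> assign(Map<String, List<String>> owned, List<String> allPartitions) {
        int n = owned.size();
        int minQuota = allPartitions.size() / n;           // floor(P / N)
        int maxQuota = (allPartitions.size() + n - 1) / n; // ceil(P / N)

        Map<String, List<String>> assignment = new TreeMap<>();
        List<String> unassigned = new ArrayList<>(allPartitions);
        List<String> unfilled = new ArrayList<>();
        Deque<String> maxCapacity = new ArrayDeque<>();

        // Phase 1: keep up to maxQuota previously owned partitions per member.
        for (Map.Entry<String, List<String>> e : new TreeMap<String, List<String>>(owned).entrySet()) {
            List<String> mine = e.getValue();
            List<String> kept = new ArrayList<>(mine.subList(0, Math.min(maxQuota, mine.size())));
            assignment.put(e.getKey(), kept);
            unassigned.removeAll(kept);
            if (kept.size() < minQuota) {
                unfilled.add(e.getKey());
            } else if (kept.size() == maxQuota) {
                maxCapacity.add(e.getKey());
            }
        }

        // Phase 2: fill unfilled members up to minQuota from the unassigned partitions.
        for (String member : unfilled) {
            List<String> mine = assignment.get(member);
            while (mine.size() < minQuota && !unassigned.isEmpty()) {
                mine.add(unassigned.remove(0));
            }
        }

        // Phase 3: if partitions ran out, steal one partition from each max-capacity member.
        // Assumption: the donor gives up its last partition, as in the issue's example.
        for (String member : unfilled) {
            List<String> mine = assignment.get(member);
            while (mine.size() < minQuota && !maxCapacity.isEmpty()) {
                List<String> donor = assignment.get(maxCapacity.poll());
                mine.add(donor.remove(donor.size() - 1));
            }
        }

        // Phase 4: otherwise hand out leftover partitions, one per member below maxQuota.
        for (String member : assignment.keySet()) {
            if (unassigned.isEmpty()) {
                break;
            }
            List<String> mine = assignment.get(member);
            if (mine.size() < maxQuota) {
                mine.add(unassigned.remove(0));
            }
        }
        return assignment;
    }

    // The example from the issue: C0 and C1 own five partitions each, C2 joins.
    static Map<String, List<String>> example() {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            all.add("t1p" + i);
        }
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("C0", all.subList(0, 5));
        owned.put("C1", all.subList(5, 10));
        owned.put("C2", new ArrayList<String>());
        return assign(owned, all);
    }

    public static void main(String[] args) {
        // prints {C0=[t1p0, t1p1, t1p2], C1=[t1p5, t1p6, t1p7, t1p8], C2=[t1p4, t1p9, t1p3]}
        System.out.println(example());
    }
}
```

In this run phases 1, 2 and 3 all do work: C2 is still one partition short after phase 2 and has to take one back from C0.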
>  
>  
> As we can see, the current algorithm needs 3 phases to complete this 
> assignment, but it can actually be completed in 2 phases. Here's the updated algorithm:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota, while letting at most numMaxQuota members reach maxQuota, where 
> numMaxQuota is the remainder of (Partitions / Consumers)
>  # Fill remaining members up to maxQuota if current maxQuotaMember < 
> numMaxQuota, otherwise, to minQuota
>  
> Because step 1 takes numMaxQuota into account, it no longer keeps too many 
> partitions on existing consumers, and step 2 no longer assigns too few to the 
> unfilled ones, so the original steps 3 and 4 are not needed to rebalance them.
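To make the three quantities concrete, here is a small sketch of the arithmetic. The class and method names are hypothetical and chosen only to mirror minQuota, maxQuota and numMaxQuota as used in the issue; it assumes at least one consumer.

```java
class QuotaSketch {
    // floor(P / N): every member gets at least this many partitions.
    static int minQuota(int partitions, int consumers) {
        return partitions / consumers;
    }

    // ceil(P / N): no member gets more than this many partitions.
    static int maxQuota(int partitions, int consumers) {
        return (partitions + consumers - 1) / consumers;
    }

    // P % N: how many members are allowed to hold maxQuota partitions.
    static int numMaxQuota(int partitions, int consumers) {
        return partitions % consumers;
    }

    public static void main(String[] args) {
        int p = 10;
        int n = 3;
        // prints minQuota=3 maxQuota=4 numMaxQuota=1
        System.out.println("minQuota=" + minQuota(p, n)
                + " maxQuota=" + maxQuota(p, n)
                + " numMaxQuota=" + numMaxQuota(p, n));
    }
}
```

With 10 partitions and 3 consumers, one member ends up with 4 partitions and the other two with 3, which accounts for all 10.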
>  
> So, the updated pseudo-code sketch of the algorithm (additions relative to KAFKA-9987 are in *bold*, removals are -struck through-):
> C_f := (P/N)_floor, the floor capacity 
>  C_c := (P/N)_ceil, the ceiling capacity
> *C_r := (P%N) the allowed number of members with C_c partitions assigned*
>  *num_max_capacity_members := current number of members with C_c partitions 
> assigned (default to 0)*
> members := the sorted set of all consumers
>  partitions := the set of all partitions
>  unassigned_partitions := the set of partitions not yet assigned, initialized 
> to be all partitions
>  unfilled_members := the set of consumers not yet at capacity, initialized to 
> empty
>  -max_capacity_members := the set of members with exactly C_c partitions 
> assigned, initialized to empty-
>  member.owned_partitions := the set of previously owned partitions encoded in 
> the Subscription
> // Reassign as many previously owned partitions as possible, *by considering 
> the num_max_capacity_members*
>  for member : members
>       remove any partitions that are no longer in the subscription from its 
> owned partitions
>       remove all owned_partitions if the generation is old
>       if member.owned_partitions.size < C_f
>           assign all owned partitions to member and remove from 
> unassigned_partitions
>           add member to unfilled_members
>       -else if member.owned_partitions.size == C_f-
>           -assign first C_f owned_partitions to member and remove from 
> unassigned_partitions-
>       else if member.owned_partitions.size >= C_c *&& 
> num_max_capacity_members < C_r*
>           *assign first C_c owned_partitions to member and remove from 
> unassigned_partitions*            
>          *num_max_capacity_members++*
>           -add member to max_capacity_members-
>      *else*
>            *assign first C_f owned_partitions to member and remove from 
> unassigned_partitions*
> sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, 
> t1_p1 .... (for data parallelism)
>  sort unfilled_members by memberId (for determinism)
> // Fill remaining members *up to the C_r numbers of C_c, otherwise, to C_f*
>  for member : unfilled_members
>       compute the remaining capacity as -C = C_f - num_assigned_partitions-
>          if num_max_capacity_members < C_r:
>                 C = C_c - num_assigned_partitions
>                 num_max_capacity_members++
>           else
>                C = C_f - num_assigned_partitions
>       pop the first C partitions from unassigned_partitions and assign to member
> -// Steal partitions from members with max_capacity if necessary-
>  -if we run out of partitions before getting to the end of unfilled members:-
>  -for member : unfilled_members-
>        -poll for first member in max_capacity_members and remove one 
> partition-
>         -assign this partition to the unfilled member-
> -// Distribute remaining partitions, one per consumer, to fill some up to C_c 
> if necessary-
>  -if we run out of unfilled_members before assigning all partitions:-
>  -for partition : unassigned_partitions-
>          -assign to next member in members that is not in 
> max_capacity_members (then add member to max_capacity_members)-
>  
>  
> {code:java}
> C_f := (P/N)_floor, the floor capacity
> C_c := (P/N)_ceil, the ceiling capacity
> C_r := (P%N) the allowed number of members with C_c partitions assigned
> num_max_capacity_members := current number of members with C_c partitions 
> assigned (default to 0)
>   
>  members := the sorted set of all consumers
>  partitions := the set of all partitions
>  unassigned_partitions := the set of partitions not yet assigned, initialized 
> to be all partitions
>  unfilled_members := the set of consumers not yet at capacity, initialized to 
> empty
>  member.owned_partitions := the set of previously owned partitions encoded in 
> the Subscription
>   
>  // Reassign as many previously owned partitions as possible, by considering 
> the num_max_capacity_members 
>  for member : members
>          remove any partitions that are no longer in the subscription from 
> its owned partitions
>          remove all owned_partitions if the generation is old
>          if member.owned_partitions.size < C_f
>              assign all owned partitions to member and remove from 
> unassigned_partitions
>              add member to unfilled_members
>          else if member.owned_partitions.size >= C_c && 
> num_max_capacity_members < C_r
>              assign first C_c owned_partitions to member and remove from 
> unassigned_partitions
>              num_max_capacity_members++
>          else 
>              assign first C_f owned_partitions to member and remove from 
> unassigned_partitions
>   
>  sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, 
> t0_p1, t1_p1 .... (for data parallelism)
>  sort unfilled_members by memberId (for determinism)
>   
>  // Fill remaining members up to the C_r numbers of C_c, otherwise, to C_f
>  for member : unfilled_members
>      compute the remaining capacity as:
>       if num_max_capacity_members < C_r: 
>           C = C_c - num_assigned_partitions
>           num_max_capacity_members++
>       else 
>           C = C_f - num_assigned_partitions
>      pop the first C partitions from unassigned_partitions and assign to member
> {code}
>  
>  So, adopting the updated algorithm, the previous example will be:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, 
> ... t1p9
> Suppose, current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
>  _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now, new consumer added: C2, so we'll do:
>  # Reassign as many previously owned partitions as possible, up to the 
> maxQuota
>  After this phase, the assignment will be: (maxQuota will be 4, *and 
> numMaxQuota will be 1)*
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7_
>  # Fill remaining members up to maxQuota if current maxQuotaMember < 
> numMaxQuota, otherwise, to minQuota
>  After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
>  _C1: t1p5, t1p6, t1p7_
>  _C2: t1p4, t1p8,_ _t1p9_
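The two-phase version can be sketched in Java to reproduce this example. This is a minimal sketch that follows the pseudo-code in the issue as written, not the actual Kafka code: the names (`TwoPhaseStickySketch`, `assign`, `example`) are hypothetical, partitions are plain strings rather than `TopicPartition`, all consumers are assumed to subscribe to the same topics, and the generation check and subscription filtering are left out.

```java
import java.util.*;

class TwoPhaseStickySketch {
    // Hypothetical helper: owned maps each consumer to its previously owned partitions.
    static Map<String, List<String>> assign(Map<String, List<String>> owned, List<String> allPartitions) {
        int n = owned.size();
        int minQuota = allPartitions.size() / n;           // C_f
        int maxQuota = (allPartitions.size() + n - 1) / n; // C_c
        int numMaxQuota = allPartitions.size() % n;        // C_r
        int maxQuotaMembers = 0;                           // num_max_capacity_members

        Map<String, List<String>> assignment = new TreeMap<>();
        Set<String> unassigned = new LinkedHashSet<>(allPartitions);
        List<String> unfilled = new ArrayList<>();

        // Phase 1: reassign owned partitions, letting at most numMaxQuota members keep maxQuota.
        for (Map.Entry<String, List<String>> e : new TreeMap<String, List<String>>(owned).entrySet()) {
            List<String> mine = e.getValue();
            List<String> kept;
            if (mine.size() < minQuota) {
                kept = mine;
                unfilled.add(e.getKey());
            } else if (mine.size() >= maxQuota && maxQuotaMembers < numMaxQuota) {
                kept = mine.subList(0, maxQuota);
                maxQuotaMembers++;
            } else {
                kept = mine.subList(0, minQuota);
            }
            assignment.put(e.getKey(), new ArrayList<>(kept));
            unassigned.removeAll(kept);
        }

        // Phase 2: fill unfilled members to maxQuota while slots remain, otherwise to minQuota.
        Collections.sort(unfilled);
        Iterator<String> next = unassigned.iterator();
        for (String member : unfilled) {
            int target = minQuota;
            if (maxQuotaMembers < numMaxQuota) {
                target = maxQuota;
                maxQuotaMembers++;
            }
            List<String> mine = assignment.get(member);
            while (mine.size() < target && next.hasNext()) {
                mine.add(next.next());
            }
        }
        return assignment;
    }

    // The example from the issue: C0 and C1 own five partitions each, C2 joins.
    static Map<String, List<String>> example() {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            all.add("t1p" + i);
        }
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("C0", all.subList(0, 5));
        owned.put("C1", all.subList(5, 10));
        owned.put("C2", new ArrayList<String>());
        return assign(owned, all);
    }

    public static void main(String[] args) {
        // prints {C0=[t1p0, t1p1, t1p2, t1p3], C1=[t1p5, t1p6, t1p7], C2=[t1p4, t1p8, t1p9]}
        System.out.println(example());
    }
}
```

The sketch only covers the two phases in the pseudo-code; any case the pseudo-code does not address is not handled here either.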
>  
> *Another enhancement:*
> Currently, in phase 1, we loop through each consumer's list in 
> consumerToOwnedPartitions one partition at a time to assign or revoke it. 
> However, we don't care about each individual partition here, only about how 
> many partitions are assigned. So we can use List.subList() to take the 
> expected number of partitions from the owned list, which should also improve 
> the performance of the algorithm.
>  
> That is:
> {code:java}
> // Reassign as many previously owned partitions as possible
> for (Map.Entry<String, List<TopicPartition>> consumerEntry : 
> consumerToOwnedPartitions.entrySet()) {
>     String consumer = consumerEntry.getKey();
>     List<TopicPartition> ownedPartitions = consumerEntry.getValue();
>     List<TopicPartition> consumerAssignment = assignment.get(consumer);
>     int i = 0;
>     // assign the first N partitions up to the max quota, and mark the
>     // remaining as being revoked
>     // originally, we loop through the partitions 1 by 1
> /*   for (TopicPartition tp : ownedPartitions) {
>         if (i < maxQuota) {
>             consumerAssignment.add(tp);
>             unassignedPartitions.remove(tp);
>         } else {
>             allRevokedPartitions.add(tp);
>         }
>         ++i;
>     } */
>     // Enhancement: we only care about how many partitions are assigned, so
>     // subList can take the expected number without looping through them all
>     if (ownedPartitions.size() >= maxQuota) {
>         List<TopicPartition> kept = ownedPartitions.subList(0, maxQuota);
>         consumerAssignment.addAll(kept);
>         // mirror the loop above: kept partitions leave the unassigned set,
>         // and the rest are marked as revoked
>         unassignedPartitions.removeAll(kept);
>         allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
>     } ....
>      .....
> {code}
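As a standalone illustration of the subList idea, the following sketch splits an owned list into a kept part and a revoked part without a per-partition loop. The class and method names are hypothetical, and strings stand in for `TopicPartition`. `List.subList` returns a view of the backing list, so the sketch copies each view into a new `ArrayList` before returning it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class SubListSplitSketch {
    // The first `quota` owned partitions (or all of them if fewer are owned).
    static List<String> kept(List<String> owned, int quota) {
        int end = Math.min(quota, owned.size());
        return new ArrayList<>(owned.subList(0, end));
    }

    // Everything after the first `quota` owned partitions.
    static List<String> revoked(List<String> owned, int quota) {
        int start = Math.min(quota, owned.size());
        return new ArrayList<>(owned.subList(start, owned.size()));
    }

    public static void main(String[] args) {
        List<String> owned = Arrays.asList("t1p0", "t1p1", "t1p2", "t1p3", "t1p4");
        System.out.println(kept(owned, 4));    // prints [t1p0, t1p1, t1p2, t1p3]
        System.out.println(revoked(owned, 4)); // prints [t1p4]
    }
}
```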
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
