[ https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Luke Chen updated KAFKA-12464:
------------------------------
    Description: 
In KAFKA-9987, we greatly improved the assignment for the case where all consumers are subscribed to the same set of topics. That algorithm has 4 phases:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 # Fill remaining members up to minQuota
 # If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
 # Otherwise, we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity

 

Here is an example to make this concrete:

*example:*

Current status: 2 consumers (C0, C1) and 10 topic partitions: t1p0, t1p1, ... t1p9

Suppose the current assignment is:

_C0: t1p0, t1p1, t1p2, t1p3, t1p4_
 _C1: t1p5, t1p6, t1p7, t1p8, t1p9_

Now a new consumer, C2, is added, so we do:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 After this phase, the assignment will be: (maxQuota will be 4)

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 # Fill remaining members up to minQuota
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9_
 # If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2_ 
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9, t1p3_
 # Otherwise, we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity
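The four phases traced above can be sketched in Java as follows. This is a minimal, hypothetical sketch (class and method names are illustrative; it is not the KAFKA-9987 code): partitions are plain strings standing in for TopicPartition, owned lists are assumed to be valid and disjoint, there is at least one member, and subscription and generation checks are omitted. Running it on this example reproduces the result after phase 3.

{code:java}
import java.util.*;

// Hypothetical sketch of the four-phase flow; strings stand in for TopicPartition.
final class FourPhaseSketch {
    static Map<String, List<String>> assign(Map<String, List<String>> owned, List<String> all) {
        int n = owned.size(), p = all.size();
        int minQuota = p / n, maxQuota = (p + n - 1) / n;
        Map<String, List<String>> assignment = new TreeMap<>();
        Deque<String> unassigned = new ArrayDeque<>(all);
        List<String> unfilled = new ArrayList<>();
        Deque<String> atMax = new ArrayDeque<>();

        // Phase 1: keep previously owned partitions, up to maxQuota.
        for (String m : new TreeSet<>(owned.keySet())) {
            List<String> own = owned.get(m);
            List<String> kept = new ArrayList<>(own.subList(0, Math.min(own.size(), maxQuota)));
            assignment.put(m, kept);
            unassigned.removeAll(kept);
            if (kept.size() < minQuota) unfilled.add(m);
            else if (kept.size() == maxQuota && maxQuota > minQuota) atMax.add(m);
        }
        // Phase 2: fill unfilled members up to minQuota from the unassigned partitions.
        for (String m : unfilled) {
            List<String> cur = assignment.get(m);
            while (cur.size() < minQuota && !unassigned.isEmpty()) cur.add(unassigned.poll());
        }
        // Phase 3: if partitions ran out, steal one partition per member at maxQuota.
        for (String m : unfilled) {
            List<String> cur = assignment.get(m);
            while (cur.size() < minQuota && !atMax.isEmpty()) {
                List<String> donor = assignment.get(atMax.poll());
                cur.add(donor.remove(donor.size() - 1));
            }
        }
        // Phase 4: otherwise, hand leftover partitions, one each, to members at minQuota.
        for (String m : assignment.keySet()) {
            if (unassigned.isEmpty()) break;
            List<String> cur = assignment.get(m);
            if (cur.size() == minQuota) cur.add(unassigned.poll());
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> all = new ArrayList<>();
        for (int i = 0; i < 10; i++) all.add("t1p" + i);
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("C0", all.subList(0, 5));
        owned.put("C1", all.subList(5, 10));
        owned.put("C2", List.of());
        System.out.println(assign(owned, all));
        // {C0=[t1p0, t1p1, t1p2], C1=[t1p5, t1p6, t1p7, t1p8], C2=[t1p4, t1p9, t1p3]}
    }
}
{code}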

 

 

As we can see, this example needs 3 phases to complete the assignment, but it can actually be completed in 2. Here's the updated algorithm:
 # Reassign as many previously owned partitions as possible, up to the maxQuota, while also respecting numMaxQuota, the number of members allowed to hold maxQuota partitions, which is the remainder of (Partitions / Consumers)
 # Fill remaining members up to maxQuota if the current number of members at maxQuota is less than numMaxQuota; otherwise, fill them up to minQuota

 

By taking numMaxQuota into account, step 1 no longer lets existing consumers keep too many partitions, and step 2 no longer gives too few partitions to the remaining consumers, so steps 3 and 4 are no longer needed to rebalance them.
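As a worked check of the quota arithmetic behind the updated algorithm, the hypothetical helper below (not part of Kafka) computes C_f, C_c and C_r, the names used in the pseudo-code that follows, for the example with 10 partitions and 3 consumers. It also confirms that C_r members at C_c plus the remaining members at C_f account for every partition, which is why the two steps can, in principle, place every partition without a separate stealing or distributing phase.

{code:java}
// Hypothetical helper illustrating the quota arithmetic; assumes consumers > 0.
final class QuotaMath {
    /** C_f: floor(P / N). */
    static int floorCapacity(int partitions, int consumers) {
        return partitions / consumers;
    }

    /** C_c: ceil(P / N), computed without floating point. */
    static int ceilCapacity(int partitions, int consumers) {
        return (partitions + consumers - 1) / consumers;
    }

    /** C_r: how many members may hold C_c partitions, i.e. P % N. */
    static int membersAtCeil(int partitions, int consumers) {
        return partitions % consumers;
    }

    public static void main(String[] args) {
        int p = 10, n = 3; // the example: 10 partitions, 3 consumers after C2 joins
        int cf = floorCapacity(p, n), cc = ceilCapacity(p, n), cr = membersAtCeil(p, n);
        // C_r members at C_c plus (N - C_r) members at C_f add up to exactly P.
        int total = cr * cc + (n - cr) * cf;
        System.out.println("C_f=" + cf + " C_c=" + cc + " C_r=" + cr + " total=" + total);
        // prints: C_f=3 C_c=4 C_r=1 total=10
    }
}
{code}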

 

So, here is the updated pseudo-code sketch of the algorithm:

C_f := (P/N)_floor, the floor capacity
 C_c := (P/N)_ceil, the ceiling capacity

*C_r := (P%N), the allowed number of members with C_c partitions assigned*
 *num_max_capacity_members := current number of members with C_c partitions assigned (defaults to 0)*

members := the sorted set of all consumers
 partitions := the set of all partitions
 unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
 unfilled_members := the set of consumers not yet at capacity, initialized to empty
 -max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty-
 member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible, *by considering the num_max_capacity_members*
 for member : members
      remove any partitions that are no longer in the subscription from its owned partitions
      remove all owned_partitions if the generation is old
      if member.owned_partitions.size < C_f
          assign all owned partitions to member and remove from unassigned_partitions
          add member to unfilled_members
      -else if member.owned_partitions.size == C_f-
          -assign first C_f owned_partitions to member and remove from unassigned_partitions-
      else if member.owned_partitions.size >= C_c *&& num_max_capacity_members < C_r*
          *assign first C_c owned_partitions to member and remove from unassigned_partitions*
          *num_max_capacity_members++*
          -add member to max_capacity_members-
      *else*
          *assign first C_f owned_partitions to member and remove from unassigned_partitions*
          *add member to unfilled_members (it may still need one more partition if fewer than C_r members reach C_c)*

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
 sort unfilled_members by memberId (for determinism)

// Fill remaining members *up to C_c until C_r members have C_c partitions, otherwise up to C_f*
 for member : unfilled_members
      compute the remaining capacity as -C = C_f - num_assigned_partitions-
          if num_max_capacity_members < C_r:
              C = C_c - num_assigned_partitions
              num_max_capacity_members++
          else
              C = C_f - num_assigned_partitions
      pop the first C partitions from unassigned_partitions and assign to member

-// Steal partitions from members with max_capacity if necessary-
 -if we run out of partitions before getting to the end of unfilled members:-
 -for member : unfilled_members-
       -poll for first member in max_capacity_members and remove one partition-
        -assign this partition to the unfilled member-

-// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary-
 -if we run out of unfilled_members before assigning all partitions:-
 -for partition : unassigned_partitions-
         -assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)-

 

 
{code:java}
C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity
C_r := (P%N), the allowed number of members with C_c partitions assigned
num_max_capacity_members := current number of members with C_c partitions assigned (defaults to 0)

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible, by considering the num_max_capacity_members
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size >= C_c && num_max_capacity_members < C_r
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        num_max_capacity_members++
    else
        assign first C_f owned_partitions to member and remove from unassigned_partitions
        add member to unfilled_members  // may still need one more partition if fewer than C_r members reach C_c

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_c until C_r members have C_c partitions, otherwise up to C_f
for member : unfilled_members
    compute the remaining capacity as:
        if num_max_capacity_members < C_r:
            C = C_c - num_assigned_partitions
            num_max_capacity_members++
        else
            C = C_f - num_assigned_partitions
    pop the first C partitions from unassigned_partitions and assign to member
{code}
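A minimal, runnable Java sketch of the two-phase pseudo-code, applied to the example below. Assumptions: the class and method names are hypothetical; partitions are strings ending in "p<number>" (standing in for TopicPartition); owned lists are disjoint and already filtered by subscription and generation; there is at least one member. Members that keep exactly C_f partitions in the first phase are also added to the unfilled list, so that a partition owned by nobody (or the P < N case) is still assigned when fewer than C_r members reached C_c.

{code:java}
import java.util.*;

// Hypothetical sketch of the two-phase constrained assignment; not Kafka's implementation.
final class TwoPhaseAssignorSketch {

    // Partition order for the fill phase: partition number first, then topic (t0p0, t1p0, t0p1, ...).
    static final Comparator<String> PARTITION_ORDER =
        Comparator.comparingInt((String tp) -> Integer.parseInt(tp.substring(tp.lastIndexOf('p') + 1)))
                  .thenComparing(tp -> tp.substring(0, tp.lastIndexOf('p')));

    static Map<String, List<String>> assign(Map<String, List<String>> ownedByMember, List<String> allPartitions) {
        int n = ownedByMember.size();
        int p = allPartitions.size();
        int floorCap = p / n;          // C_f
        int ceilCap = (p + n - 1) / n; // C_c
        int allowedAtCeil = p % n;     // C_r
        int numAtCeil = 0;             // num_max_capacity_members

        Map<String, List<String>> assignment = new TreeMap<>();
        Set<String> unassigned = new HashSet<>(allPartitions);
        List<String> unfilled = new ArrayList<>();

        // Phase 1: keep previously owned partitions, never letting more than C_r members keep C_c.
        for (String member : new TreeSet<>(ownedByMember.keySet())) {
            List<String> owned = ownedByMember.get(member);
            int keep;
            if (owned.size() < floorCap) {
                keep = owned.size();
            } else if (owned.size() >= ceilCap && numAtCeil < allowedAtCeil) {
                keep = ceilCap;
                numAtCeil++;
            } else {
                keep = floorCap;
            }
            List<String> kept = new ArrayList<>(owned.subList(0, keep));
            assignment.put(member, kept);
            unassigned.removeAll(kept);
            if (keep < ceilCap) {
                unfilled.add(member); // may still grow to C_f, or to C_c while slots remain
            }
        }

        // Phase 2: fill unfilled members in memberId order, up to C_c while slots remain, else C_f.
        List<String> toAssign = new ArrayList<>(unassigned);
        toAssign.sort(PARTITION_ORDER);
        Collections.sort(unfilled);
        Iterator<String> next = toAssign.iterator();
        for (String member : unfilled) {
            List<String> current = assignment.get(member);
            int target;
            if (numAtCeil < allowedAtCeil) {
                target = ceilCap;
                numAtCeil++;
            } else {
                target = floorCap;
            }
            while (current.size() < target && next.hasNext()) {
                current.add(next.next());
            }
        }
        return assignment;
    }

    public static void main(String[] args) {
        List<String> partitions = new ArrayList<>();
        for (int i = 0; i < 10; i++) partitions.add("t1p" + i);
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("C0", partitions.subList(0, 5));
        owned.put("C1", partitions.subList(5, 10));
        owned.put("C2", List.of());
        System.out.println(assign(owned, partitions));
        // {C0=[t1p0, t1p1, t1p2, t1p3], C1=[t1p5, t1p6, t1p7], C2=[t1p4, t1p8, t1p9]}
    }
}
{code}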
 

 Applying the updated algorithm, the previous example becomes:

*example:*

Current status: 2 consumers (C0, C1) and 10 topic partitions: t1p0, t1p1, ... t1p9

Suppose the current assignment is:

_C0: t1p0, t1p1, t1p2, t1p3, t1p4_
 _C1: t1p5, t1p6, t1p7, t1p8, t1p9_

Now a new consumer, C2, is added, so we do:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 After this phase, the assignment will be (maxQuota is 4, *and numAllowedWithMaxQuota is 1*):

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7_
 # Fill remaining members up to maxQuota if the current number of members at maxQuota is less than numMaxQuota; otherwise, fill them up to minQuota
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7_
 _C2: t1p4, t1p8, t1p9_
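To check that a result like this is balanced, here is a small hypothetical helper (not part of Kafka) that verifies every member holds either C_f or C_c partitions, exactly C_r members hold C_c, and each partition is assigned exactly once. It accepts the final assignment above and rejects the starting one.

{code:java}
import java.util.*;

// Hypothetical balance checker for a finished assignment; assumes at least one member.
final class BalanceCheck {
    static boolean isBalanced(Map<String, List<String>> assignment, int numPartitions) {
        int n = assignment.size();
        int floorCap = numPartitions / n;          // C_f
        int ceilCap = (numPartitions + n - 1) / n; // C_c
        int allowedAtCeil = numPartitions % n;     // C_r
        int atCeil = 0;
        Set<String> seen = new HashSet<>();
        for (List<String> parts : assignment.values()) {
            int size = parts.size();
            if (size != floorCap && size != ceilCap) return false; // outside [C_f, C_c]
            if (size == ceilCap && ceilCap > floorCap) atCeil++;
            for (String tp : parts) {
                if (!seen.add(tp)) return false; // partition assigned twice
            }
        }
        return atCeil == allowedAtCeil && seen.size() == numPartitions;
    }

    public static void main(String[] args) {
        Map<String, List<String>> after = new TreeMap<>();
        after.put("C0", List.of("t1p0", "t1p1", "t1p2", "t1p3"));
        after.put("C1", List.of("t1p5", "t1p6", "t1p7"));
        after.put("C2", List.of("t1p4", "t1p8", "t1p9"));
        System.out.println(isBalanced(after, 10)); // true

        Map<String, List<String>> before = new TreeMap<>();
        before.put("C0", List.of("t1p0", "t1p1", "t1p2", "t1p3", "t1p4"));
        before.put("C1", List.of("t1p5", "t1p6", "t1p7", "t1p8", "t1p9"));
        before.put("C2", List.of());
        System.out.println(isBalanced(before, 10)); // false
    }
}
{code}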

 

*Another enhancement:*

Currently, in phase 1, we loop through each consumer's owned partitions in consumerToOwnedPartitions one by one, to either assign or revoke each partition. However, we don't care about the individual partitions, only about how many of them are kept. So, we can just use List.subList() to take the expected number of partitions from each owned list, and revoke the rest, without the per-partition loop. This should also improve the performance of the algorithm.

 

That is:
{code:java}
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
    String consumer = consumerEntry.getKey();
    List<TopicPartition> ownedPartitions = consumerEntry.getValue();
    List<TopicPartition> consumerAssignment = assignment.get(consumer);
    // assign the first N partitions up to the max quota, and mark the remaining as being revoked
    // originally, we looped through the partitions one by one:
/*  int i = 0;
    for (TopicPartition tp : ownedPartitions) {
        if (i < maxQuota) {
            consumerAssignment.add(tp);
            unassignedPartitions.remove(tp);
        } else {
            allRevokedPartitions.add(tp);
        }
        ++i;
    } */
    // Enhancement: since we only care about the number of partitions assigned, we can use
    // subList to assign the expected number, so there is no need to loop through them all
    if (ownedPartitions.size() < minQuota) {
        // the consumer owns fewer partitions than it should get, so keep all of them
        // and put this member into the unfilled member list
        consumerAssignment.addAll(ownedPartitions);
        unassignedPartitions.removeAll(ownedPartitions);
        unfilledMembers.add(consumer);
    } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
        // the consumer owns "maxQuota" partitions or more, and we are still under the
        // expected number of max capacity members,
        // so keep "maxQuota" of the owned partitions, and revoke the rest
        numMaxCapacityMembers++;
        consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
        unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
        allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
    } else {
        // the consumer owns "minQuota" partitions or more,
        // so keep "minQuota" of the owned partitions, and revoke the rest
        consumerAssignment.addAll(ownedPartitions.subList(0, minQuota));
        unassignedPartitions.removeAll(ownedPartitions.subList(0, minQuota));
        allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
        // it may still receive one more partition if fewer than
        // numExpectedMaxCapacityMembers members end up with "maxQuota"
        unfilledMembers.add(consumer);
    }

    .....
}
{code}
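The subList idea in isolation: a minimal sketch, with strings standing in for TopicPartition and a hypothetical split helper, that keeps the first quota owned partitions and revokes the rest. List.subList returns a view backed by the original list rather than a copy, so the sketch copies the views before returning them. The addAll/removeAll calls that consume such views still visit each element, so any gain likely comes mainly from dropping the per-element branch.

{code:java}
import java.util.*;

// Hypothetical helper: split an owned list into [kept, revoked] using subList views.
final class SubListSplit {
    static List<List<String>> split(List<String> owned, int quota) {
        int keep = Math.min(quota, owned.size());
        // Copy the views so later changes to `owned` cannot affect the result.
        List<String> kept = new ArrayList<>(owned.subList(0, keep));
        List<String> revoked = new ArrayList<>(owned.subList(keep, owned.size()));
        return List.of(kept, revoked);
    }

    public static void main(String[] args) {
        List<String> owned = List.of("t1p0", "t1p1", "t1p2", "t1p3", "t1p4");
        System.out.println(split(owned, 4)); // [[t1p0, t1p1, t1p2, t1p3], [t1p4]]
    }
}
{code}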
 

  was:
In KAFKA-9987, we greatly improved the assignment for the case where all consumers are subscribed to the same set of topics. That algorithm has 4 phases:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 # Fill remaining members up to minQuota
 # If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
 # Otherwise, we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity

 

Here is an example to make this concrete:

*example:*

Current status: 2 consumers (C0, C1) and 10 topic partitions: t1p0, t1p1, ... t1p9

Suppose the current assignment is:

_C0: t1p0, t1p1, t1p2, t1p3, t1p4_
 _C1: t1p5, t1p6, t1p7, t1p8, t1p9_

Now a new consumer, C2, is added, so we do:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 After this phase, the assignment will be: (maxQuota will be 4)

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 # Fill remaining members up to minQuota
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9_
 # If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions from the over-full consumers at max capacity
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2_ 
 _C1: t1p5, t1p6, t1p7, t1p8_
 _C2: t1p4, t1p9, t1p3_
 # Otherwise, we may have run out of unfilled consumers before assigning all partitions, in which case we should just distribute one partition each to all consumers at min capacity

 

 

As we can see, this example needs 3 phases to complete the assignment, but it can actually be completed in 2. Here's the updated algorithm:
 # Reassign as many previously owned partitions as possible, up to the maxQuota, while also respecting numMaxQuota, the number of members allowed to hold maxQuota partitions, which is the remainder of (Partitions / Consumers)
 # Fill remaining members up to maxQuota if the current number of members at maxQuota is less than numMaxQuota; otherwise, fill them up to minQuota

 

By taking numMaxQuota into account, step 1 no longer lets existing consumers keep too many partitions, and step 2 no longer gives too few partitions to the remaining consumers, so steps 3 and 4 are no longer needed to rebalance them.

 

So, here is the updated pseudo-code sketch of the algorithm:

C_f := (P/N)_floor, the floor capacity
 C_c := (P/N)_ceil, the ceiling capacity

*C_r := (P%N), the allowed number of members with C_c partitions assigned*
 *num_max_capacity_members := current number of members with C_c partitions assigned (defaults to 0)*

members := the sorted set of all consumers
 partitions := the set of all partitions
 unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
 unfilled_members := the set of consumers not yet at capacity, initialized to empty
 -max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty-
 member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible, *by considering the num_max_capacity_members*
 for member : members
      remove any partitions that are no longer in the subscription from its owned partitions
      remove all owned_partitions if the generation is old
      if member.owned_partitions.size < C_f
          assign all owned partitions to member and remove from unassigned_partitions
          add member to unfilled_members
      -else if member.owned_partitions.size == C_f-
          -assign first C_f owned_partitions to member and remove from unassigned_partitions-
      else if member.owned_partitions.size >= C_c *&& num_max_capacity_members < C_r*
          *assign first C_c owned_partitions to member and remove from unassigned_partitions*
          *num_max_capacity_members++*
          -add member to max_capacity_members-
      *else*
          *assign first C_f owned_partitions to member and remove from unassigned_partitions*
          *add member to unfilled_members (it may still need one more partition if fewer than C_r members reach C_c)*

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
 sort unfilled_members by memberId (for determinism)

// Fill remaining members *up to C_c until C_r members have C_c partitions, otherwise up to C_f*
 for member : unfilled_members
      compute the remaining capacity as -C = C_f - num_assigned_partitions-
          if num_max_capacity_members < C_r:
              C = C_c - num_assigned_partitions
              num_max_capacity_members++
          else
              C = C_f - num_assigned_partitions
      pop the first C partitions from unassigned_partitions and assign to member

-// Steal partitions from members with max_capacity if necessary-
 -if we run out of partitions before getting to the end of unfilled members:-
 -for member : unfilled_members-
       -poll for first member in max_capacity_members and remove one partition-
        -assign this partition to the unfilled member-

-// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary-
 -if we run out of unfilled_members before assigning all partitions:-
 -for partition : unassigned_partitions-
         -assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)-

 

 
{code:java}
C_f := (P/N)_floor, the floor capacity
C_c := (P/N)_ceil, the ceiling capacity
C_r := (P%N), the allowed number of members with C_c partitions assigned
num_max_capacity_members := current number of members with C_c partitions assigned (defaults to 0)

members := the sorted set of all consumers
partitions := the set of all partitions
unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
unfilled_members := the set of consumers not yet at capacity, initialized to empty
member.owned_partitions := the set of previously owned partitions encoded in the Subscription

// Reassign as many previously owned partitions as possible, by considering the num_max_capacity_members
for member : members
    remove any partitions that are no longer in the subscription from its owned partitions
    remove all owned_partitions if the generation is old
    if member.owned_partitions.size < C_f
        assign all owned partitions to member and remove from unassigned_partitions
        add member to unfilled_members
    else if member.owned_partitions.size >= C_c && num_max_capacity_members < C_r
        assign first C_c owned_partitions to member and remove from unassigned_partitions
        num_max_capacity_members++
    else
        assign first C_f owned_partitions to member and remove from unassigned_partitions
        add member to unfilled_members  // may still need one more partition if fewer than C_r members reach C_c

sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1, ... (for data parallelism)
sort unfilled_members by memberId (for determinism)

// Fill remaining members up to C_c until C_r members have C_c partitions, otherwise up to C_f
for member : unfilled_members
    compute the remaining capacity as:
        if num_max_capacity_members < C_r:
            C = C_c - num_assigned_partitions
            num_max_capacity_members++
        else
            C = C_f - num_assigned_partitions
    pop the first C partitions from unassigned_partitions and assign to member
{code}
 

 Applying the updated algorithm, the previous example becomes:

*example:*

Current status: 2 consumers (C0, C1) and 10 topic partitions: t1p0, t1p1, ... t1p9

Suppose the current assignment is:

_C0: t1p0, t1p1, t1p2, t1p3, t1p4_
 _C1: t1p5, t1p6, t1p7, t1p8, t1p9_

Now a new consumer, C2, is added, so we do:
 # Reassign as many previously owned partitions as possible, up to the maxQuota
 After this phase, the assignment will be (maxQuota is 4, *and numAllowedWithMaxQuota is 1*):

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7_
 # Fill remaining members up to maxQuota if the current number of members at maxQuota is less than numMaxQuota; otherwise, fill them up to minQuota
 After this phase, the assignment will be:

_C0: t1p0, t1p1, t1p2, t1p3_
 _C1: t1p5, t1p6, t1p7_
 _C2: t1p4, t1p8, t1p9_

 

*Another enhancement:*

Currently, in phase 1, we loop through each consumer's owned partitions in consumerToOwnedPartitions one by one, to either assign or revoke each partition. However, we don't care about the individual partitions, only about how many of them are kept. So, we can just use List.subList() to take the expected number of partitions from each owned list, and revoke the rest, without the per-partition loop. This should also improve the performance of the algorithm.

 

That is:
{code:java}
// Reassign as many previously owned partitions as possible
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
    String consumer = consumerEntry.getKey();
    List<TopicPartition> ownedPartitions = consumerEntry.getValue();
    List<TopicPartition> consumerAssignment = assignment.get(consumer);
    // assign the first N partitions up to the max quota, and mark the remaining as being revoked
    // originally, we looped through the partitions one by one:
/*  int i = 0;
    for (TopicPartition tp : ownedPartitions) {
        if (i < maxQuota) {
            consumerAssignment.add(tp);
            unassignedPartitions.remove(tp);
        } else {
            allRevokedPartitions.add(tp);
        }
        ++i;
    } */
    // Enhancement: since we only care about the number of partitions assigned, we can use
    // subList to assign the expected number, so there is no need to loop through them all
    if (ownedPartitions.size() >= maxQuota) {
        consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
        unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
        allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
    } ....

    .....
}
{code}
 


> Enhance constrained sticky Assign algorithm
> -------------------------------------------
>
>                 Key: KAFKA-12464
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12464
>             Project: Kafka
>          Issue Type: Improvement
>          Components: consumer
>    Affects Versions: 2.7.0
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>              Labels: perfomance
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
