[
https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17317615#comment-17317615
]
Luke Chen commented on KAFKA-12464:
-----------------------------------
[~ableegoldman], good news: after my enhancement, the running time of the
_testLargeAssignmentAndGroupWithUniformSubscription_ test dropped from 28xx
ms to 18xx ms, a performance improvement of about 33%. The PR is submitted.
Thank you.
> Enhance constrained sticky Assign algorithm
> -------------------------------------------
>
> Key: KAFKA-12464
> URL: https://issues.apache.org/jira/browse/KAFKA-12464
> Project: Kafka
> Issue Type: Improvement
> Components: consumer
> Affects Versions: 2.7.0
> Reporter: Luke Chen
> Assignee: Luke Chen
> Priority: Major
> Labels: perfomance
>
> In KAFKA-9987, we made a big improvement for the case where all consumers
> are subscribed to the same set of topics. The algorithm contains 4 phases:
> # Reassign as many previously owned partitions as possible, up to the
> maxQuota
> # Fill remaining members up to minQuota
> # If we ran out of unassigned partitions before filling all consumers, we
> need to start stealing partitions from the over-full consumers at max capacity
> # Otherwise we may have run out of unfilled consumers before assigning all
> partitions, in which case we should just distribute one partition each to all
> consumers at min capacity
>
> Here is an example to illustrate:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1,
> ... t1p9
> Suppose the current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
> _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now a new consumer, C2, is added, so we'll do:
> # Reassign as many previously owned partitions as possible, up to the
> maxQuota
> After this phase, the assignment will be: (maxQuota will be 4)
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7, t1p8_
> # Fill remaining members up to minQuota
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7, t1p8_
> _C2: t1p4, t1p9_
> # If we ran out of unassigned partitions before filling all consumers, we
> need to start stealing partitions from the over-full consumers at max capacity
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2_
> _C1: t1p5, t1p6, t1p7, t1p8_
> _C2: t1p4, t1p9,_ _t1p3_
> # Otherwise we may have run out of unfilled consumers before assigning all
> partitions, in which case we should just distribute one partition each to all
> consumers at min capacity
>
>
> As we can see, we need 3 phases to complete the assignment. But we can
> actually complete it in 2 phases. Here's the updated algorithm:
> # Reassign as many previously owned partitions as possible, up to the
> maxQuota, while also respecting numMaxQuota, the number of members allowed
> to hold maxQuota partitions, given by the remainder of (Partitions /
> Consumers)
> # Fill remaining members up to maxQuota if current maxQuotaMember <
> numMaxQuota, otherwise, to minQuota
>
> By considering numMaxQuota, the original step 1 no longer assigns too many
> partitions to consumers, and step 2 no longer assigns too few, so we don't
> need step 3 and step 4 to balance them.
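To make the three quantities concrete, here is a minimal sketch of how they could be derived from the partition and consumer counts. The class and method names (QuotaSketch, minQuota, maxQuota, numMaxQuotaMembers) are made up for illustration and are not taken from the Kafka code base.

```java
final class QuotaSketch {
    /** Floor capacity (minQuota): every member ends up with at least this many. */
    static int minQuota(int numPartitions, int numConsumers) {
        return numPartitions / numConsumers;
    }

    /** Ceiling capacity (maxQuota): no member ends up with more than this many. */
    static int maxQuota(int numPartitions, int numConsumers) {
        return (numPartitions + numConsumers - 1) / numConsumers;
    }

    /** numMaxQuota: how many members are allowed to hold maxQuota partitions. */
    static int numMaxQuotaMembers(int numPartitions, int numConsumers) {
        return numPartitions % numConsumers;
    }

    public static void main(String[] args) {
        int partitions = 10;
        int consumers = 3;
        System.out.println("minQuota=" + minQuota(partitions, consumers)
                + " maxQuota=" + maxQuota(partitions, consumers)
                + " numMaxQuota=" + numMaxQuotaMembers(partitions, consumers));
    }
}
```

For the 10-partition, 3-consumer example this prints minQuota=3 maxQuota=4 numMaxQuota=1: one member holds 4 partitions and the other two hold 3, which adds up to 10. When the partition count divides evenly, minQuota equals maxQuota and numMaxQuota is 0.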
>
> {{So, the updated Pseudo-code sketch of the algorithm:}}
> C_f := (P/N)_floor, the floor capacity
> C_c := (P/N)_ceil, the ceiling capacity
> *C_r := (P%N) the allowed number of members with C_c partitions assigned*
> *num_max_capacity_members := current number of members with C_c partitions
> assigned (default to 0)*
> members := the sorted set of all consumers
> partitions := the set of all partitions
> unassigned_partitions := the set of partitions not yet assigned, initialized
> to be all partitions
> unfilled_members := the set of consumers not yet at capacity, initialized to
> empty
> -max_capacity_members := the set of members with exactly C_c partitions
> assigned, initialized to empty-
> member.owned_partitions := the set of previously owned partitions encoded in
> the Subscription
> // Reassign as many previously owned partitions as possible, *by considering
> the num_max_capacity_members*
> for member : members
> remove any partitions that are no longer in the subscription from its
> owned partitions
> remove all owned_partitions if the generation is old
> if member.owned_partitions.size < C_f
> assign all owned partitions to member and remove from
> unassigned_partitions
> add member to unfilled_members
> -else if member.owned_partitions.size == C_f-
> -assign first C_f owned_partitions to member and remove from
> unassigned_partitions-
> else if member.owned_partitions.size >= C_c *&&
> num_max_capacity_members < C_r*
> *assign first C_c owned_partitions to member and remove from
> unassigned_partitions*
> *num_max_capacity_members++*
> -add member to max_capacity_members-
> *else*
> *assign first C_f owned_partitions to member and remove from
> unassigned_partitions*
> sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0, t0_p1,
> t1_p1 .... (for data parallelism)
> sort unfilled_members by memberId (for determinism)
> // Fill remaining members *up to the C_r numbers of C_c, otherwise, to C_f*
> for member : unfilled_members
> compute the remaining capacity as -C = C_f - num_assigned_partitions-
> if num_max_capacity_members < C_r:
> C = C_c - num_assigned_partitions
> num_max_capacity_members++
> else
> C = C_f - num_assigned_partitions
> pop the first C partitions from unassigned_partitions and assign to member
> -// Steal partitions from members with max_capacity if necessary-
> -if we run out of partitions before getting to the end of unfilled members:-
> -for member : unfilled_members-
> -poll for first member in max_capacity_members and remove one
> partition-
> -assign this partition to the unfilled member-
> -// Distribute remaining partitions, one per consumer, to fill some up to C_c
> if necessary-
> -if we run out of unfilled_members before assigning all partitions:-
> -for partition : unassigned_partitions-
> -assign to next member in members that is not in
> max_capacity_members (then add member to max_capacity_members)-
>
>
> {code:java}
> C_f := (P/N)_floor, the floor capacity
> C_c := (P/N)_ceil, the ceiling capacity
> C_r := (P%N), the allowed number of members with C_c partitions assigned
> num_max_capacity_members := current number of members with C_c partitions
>     assigned (default to 0)
>
> members := the sorted set of all consumers
> partitions := the set of all partitions
> unassigned_partitions := the set of partitions not yet assigned, initialized
>     to be all partitions
> unfilled_members := the set of consumers not yet at capacity, initialized to
>     empty
> member.owned_partitions := the set of previously owned partitions encoded in
>     the Subscription
>
> // Reassign as many previously owned partitions as possible, by considering
> // the num_max_capacity_members
> for member : members
>     remove any partitions that are no longer in the subscription from
>         its owned partitions
>     remove all owned_partitions if the generation is old
>     if member.owned_partitions.size < C_f
>         assign all owned partitions to member and remove from
>             unassigned_partitions
>         add member to unfilled_members
>     else if member.owned_partitions.size >= C_c &&
>             num_max_capacity_members < C_r
>         assign first C_c owned_partitions to member and remove from
>             unassigned_partitions
>         num_max_capacity_members++
>     else
>         assign first C_f owned_partitions to member and remove from
>             unassigned_partitions
>
> sort unassigned_partitions in partition order, ie t0_p0, t1_p0, t2_p0,
>     t0_p1, t1_p1 .... (for data parallelism)
> sort unfilled_members by memberId (for determinism)
>
> // Fill remaining members up to the C_r numbers of C_c, otherwise, to C_f
> for member : unfilled_members
>     compute the remaining capacity as:
>     if num_max_capacity_members < C_r:
>         C = C_c - num_assigned_partitions
>         num_max_capacity_members++
>     else
>         C = C_f - num_assigned_partitions
>     pop the first C partitions from unassigned_partitions and assign to
>         member
> {code}
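The two-phase pseudo-code can be tried out with a small stand-alone sketch. This is only an illustration under simplifying assumptions, not the actual assignor code: partitions are plain strings rather than TopicPartition objects, the subscription and generation checks are omitted, owned partitions are assumed to be disjoint and valid, and all names (TwoPhaseStickySketch, assign, example) are hypothetical. It follows the pseudo-code as written and does not try to cover situations the pseudo-code does not describe.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TwoPhaseStickySketch {
    static Map<String, List<String>> assign(Map<String, List<String>> owned,
                                            List<String> allPartitions) {
        int n = owned.size();                          // assumed to be >= 1
        int p = allPartitions.size();
        int minQuota = p / n;                          // C_f
        int maxQuota = (p + n - 1) / n;                // C_c
        int allowedMax = p % n;                        // C_r
        int numMax = 0;                                // num_max_capacity_members

        List<String> unassigned = new ArrayList<>(allPartitions);
        List<String> unfilled = new ArrayList<>();
        Map<String, List<String>> assignment = new TreeMap<>();

        // Phase 1: keep previously owned partitions, bounded by the quotas.
        // TreeMap iteration visits members sorted by id, for determinism.
        for (Map.Entry<String, List<String>> e : new TreeMap<>(owned).entrySet()) {
            List<String> mine = e.getValue();
            List<String> kept;
            if (mine.size() < minQuota) {
                kept = mine;
                unfilled.add(e.getKey());
            } else if (mine.size() >= maxQuota && numMax < allowedMax) {
                kept = mine.subList(0, maxQuota);
                numMax++;
            } else {
                kept = mine.subList(0, minQuota);
            }
            assignment.put(e.getKey(), new ArrayList<>(kept));
            unassigned.removeAll(kept);
        }

        // Phase 2: fill unfilled members to maxQuota while slots remain, else minQuota.
        int next = 0;
        for (String member : unfilled) {
            List<String> mine = assignment.get(member);
            int capacity;
            if (numMax < allowedMax) {
                capacity = maxQuota - mine.size();
                numMax++;
            } else {
                capacity = minQuota - mine.size();
            }
            mine.addAll(unassigned.subList(next, next + capacity));
            next += capacity;
        }
        return assignment;
    }

    /** The C0/C1/C2 scenario from the issue description. */
    static Map<String, List<String>> example() {
        Map<String, List<String>> owned = new TreeMap<>();
        owned.put("C0", Arrays.asList("t1p0", "t1p1", "t1p2", "t1p3", "t1p4"));
        owned.put("C1", Arrays.asList("t1p5", "t1p6", "t1p7", "t1p8", "t1p9"));
        owned.put("C2", new ArrayList<String>());
        List<String> all = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            all.add("t1p" + i);
        }
        return assign(owned, all);
    }

    public static void main(String[] args) {
        System.out.println(example());
    }
}
```

Running main prints {C0=[t1p0, t1p1, t1p2, t1p3], C1=[t1p5, t1p6, t1p7], C2=[t1p4, t1p8, t1p9]}, so the assignment is balanced after two phases with no stealing step.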
>
> With the updated algorithm, the previous example becomes:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1,
> ... t1p9
> Suppose the current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
> _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now a new consumer, C2, is added, so we'll do:
> # Reassign as many previously owned partitions as possible, up to the
> maxQuota
> After this phase, the assignment will be: (maxQuota will be 4, *and
> numAllowedWithMaxQuota will be 1)*
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7_
> # Fill remaining members up to maxQuota if current maxQuotaMember <
> numMaxQuota, otherwise, to minQuota
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7_
> _C2: t1p4, t1p8,_ _t1p9_
>
> *Another enhancement:*
> Currently, in phase 1, we loop through each consumer's owned partitions in
> consumerToOwnedPartitions one by one to assign or revoke them. However, we
> don't care about each individual partition, only about the number of
> partitions assigned. So we can use List.subList() to take the expected
> number of partitions from each owned-partition list, which also improves
> the performance of the algorithm.
>
> That is:
> {code:java}
> // Reassign as many previously owned partitions as possible
> for (Map.Entry<String, List<TopicPartition>> consumerEntry :
>         consumerToOwnedPartitions.entrySet()) {
>     String consumer = consumerEntry.getKey();
>     List<TopicPartition> ownedPartitions = consumerEntry.getValue();
>     List<TopicPartition> consumerAssignment = assignment.get(consumer);
>     int i = 0;
>     // assign the first N partitions up to the max quota, and mark the
>     // remaining as being revoked
>     // originally, we loop through the partitions 1 by 1
>     /* for (TopicPartition tp : ownedPartitions) {
>         if (i < maxQuota) {
>             consumerAssignment.add(tp);
>             unassignedPartitions.remove(tp);
>         } else {
>             allRevokedPartitions.add(tp);
>         }
>         ++i;
>     } */
>     // Enhancement: since we only care about the number of partitions
>     // assigned, we can use subList to assign the expected number, so there
>     // is no need to loop through them all
>     if (ownedPartitions.size() < minQuota) {
>         // the expected assignment size is more than the consumer has now,
>         // so keep all the owned partitions
>         // and put this member into the unfilled member list
>         consumerAssignment.addAll(ownedPartitions);
>         unassignedPartitions.removeAll(ownedPartitions);
>         unfilledMembers.add(consumer);
>     } else if (ownedPartitions.size() >= maxQuota
>             && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
>         // consumer owned "maxQuota" partitions or more, and we are still
>         // under the number of expected max capacity members (strict "<",
>         // matching num_max_capacity_members < C_r in the pseudo-code),
>         // so keep "maxQuota" of the owned partitions and revoke the rest
>         numMaxCapacityMembers++;
>         consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
>         unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
>         allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota,
>                 ownedPartitions.size()));
>     } else {
>         // consumer owned "minQuota" partitions or more,
>         // so keep "minQuota" of the owned partitions and revoke the rest
>         consumerAssignment.addAll(ownedPartitions.subList(0, minQuota));
>         unassignedPartitions.removeAll(ownedPartitions.subList(0, minQuota));
>         allRevokedPartitions.addAll(ownedPartitions.subList(minQuota,
>                 ownedPartitions.size()));
>     }
> }
> .....
> {code}
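As a small self-contained illustration of the subList idea, the sketch below splits one owned-partition list into a kept part and a revoked part in two ways, element by element and with List.subList(), and both yield the same result. The class and method names are hypothetical, and partitions are plain strings instead of TopicPartition objects.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class SubListSplitSketch {
    /** Element-by-element split, in the style of the original loop. */
    static List<List<String>> splitByLoop(List<String> owned, int quota) {
        List<String> kept = new ArrayList<>();
        List<String> revoked = new ArrayList<>();
        int i = 0;
        for (String tp : owned) {
            if (i < quota) {
                kept.add(tp);
            } else {
                revoked.add(tp);
            }
            ++i;
        }
        return Arrays.asList(kept, revoked);
    }

    /** Same split using two subList views, copied so the results are independent. */
    static List<List<String>> splitBySubList(List<String> owned, int quota) {
        int cut = Math.min(quota, owned.size());
        List<String> kept = new ArrayList<>(owned.subList(0, cut));
        List<String> revoked = new ArrayList<>(owned.subList(cut, owned.size()));
        return Arrays.asList(kept, revoked);
    }

    public static void main(String[] args) {
        List<String> owned = Arrays.asList("t1p0", "t1p1", "t1p2", "t1p3", "t1p4");
        System.out.println(splitByLoop(owned, 4));
        System.out.println(splitBySubList(owned, 4));
    }
}
```

Both calls print [[t1p0, t1p1, t1p2, t1p3], [t1p4]]. Note that List.subList() returns a view backed by the original list, so the sketch copies each view before returning it; structurally modifying the backing list while a view is still in use is not safe.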
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)