[ https://issues.apache.org/jira/browse/KAFKA-12464?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Guozhang Wang updated KAFKA-12464:
----------------------------------
    Component/s: streams

> Enhance constrained sticky Assign algorithm
> -------------------------------------------
>
>                 Key: KAFKA-12464
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12464
>             Project: Kafka
>          Issue Type: Improvement
>          Components: streams
>    Affects Versions: 2.7.0
>            Reporter: Luke Chen
>            Assignee: Luke Chen
>            Priority: Major
>
> In KAFKA-9987, we greatly improved the assignment for the case where all
> consumers are subscribed to the same set of topics. The algorithm has 4 phases:
> # Reassign as many previously owned partitions as possible, up to the maxQuota
> # Fill remaining members up to minQuota
> # If we ran out of unassigned partitions before filling all consumers, we
> need to start stealing partitions from the over-full consumers at max capacity
> # Otherwise we may have run out of unfilled consumers before assigning all
> partitions, in which case we should just distribute one partition each to all
> consumers at min capacity
>
> Take an example for better understanding:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, ... t1p9
> Suppose the current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
> _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now a new consumer, C2, is added, so we'll do:
> # Reassign as many previously owned partitions as possible, up to the maxQuota
> After this phase, the assignment will be (maxQuota will be 4):
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7, t1p8_
> # Fill remaining members up to minQuota
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7, t1p8_
> _C2: t1p4, t1p9_
> # If we ran out of unassigned partitions before filling all consumers, we
> need to start stealing partitions from the over-full consumers at max capacity
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2_
> _C1: t1p5, t1p6, t1p7, t1p8_
> _C2: t1p4, t1p9, t1p3_
> # Otherwise we may have run out of unfilled consumers before assigning all
> partitions, in which case we should just distribute one partition each to all
> consumers at min capacity
> This phase is not triggered in this example.
>
> As we can see, we need 3 phases to complete the assignment, but we can
> actually complete it in 2 phases. Here's the updated algorithm:
> # Reassign as many previously owned partitions as possible, up to the
> maxQuota, while also respecting numMaxQuota, the remainder of
> (Partitions / Consumers)
> # Fill remaining members up to maxQuota if the current number of maxQuota
> members < numMaxQuota, otherwise up to minQuota
>
> By considering numMaxQuota, the original step 1 is no longer so aggressive
> that it leaves too many partitions with their previous owners, and step 2 is
> no longer so conservative that it assigns too few partitions to the unfilled
> consumers, so we don't need step 3 and step 4 to rebalance them.
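
To make the two phases concrete, here is a rough Java sketch of the idea applied to the example above. It is only an illustration under simplifying assumptions: partitions are plain strings, every owned partition is still subscribed and from the current generation, and the class and method names (`TwoPhaseStickySketch`, `assign`) are hypothetical, not taken from the Kafka code base. It follows the two steps literally and does not try to cover cases the description does not spell out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration class; not part of Kafka.
final class TwoPhaseStickySketch {

    /**
     * @param owned         memberId -> previously owned partitions (assumed valid)
     * @param allPartitions every partition, already in the desired order
     */
    static Map<String, List<String>> assign(Map<String, List<String>> owned,
                                            List<String> allPartitions) {
        int numMembers = owned.size();
        int minQuota = allPartitions.size() / numMembers;     // floor capacity
        int numMaxQuota = allPartitions.size() % numMembers;  // members allowed at maxQuota
        int maxQuota = numMaxQuota == 0 ? minQuota : minQuota + 1; // ceiling capacity
        int membersAtMaxQuota = 0;

        Map<String, List<String>> assignment = new TreeMap<>();
        List<String> unassigned = new ArrayList<>(allPartitions);
        List<String> unfilled = new ArrayList<>();

        // Phase 1: keep previously owned partitions, capped by the quotas.
        // Iterating a TreeMap visits members in memberId order.
        for (Map.Entry<String, List<String>> e : new TreeMap<>(owned).entrySet()) {
            List<String> ownedPartitions = e.getValue();
            List<String> kept;
            if (ownedPartitions.size() < minQuota) {
                kept = new ArrayList<>(ownedPartitions);
                unfilled.add(e.getKey());
            } else if (ownedPartitions.size() >= maxQuota && membersAtMaxQuota < numMaxQuota) {
                kept = new ArrayList<>(ownedPartitions.subList(0, maxQuota));
                membersAtMaxQuota++;
            } else {
                kept = new ArrayList<>(ownedPartitions.subList(0, minQuota));
            }
            assignment.put(e.getKey(), kept);
            unassigned.removeAll(kept);
        }

        // Phase 2: fill the unfilled members, to maxQuota while slots remain,
        // otherwise to minQuota.
        for (String member : unfilled) {
            List<String> current = assignment.get(member);
            int target = minQuota;
            if (membersAtMaxQuota < numMaxQuota) {
                target = maxQuota;
                membersAtMaxQuota++;
            }
            int take = Math.min(target - current.size(), unassigned.size());
            List<String> head = unassigned.subList(0, take);
            current.addAll(head);
            head.clear(); // removes the taken partitions from unassigned
        }
        return assignment;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new HashMap<>();
        owned.put("C0", Arrays.asList("t1p0", "t1p1", "t1p2", "t1p3", "t1p4"));
        owned.put("C1", Arrays.asList("t1p5", "t1p6", "t1p7", "t1p8", "t1p9"));
        owned.put("C2", new ArrayList<>());
        List<String> all = Arrays.asList("t1p0", "t1p1", "t1p2", "t1p3", "t1p4",
                                         "t1p5", "t1p6", "t1p7", "t1p8", "t1p9");
        // prints {C0=[t1p0, t1p1, t1p2, t1p3], C1=[t1p5, t1p6, t1p7], C2=[t1p4, t1p8, t1p9]}
        System.out.println(assign(owned, all));
    }
}
```

With 10 partitions and 3 consumers, minQuota is 3, maxQuota is 4 and numMaxQuota is 1, so only C0 keeps four partitions and no stealing pass is needed afterwards.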
>
> {{So, the updated pseudo-code sketch of the algorithm:}}
> C_f := (P/N)_floor, the floor capacity
> C_c := (P/N)_ceil, the ceiling capacity
> *C_r := (P%N), the allowed number of members with C_c partitions assigned*
> *num_max_capacity_members := current number of members with C_c partitions assigned (default to 0)*
> members := the sorted set of all consumers
> partitions := the set of all partitions
> unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
> unfilled_members := the set of consumers not yet at capacity, initialized to empty
> -max_capacity_members := the set of members with exactly C_c partitions assigned, initialized to empty-
> member.owned_partitions := the set of previously owned partitions encoded in the Subscription
> // Reassign as many previously owned partitions as possible, *by considering the num_max_capacity_members*
> for member : members
>     remove any partitions that are no longer in the subscription from its owned partitions
>     remove all owned_partitions if the generation is old
>     if member.owned_partitions.size < C_f
>         assign all owned partitions to member and remove from unassigned_partitions
>         add member to unfilled_members
>     -else if member.owned_partitions.size == C_f-
>         -assign first C_f owned_partitions to member and remove from unassigned_partitions-
>     else if member.owned_partitions.size >= C_c *&& num_max_capacity_members < C_r*
>         *assign first C_c owned_partitions to member and remove from unassigned_partitions*
>         *num_max_capacity_members++*
>         -add member to max_capacity_members-
>     *else*
>         *assign first C_f owned_partitions to member and remove from unassigned_partitions*
> sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1 .... (for data parallelism)
> sort unfilled_members by memberId (for determinism)
> // Fill remaining members *up to the C_r numbers of C_c, otherwise, to C_f*
> for member : unfilled_members
>     compute the remaining capacity as -C = C_f - num_assigned_partitions-
>     if num_max_capacity_members < C_r:
>         C = C_c - num_assigned_partitions
>         num_max_capacity_members++
>     else
>         C = C_f - num_assigned_partitions
>     pop the first C partitions from unassigned_partitions and assign to member
> -// Steal partitions from members with max_capacity if necessary-
> -if we run out of partitions before getting to the end of unfilled members:-
>     -for member : unfilled_members-
>         -poll for first member in max_capacity_members and remove one partition-
>         -assign this partition to the unfilled member-
> -// Distribute remaining partitions, one per consumer, to fill some up to C_c if necessary-
> -if we run out of unfilled_members before assigning all partitions:-
>     -for partition : unassigned_partitions-
>         -assign to next member in members that is not in max_capacity_members (then add member to max_capacity_members)-
>
>
> {code:java}
> C_f := (P/N)_floor, the floor capacity
> C_c := (P/N)_ceil, the ceiling capacity
> C_r := (P%N), the allowed number of members with C_c partitions assigned
> num_max_capacity_members := current number of members with C_c partitions assigned (default to 0)
>
> members := the sorted set of all consumers
> partitions := the set of all partitions
> unassigned_partitions := the set of partitions not yet assigned, initialized to be all partitions
> unfilled_members := the set of consumers not yet at capacity, initialized to empty
> member.owned_partitions := the set of previously owned partitions encoded in the Subscription
>
> // Reassign as many previously owned partitions as possible, by considering the num_max_capacity_members
> for member : members
>     remove any partitions that are no longer in the subscription from its owned partitions
>     remove all owned_partitions if the generation is old
>     if member.owned_partitions.size < C_f
>         assign all owned partitions to member and remove from unassigned_partitions
>         add member to unfilled_members
>     else if member.owned_partitions.size >= C_c && num_max_capacity_members < C_r
>         assign first C_c owned_partitions to member and remove from unassigned_partitions
>         num_max_capacity_members++
>     else
>         assign first C_f owned_partitions to member and remove from unassigned_partitions
>
> sort unassigned_partitions in partition order, i.e. t0_p0, t1_p0, t2_p0, t0_p1, t1_p1 .... (for data parallelism)
> sort unfilled_members by memberId (for determinism)
>
> // Fill remaining members up to the C_r numbers of C_c, otherwise, to C_f
> for member : unfilled_members
>     compute the remaining capacity as:
>     if num_max_capacity_members < C_r:
>         C = C_c - num_assigned_partitions
>         num_max_capacity_members++
>     else
>         C = C_f - num_assigned_partitions
>     pop the first C partitions from unassigned_partitions and assign to member
> {code}
>
> So, adopting the updated algorithm, the previous example becomes:
> *example:*
> Current status: 2 consumers (C0, C1), and 10 topic partitions: t1p0, t1p1, ... t1p9
> Suppose the current assignment is:
> _C0: t1p0, t1p1, t1p2, t1p3, t1p4_
> _C1: t1p5, t1p6, t1p7, t1p8, t1p9_
> Now a new consumer, C2, is added, so we'll do:
> # Reassign as many previously owned partitions as possible, up to the maxQuota
> After this phase, the assignment will be (maxQuota will be 4, *and
> numMaxQuota, the number of members allowed to hold maxQuota, will be 1*):
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7_
> # Fill remaining members up to maxQuota if the current number of maxQuota
> members < numMaxQuota, otherwise up to minQuota
> After this phase, the assignment will be:
> _C0: t1p0, t1p1, t1p2, t1p3_
> _C1: t1p5, t1p6, t1p7_
> _C2: t1p4, t1p8, t1p9_
>
> *Another enhancement:*
> Currently, in phase 1, we loop through each consumer's owned partitions in
> consumerToOwnedPartitions one by one, either assigning or revoking each
> partition. However, we don't care about the individual partitions, only about
> how many of them are assigned. So we can use List.subList() to take the
> expected number of partitions from the owned list in one call, which should
> also improve the performance of the algorithm.
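
As a small, self-contained check of that claim, the sketch below splits an owned-partition list at a quota in both ways, once with the element-by-element loop and once with `subList`, and compares the results. Partitions are plain strings here, and `SubListSplitSketch`, `Split`, `splitByLoop` and `splitBySubList` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class; not part of Kafka.
final class SubListSplitSketch {

    /** The partitions a member keeps and the ones it must give up. */
    static final class Split {
        final List<String> kept = new ArrayList<>();
        final List<String> revoked = new ArrayList<>();
    }

    /** Loop form: decide for each partition whether it is under the quota. */
    static Split splitByLoop(List<String> owned, int quota) {
        Split split = new Split();
        int i = 0;
        for (String tp : owned) {
            if (i < quota) {
                split.kept.add(tp);
            } else {
                split.revoked.add(tp);
            }
            ++i;
        }
        return split;
    }

    /** subList form: cut the list once at the quota and copy both halves. */
    static Split splitBySubList(List<String> owned, int quota) {
        Split split = new Split();
        int cut = Math.min(quota, owned.size());
        split.kept.addAll(owned.subList(0, cut));
        split.revoked.addAll(owned.subList(cut, owned.size()));
        return split;
    }

    public static void main(String[] args) {
        List<String> owned = Arrays.asList("t1p0", "t1p1", "t1p2", "t1p3", "t1p4");
        Split byLoop = splitByLoop(owned, 4);
        Split bySubList = splitBySubList(owned, 4);
        // Both forms keep the first four partitions and revoke the fifth.
        System.out.println(byLoop.kept.equals(bySubList.kept)
                && byLoop.revoked.equals(bySubList.revoked));
    }
}
```

In the assignor itself the lists hold TopicPartition objects and the split point is maxQuota.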
>
> That is:
> {code:java}
> // Reassign as many previously owned partitions as possible
> for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
>     String consumer = consumerEntry.getKey();
>     List<TopicPartition> ownedPartitions = consumerEntry.getValue();
>     List<TopicPartition> consumerAssignment = assignment.get(consumer);
>     int i = 0;
>     // assign the first N partitions up to the max quota, and mark the remaining as being revoked
>     // originally, we loop through the partitions 1 by 1
>     /* for (TopicPartition tp : ownedPartitions) {
>         if (i < maxQuota) {
>             consumerAssignment.add(tp);
>             unassignedPartitions.remove(tp);
>         } else {
>             allRevokedPartitions.add(tp);
>         }
>         ++i;
>     } */
>     // Enhancement: since we only care about the number of partitions assigned, we can use
>     // subList to assign the expected number, so there is no need to loop through them all
>     if (ownedPartitions.size() >= maxQuota) {
>         // mirrors the loop above: the kept partitions leave unassignedPartitions,
>         // and the partitions beyond maxQuota are marked as revoked
>         consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
>         unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
>         allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
>     } ....
> .....
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)