dajac commented on code in PR #14481: URL: https://github.com/apache/kafka/pull/14481#discussion_r1397600750
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java: ########## @@ -14,17 +14,891 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - package org.apache.kafka.coordinator.group.assignor; +import org.apache.kafka.common.Uuid; +import org.apache.kafka.server.common.TopicIdPartition; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.TreeSet; +import java.util.stream.Collectors; + +/** + * The general uniform assignment builder is used to generate the target assignment for a consumer group with + * at least one of its members subscribed to a different set of topics. + * + * Assignments are done according to the following principles: + * + * <li> Balance: Ensure partitions are distributed equally among all members. + * The difference in assignments sizes between any two members + * should not exceed one partition. </li> + * <li> Rack Matching: When feasible, aim to assign partitions to members + * located on the same rack thus avoiding cross-zone traffic. </li> + * <li> Stickiness: Minimize partition movements among members by retaining + * as much of the existing assignment as possible. </li> + * + * This assignment builder prioritizes the above properties in the following order: + * Balance > Rack Matching > Stickiness. + */ public class GeneralUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { private static final Logger LOG = LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class); + /** + * The member metadata obtained from the assignment specification. 
+ */ + private final Map<String, AssignmentMemberSpec> members; + + /** + * The topic and partition metadata describer. + */ + private final SubscribedTopicDescriber subscribedTopicDescriber; + + /** + * The list of all the topic Ids that the consumer group is subscribed to. + */ + private final Set<Uuid> subscriptionIds; + + /** + * Rack information. + */ + private final RackInfo rackInfo; + + /** + * List of subscribed members for each topic. + */ + private final Map<Uuid, List<String>> membersPerTopic; + + /** + * The partitions that still need to be assigned. + */ + private final Set<TopicIdPartition> unassignedPartitions; + + /** + * All the partitions that have been retained from the existing assignment. + */ + private final Set<TopicIdPartition> assignedStickyPartitions; + + /** + * Manages assignments to members based on their current assignment size and maximum allowed assignment size. + */ + private final AssignmentManager assignmentManager; + + /** + * List of all the members sorted by their respective assignment sizes. + */ + private final TreeSet<String> sortedMembersByAssignmentSize; + + /** + * Tracks the owner of each partition in the existing assignment on the client. + * + * Only populated when rack aware strategy is used. + * Contains partitions that weren't retained due to a rack mismatch. + */ + private final Map<TopicIdPartition, String> currentPartitionOwners; + + /** + * Tracks the owner of each partition in the target assignment. + */ + private final Map<TopicIdPartition, String> partitionOwnerInTargetAssignment; + + /** + * Handles all operations related to partition movements during a reassignment for balancing the target assignment. + */ + private PartitionMovements partitionMovements; + + /** + * The new assignment that will be returned. 
+ */ + private final Map<String, MemberAssignment> targetAssignment; + + public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, SubscribedTopicDescriber subscribedTopicDescriber) { + this.members = assignmentSpec.members(); + this.subscribedTopicDescriber = subscribedTopicDescriber; + this.subscriptionIds = assignmentSpec.members().values().stream() + .flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream()) + .peek(topicId -> { + int partitionCount = subscribedTopicDescriber.numPartitions(topicId); + if (partitionCount == -1) { + throw new PartitionAssignorException( + "Members are subscribed to topic " + topicId + " which doesn't exist in the topic metadata." + ); + } + }) + .collect(Collectors.toSet()); + this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, subscriptionIds); + this.membersPerTopic = new HashMap<>(); + members.forEach((memberId, memberMetadata) -> { + Collection<Uuid> topics = memberMetadata.subscribedTopicIds(); + topics.forEach(topicId -> + membersPerTopic.computeIfAbsent(topicId, k -> new ArrayList<>()).add(memberId) + ); + }); + this.unassignedPartitions = new HashSet<>(topicIdPartitions(subscriptionIds, subscribedTopicDescriber)); + this.assignedStickyPartitions = new HashSet<>(); + this.assignmentManager = new AssignmentManager(); + this.sortedMembersByAssignmentSize = assignmentManager.getSortedMembersByAssignmentSize(members.keySet()); + this.currentPartitionOwners = new HashMap<>(); + this.partitionOwnerInTargetAssignment = new HashMap<>(); + this.targetAssignment = new HashMap<>(); + } + + /** + * Here's the step-by-step breakdown of the assignment process: + * + * <li> Retain partitions from the existing assignments a.k.a sticky partitions. </li> + * <ul><li> If a partition's rack mismatches with its owner, track it for future use. </li></ul> + * <li> If rack aware strategy is possible, allocate unassigned partitions to members in the same rack. 
</li> + * <li> Allocate all the remaining unassigned partitions to the members in a balanced manner. If possible, allocate + * the partition back to it's existing owner in case it was not retained earlier due to a rack mismatch. </li> + * <li> Iterate through the assignment until it is balanced. </li> + */ @Override protected GroupAssignment buildAssignment() { - return null; + if (subscriptionIds.isEmpty()) { + LOG.info("The subscription list is empty, returning an empty assignment"); + return new GroupAssignment(Collections.emptyMap()); + } + + members.keySet().forEach(memberId -> targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()))); + partitionMovements = new PartitionMovements(); + + // When rack awareness is enabled, only sticky partitions with matching rack are retained. + // Otherwise, all existing partitions are retained until max assignment size. + assignStickyPartitions(); + + if (rackInfo.useRackStrategy) rackAwarePartitionAssignment(); + unassignedPartitionsAssignment(); + + balance(); + + return new GroupAssignment(targetAssignment); + } + + /** + * <li> TopicIdPartitions are sorted in descending order based on the value: + * totalPartitions/number of subscribed members. </li> + * <li> If the above value is the same then topicIdPartitions are sorted in + * ascending order of number of subscribers. </li> + * <li> If both criteria are the same, sort in ascending order of the partition Id. + * This last criteria is for predictability of the assignments. </li> + * + * @param topicIdPartitions The topic partitions that need to be sorted. + * @return A list of sorted topic partitions. 
+ */ + private List<TopicIdPartition> sortTopicIdPartitions(Collection<TopicIdPartition> topicIdPartitions) { + Comparator<TopicIdPartition> comparator = Comparator + .comparingDouble((TopicIdPartition topicIdPartition) -> { + int totalPartitions = subscribedTopicDescriber.numPartitions(topicIdPartition.topicId()); + int totalSubscribers = membersPerTopic.get(topicIdPartition.topicId()).size(); + return (double) totalPartitions / totalSubscribers; + }) + .reversed() + .thenComparingInt(topicIdPartition -> membersPerTopic.get(topicIdPartition.topicId()).size()) + .thenComparingInt(TopicIdPartition::partitionId); + + return topicIdPartitions.stream() + .sorted(comparator) + .collect(Collectors.toList()); + } + + /** + * Gets a set of partitions that are to be retained from the existing assignment. This includes: + * <li> Partitions from topics that are still present in both the new subscriptions and the topic metadata. </li> + * <li> When using a rack-aware strategy, only partitions with member owners in the same rack are retained. </li> + * <li> Track current partition owners when there is a rack mismatch. 
</li> + */ + private void assignStickyPartitions() { + members.forEach((memberId, assignmentMemberSpec) -> + assignmentMemberSpec.assignedPartitions().forEach((topicId, currentAssignment) -> { + if (assignmentMemberSpec.subscribedTopicIds().contains(topicId)) { + currentAssignment.forEach(partition -> { + TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); + if (rackInfo.useRackStrategy && rackInfo.racksMismatch(memberId, topicIdPartition)) { + currentPartitionOwners.put(topicIdPartition, memberId); + } else { + assignmentManager.maybeAssignPartitionToMember(topicIdPartition, memberId); + assignedStickyPartitions.add(topicIdPartition); + } + }); + } else { + LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + } + }) + ); + } + + /** + * Allocates the unassigned partitions to members in the same rack, if available. + */ + private void rackAwarePartitionAssignment() { + // Sort partitions in ascending order by the number of potential members with matching racks. + // Only partitions with potential members in the same rack are returned. + List<TopicIdPartition> sortedPartitions = rackInfo.sortPartitionsByRackMembers(unassignedPartitions); + + sortedPartitions.forEach(partition -> { + List<String> sortedMembersByAssignmentSize = rackInfo.getSortedMembersWithMatchingRack( + partition, + targetAssignment + ); + + for (String memberId : sortedMembersByAssignmentSize) { + if (assignmentManager.maybeAssignPartitionToMember(partition, memberId)) + break; + } + }); + } + + /** + * Allocates the remaining unassigned partitions to members in a balanced manner. + * <li> Partitions are sorted to maximize the probability of a balanced assignment. </li> + * <li> If there was an assignment that wasn't retained due to a rack mismatch, + * check if the partition can retain its existing assignment. 
</li> + * <li> Sort members in ascending order of their current target assignment sizes + * to ensure the least filled member gets the partition first. </li> + */ + private void unassignedPartitionsAssignment() { + List<TopicIdPartition> sortedPartitions = sortTopicIdPartitions(unassignedPartitions); + + for (TopicIdPartition partition : sortedPartitions) { + if (rackInfo.useRackStrategy && currentPartitionOwners.containsKey(partition)) { + String prevOwner = currentPartitionOwners.get(partition); + if (assignmentManager.maybeAssignPartitionToMember(partition, prevOwner)) { + continue; + } + } + + TreeSet<String> sortedMembers = assignmentManager.getSortedMembersByAssignmentSize( + membersPerTopic.get(partition.topicId())); + + for (String member : sortedMembers) { + if (assignmentManager.maybeAssignPartitionToMember(partition, member)) { + break; + } + } + } + } + + /** + * If a topic has two or more potential members it is subject to reassignment. + * + * @return true if the topic can participate in reassignment, false otherwise. + */ + private boolean canTopicParticipateInReassignment(Uuid topicId) { + return membersPerTopic.get(topicId).size() >= 2; + } + + /** + * If a member is not assigned all its potential partitions it is subject to reassignment. + * If any of the partitions assigned to a member is subject to reassignment, the member itself + * is subject to reassignment. + * + * @return true if the member can participate in reassignment, false otherwise. 
+ */ + private boolean canMemberParticipateInReassignment(String memberId) { + Set<Uuid> assignedTopicIds = targetAssignment.get(memberId).targetPartitions().keySet(); + + int currentAssignmentSize = assignmentManager.targetAssignmentSize(memberId); + int maxAssignmentSize = assignmentManager.maxAssignmentSize(memberId); + + if (currentAssignmentSize > maxAssignmentSize) + LOG.error("The member {} is assigned more partitions than the maximum possible.", memberId); + + if (currentAssignmentSize < maxAssignmentSize) + return true; + + for (Uuid topicId: assignedTopicIds) { + if (canTopicParticipateInReassignment(topicId)) + return true; + } + return false; + } + + /** + * Determine if the current assignment is a balanced one. + * + * @return true if the given assignment is balanced; false otherwise. + */ + private boolean isBalanced() { + int min = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.first()); + int max = assignmentManager.targetAssignmentSize(sortedMembersByAssignmentSize.last()); + + // If minimum and maximum numbers of partitions assigned to consumers differ by at most one return true. + if (min >= max - 1) + return true; + + // For each member that does not have all the topic partitions it can get make sure none of the + // topic partitions it could but did not get cannot be moved to it (because that would break the balance). + // Members with the least assignment sizes are checked first to see if they can receive any more partitions. + for (String member: sortedMembersByAssignmentSize) { + int memberPartitionCount = assignmentManager.targetAssignmentSize(member); + + // Skip if this member already has all the topic partitions it can get. + List<Uuid> allSubscribedTopics = new ArrayList<>(members.get(member).subscribedTopicIds()); + int maxAssignmentSize = assignmentManager.maxAssignmentSize(member); + + if (memberPartitionCount == maxAssignmentSize) + continue; + + // Otherwise make sure it cannot get any more partitions. 
+ for (Uuid topicId: allSubscribedTopics) { + Set<Integer> assignedPartitions = targetAssignment.get(member).targetPartitions().get(topicId); + for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { + TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, i); + if (assignedPartitions == null || !assignedPartitions.contains(i)) { + String otherMember = partitionOwnerInTargetAssignment.get(topicIdPartition); + int otherMemberPartitionCount = assignmentManager.targetAssignmentSize(otherMember); + if (memberPartitionCount + 1 < otherMemberPartitionCount) { + LOG.debug("{} can be moved from member {} to member {} for a more balanced assignment.", + topicIdPartition, otherMember, member); + return false; + } + } + } + } + } + return true; + } + + /** + * Balance the current assignment using the data structures created in the assignPartitions(...) method above. + */ + private void balance() { + if (!unassignedPartitions.isEmpty()) unassignedPartitionsAssignment(); + // Refill unassigned partitions will all the topicId partitions. + unassignedPartitions.addAll(topicIdPartitions(subscriptionIds, subscribedTopicDescriber)); + + // Narrow down the reassignment scope to only those partitions that can actually be reassigned. + Set<TopicIdPartition> fixedPartitions = new HashSet<>(); + for (Uuid topicId: subscriptionIds) { + if (!canTopicParticipateInReassignment(topicId)) { + for (int i = 0; i < subscribedTopicDescriber.numPartitions(topicId); i++) { + fixedPartitions.add(new TopicIdPartition(topicId, i)); + } + } + } + unassignedPartitions.removeAll(fixedPartitions); + + // Narrow down the reassignment scope to only those members that are subject to reassignment. + for (String member: members.keySet()) { + if (!canMemberParticipateInReassignment(member)) { + sortedMembersByAssignmentSize.remove(member); + } + } + + // If all the partitions are fixed i.e. unassigned partitions is empty there is no point of re-balancing. 
+ if (!unassignedPartitions.isEmpty() && !isBalanced()) performReassignments(); + } + + private void performReassignments() { + boolean modified; + boolean reassignmentOccurred; + // Repeat reassignment until no partition can be moved to improve the balance. + do { + // Before re-starting the round of reassignments check if the assignment is already balanced. + if (isBalanced()) break; Review Comment: `isBalanced` seems quite heavy — whenever the min/max shortcut does not apply, it walks every partition of every subscribed topic for each member that is below its maximum assignment size — so it would be better to avoid calling it at the start of every iteration. I wonder if we could just add it to the condition of the `do ... while` loop instead. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org