dajac commented on code in PR #16088: URL: https://github.com/apache/kafka/pull/16088#discussion_r1622321107
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java: ########## @@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws PartitionAssignorException { } } - // The minimum required quota that each member needs to meet for a balanced assignment. - // This is the same for all members. - final int numberOfMembers = groupSpec.members().size(); - final int minQuota = totalPartitionsCount / numberOfMembers; + // Compute the minimum required quota per member and the number of members + // who should receive an extra partition. + int numberOfMembers = groupSpec.members().size(); + minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; - groupSpec.members().keySet().forEach(memberId -> - targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) - )); - - potentiallyUnfilledMembers = assignStickyPartitions(minQuota); - - unassignedPartitionsRoundRobinAssignment(); + // Revoke the partitions which are either not part of the subscriptions or above + // the maximum quota. + maybeRevokePartitions(); - if (!unassignedPartitions.isEmpty()) { - throw new PartitionAssignorException("Partitions were left unassigned"); - } + // Assign the unassigned partitions to the members with space. + assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } - /** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. - * - * <p> For each member: - * <ol> - * <li> Find the valid current assignment considering topic subscriptions and metadata</li> - * <li> If the current assignment exists, retain partitions up to the minimum quota.</li> - * <li> If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well.</li> - * <li> Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota.</li> - * </ol> - * </p> - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. - */ - private Map<String, Integer> assignStickyPartitions(int minQuota) { - Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>(); - - groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { - List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment( - assignmentMemberSpec.assignedPartitions() - ); - - int currentAssignmentSize = validCurrentMemberAssignment.size(); - // Number of partitions required to meet the minimum quota. - int remaining = minQuota - currentAssignmentSize; - - if (currentAssignmentSize > 0) { - int retainedPartitionsCount = min(currentAssignmentSize, minQuota); - IntStream.range(0, retainedPartitionsCount).forEach(i -> { - TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); - addPartitionToAssignment( - targetAssignment, - memberId, - topicIdPartition.topicId(), - topicIdPartition.partitionId() - ); - }); - - if (remaining < 0) { - // The extra partition is located at the last index from the previous step. - if (remainingMembersToGetAnExtraPartition > 0) { - TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); - addPartitionToAssignment( - targetAssignment, - memberId, - topicIdPartition.topicId(), - topicIdPartition.partitionId() - ); - remainingMembersToGetAnExtraPartition--; + private void maybeRevokePartitions() { + for (Map.Entry<String, AssignmentMemberSpec> entry : groupSpec.members().entrySet()) { + String memberId = entry.getKey(); + AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); + Map<Uuid, Set<Integer>> oldAssignment = assignmentMemberSpec.assignedPartitions(); + Map<Uuid, Set<Integer>> newAssignment = null; + + // The assignor expects to receive the assignment as an immutable map. It leverages + // this knowledge in order to avoid having to copy all assignments. + if (!isImmutableMap(oldAssignment)) { + throw new IllegalStateException("The assignor expect an immutable map."); + } + + int quota = minimumMemberQuota; + if (remainingMembersToGetAnExtraPartition > 0) { + quota++; + remainingMembersToGetAnExtraPartition--; + } + + for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) { + Uuid topicId = topicPartitions.getKey(); + Set<Integer> partitions = topicPartitions.getValue(); + + if (subscribedTopicIds.contains(topicId)) { + if (partitions.size() <= quota) { + quota -= partitions.size(); + } else { + for (Integer partition : partitions) { + if (quota > 0) { + quota--; + } else { + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. + newAssignment = deepCopy(oldAssignment); + } + // Remove the partition from the new assignment. + Set<Integer> parts = newAssignment.get(topicId); Review Comment: `partitions` is already defined at L173 so we cannot reuse it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org