dajac commented on code in PR #16088:
URL: https://github.com/apache/kafka/pull/16088#discussion_r1622321107


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java:
##########
@@ -144,216 +133,137 @@ protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
             }
         }
 
-        // The minimum required quota that each member needs to meet for a 
balanced assignment.
-        // This is the same for all members.
-        final int numberOfMembers = groupSpec.members().size();
-        final int minQuota = totalPartitionsCount / numberOfMembers;
+        // Compute the minimum required quota per member and the number of 
members
+        // who should receive an extra partition.
+        int numberOfMembers = groupSpec.members().size();
+        minimumMemberQuota = totalPartitionsCount / numberOfMembers;
         remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-        groupSpec.members().keySet().forEach(memberId ->
-            targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-        ));
-
-        potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
-
-        unassignedPartitionsRoundRobinAssignment();
+        // Revoke the partitions which are either not part of the 
subscriptions or above
+        // the maximum quota.
+        maybeRevokePartitions();
 
-        if (!unassignedPartitions.isEmpty()) {
-            throw new PartitionAssignorException("Partitions were left 
unassigned");
-        }
+        // Assign the unassigned partitions to the members with space.
+        assignRemainingPartitions();
 
         return new GroupAssignment(targetAssignment);
     }
 
-    /**
-     * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
-     * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
-     *
-     * <p> For each member:
-     * <ol>
-     *     <li> Find the valid current assignment considering topic 
subscriptions and metadata</li>
-     *     <li> If the current assignment exists, retain partitions up to the 
minimum quota.</li>
-     *     <li> If the current assignment size is greater than the minimum 
quota and
-     *          there are members that could get an extra partition, assign 
the next partition as well.</li>
-     *     <li> Finally, if the member's current assignment size is less than 
the minimum quota,
-     *          add them to the potentially unfilled members map and track the 
number of remaining
-     *          partitions required to meet the quota.</li>
-     * </ol>
-     * </p>
-     *
-     * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
-     *          including members that are eligible to receive an extra 
partition.
-     */
-    private Map<String, Integer> assignStickyPartitions(int minQuota) {
-        Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
-        groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-            List<TopicIdPartition> validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-                assignmentMemberSpec.assignedPartitions()
-            );
-
-            int currentAssignmentSize = validCurrentMemberAssignment.size();
-            // Number of partitions required to meet the minimum quota.
-            int remaining = minQuota - currentAssignmentSize;
-
-            if (currentAssignmentSize > 0) {
-                int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-                IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-                    TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-                    addPartitionToAssignment(
-                        targetAssignment,
-                        memberId,
-                        topicIdPartition.topicId(),
-                        topicIdPartition.partitionId()
-                    );
-                });
-
-                if (remaining < 0) {
-                    // The extra partition is located at the last index from 
the previous step.
-                    if (remainingMembersToGetAnExtraPartition > 0) {
-                        TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-                        addPartitionToAssignment(
-                            targetAssignment,
-                            memberId,
-                            topicIdPartition.topicId(),
-                            topicIdPartition.partitionId()
-                        );
-                        remainingMembersToGetAnExtraPartition--;
+    private void maybeRevokePartitions() {
+        for (Map.Entry<String, AssignmentMemberSpec> entry : 
groupSpec.members().entrySet()) {
+            String memberId = entry.getKey();
+            AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+            Map<Uuid, Set<Integer>> oldAssignment = 
assignmentMemberSpec.assignedPartitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            // The assignor expects to receive the assignment as an immutable 
map. It leverages
+            // this knowledge in order to avoid having to copy all assignments.
+            if (!isImmutableMap(oldAssignment)) {
+                throw new IllegalStateException("The assignor expect an 
immutable map.");
+            }
+
+            int quota = minimumMemberQuota;
+            if (remainingMembersToGetAnExtraPartition > 0) {
+                quota++;
+                remainingMembersToGetAnExtraPartition--;
+            }
+
+            for (Map.Entry<Uuid, Set<Integer>> topicPartitions : 
oldAssignment.entrySet()) {
+                Uuid topicId = topicPartitions.getKey();
+                Set<Integer> partitions = topicPartitions.getValue();
+
+                if (subscribedTopicIds.contains(topicId)) {
+                    if (partitions.size() <= quota) {
+                        quota -= partitions.size();
+                    } else {
+                        for (Integer partition : partitions) {
+                            if (quota > 0) {
+                                quota--;
+                            } else {
+                                if (newAssignment == null) {
+                                    // If the new assignment is null, we 
create a deep copy of the
+                                    // original assignment so that we can 
alter it.
+                                    newAssignment = deepCopy(oldAssignment);
+                                }
+                                // Remove the partition from the new 
assignment.
+                                Set<Integer> parts = 
newAssignment.get(topicId);

Review Comment:
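   The name `partitions` is already taken by the variable declared at L173 in the enclosing loop, so it cannot be reused here; that is why this local is named `parts`.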



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to