This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 92ed1ed5869 KAFKA-16864; Optimize uniform (homogenous) assignor 
(#16088)
92ed1ed5869 is described below

commit 92ed1ed58694d1df9df386a286bb856dbefb683a
Author: David Jacot <dja...@confluent.io>
AuthorDate: Fri May 31 22:17:59 2024 +0200

    KAFKA-16864; Optimize uniform (homogenous) assignor (#16088)
    
    This patch optimizes uniform (homogenous) assignor by avoiding creating a 
copy of all the assignments. Instead, the assignor creates a copy only if the 
assignment is updated. It is a sort of copy-on-write. This change reduces the 
overhead of the TargetAssignmentBuilder when ran with the uniform (homogenous) 
assignor.
    
    Trunk:
    
    ```
    Benchmark                                     (memberCount)  
(partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
    TargetAssignmentBuilderBenchmark.build                10000                 
        10           100  avgt    5  24.535 ± 1.583  ms/op
    TargetAssignmentBuilderBenchmark.build                10000                 
        10          1000  avgt    5  24.094 ± 0.223  ms/op
    JMH benchmarks done
    ```
    
    ```
    Benchmark                                       (assignmentType)  
(assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  
(subscriptionType)  (topicCount)  Mode  Cnt   Score   Error  Units
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         
UNIFORM          false          10000                         10         
HOMOGENEOUS           100  avgt    5  14.697 ± 0.133  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         
UNIFORM          false          10000                         10         
HOMOGENEOUS          1000  avgt    5  15.073 ± 0.135  ms/op
    JMH benchmarks done
    ```
    
    Patch:
    
    ```
    Benchmark                                     (memberCount)  
(partitionsToMemberRatio)  (topicCount)  Mode  Cnt  Score   Error  Units
    TargetAssignmentBuilderBenchmark.build                10000                 
        10           100  avgt    5  3.376 ± 0.577  ms/op
    TargetAssignmentBuilderBenchmark.build                10000                 
        10          1000  avgt    5  3.731 ± 0.359  ms/op
    JMH benchmarks done
    ```
    
    ```
    Benchmark                                       (assignmentType)  
(assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  
(subscriptionType)  (topicCount)  Mode  Cnt  Score   Error  Units
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         
UNIFORM          false          10000                         10         
HOMOGENEOUS           100  avgt    5  1.975 ± 0.086  ms/op
    ServerSideAssignorBenchmark.doAssignment             INCREMENTAL         
UNIFORM          false          10000                         10         
HOMOGENEOUS          1000  avgt    5  2.026 ± 0.190  ms/op
    JMH benchmarks done
    ```
    
    Reviewers: Ritika Reddy <rre...@confluent.io>, Jeff Kim 
<jeff....@confluent.io>, Justine Olshan <jols...@confluent.io>
---
 .../OptimizedUniformAssignmentBuilder.java         | 370 ++++++++-------------
 .../group/assignor/UniformAssignor.java            |  10 +-
 .../coordinator/group/AssignmentTestUtil.java      |  17 +-
 .../group/CoordinatorRecordHelpersTest.java        |  18 +-
 .../OptimizedUniformAssignmentBuilderTest.java     | 123 +++----
 .../jmh/assignor/ServerSideAssignorBenchmark.java  |   2 +-
 .../assignor/TargetAssignmentBuilderBenchmark.java |   5 +-
 7 files changed, 223 insertions(+), 322 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
index 3ea1361d699..34e82566520 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java
@@ -18,29 +18,18 @@ package org.apache.kafka.coordinator.group.assignor;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.server.common.TopicIdPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
 import java.util.Set;
-import java.util.stream.Collectors;
-import java.util.stream.IntStream;
-
-import static java.lang.Math.min;
 
 /**
- * The optimized uniform assignment builder is used to generate the target 
assignment for a consumer group with
+ * The homogenous uniform assignment builder is used to generate the target 
assignment for a consumer group with
  * all its members subscribed to the same set of topics.
- * It is optimized since the assignment can be done in fewer, less complicated 
steps compared to when
- * the subscriptions are different across the members.
  *
  * Assignments are done according to the following principles:
  *
@@ -53,8 +42,17 @@ import static java.lang.Math.min;
  * The assignment builder prioritizes the properties in the following order:
  *      Balance > Stickiness.
  */
-public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
-    private static final Logger LOG = 
LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class);
+public class OptimizedUniformAssignmentBuilder {
+    private static final Class<?> UNMODIFIABLE_MAP_CLASS = 
Collections.unmodifiableMap(new HashMap<>()).getClass();
+    private static final Class<?> EMPTY_MAP_CLASS = 
Collections.emptyMap().getClass();
+
+    /**
+     * @return True if the provided map is an UnmodifiableMap or EmptyMap. 
Those classes are not
+     * public hence we cannot use the `instanceof` operator.
+     */
+    private static boolean isImmutableMap(Map<?, ?> map) {
+        return UNMODIFIABLE_MAP_CLASS.isInstance(map) || 
EMPTY_MAP_CLASS.isInstance(map);
+    }
 
     /**
      * The assignment specification which includes member metadata.
@@ -72,62 +70,53 @@ public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignment
     private final Set<Uuid> subscribedTopicIds;
 
     /**
-     * The number of members to receive an extra partition beyond the minimum 
quota.
-     * Minimum Quota = Total Partitions / Total Members
-     * Example: If there are 11 partitions to be distributed among 3 members,
-     *          each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
+     * The members that are below their quota.
      */
-    private int remainingMembersToGetAnExtraPartition;
-
-    /**
-     * Members mapped to the remaining number of partitions needed to meet the 
minimum quota.
-     * Minimum quota = total partitions / total members.
-     */
-    private Map<String, Integer> potentiallyUnfilledMembers;
+    private final List<MemberWithRemainingQuota> unfilledMembers;
 
     /**
      * The partitions that still need to be assigned.
      * Initially this contains all the subscribed topics' partitions.
      */
-    private final Set<TopicIdPartition> unassignedPartitions;
+    private final List<TopicIdPartition> unassignedPartitions;
 
     /**
      * The target assignment.
      */
     private final Map<String, MemberAssignment> targetAssignment;
 
+    /**
+     * The minimum number of partitions that a member must have.
+     * Minimum quota = total partitions / total members.
+     */
+    private int minimumMemberQuota;
+
+    /**
+     * The number of members to receive an extra partition beyond the minimum 
quota.
+     * Example: If there are 11 partitions to be distributed among 3 members,
+     *          each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 
3) members get an extra partition.
+     */
+    private int remainingMembersToGetAnExtraPartition;
+
     OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
         this.groupSpec = groupSpec;
         this.subscribedTopicDescriber = subscribedTopicDescriber;
         this.subscribedTopicIds = new 
HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds());
-        this.potentiallyUnfilledMembers = new HashMap<>();
-        this.unassignedPartitions = new HashSet<>();
+        this.unfilledMembers = new ArrayList<>();
+        this.unassignedPartitions = new ArrayList<>();
         this.targetAssignment = new HashMap<>();
     }
 
     /**
-     * Here's the step-by-step breakdown of the assignment process:
-     *
-     * <li> Compute the quotas of partitions for each member based on the 
total partitions and member count.</li>
-     * <li> Initialize unassigned partitions with all the topic partitions 
that aren't present in the
-     *      current target assignment.</li>
-     * <li> For existing assignments, retain partitions based on the 
determined quota. Add extras to unassigned partitions.</li>
-     * <li> Identify members that haven't fulfilled their partition quota or 
are eligible to receive extra partitions.</li>
-     * <li> Proceed with a round-robin assignment according to quotas.
-     *      For each unassigned partition, locate the first compatible member 
from the potentially unfilled list.</li>
+     * Compute the new assignment for the group.
      */
-    @Override
-    protected GroupAssignment buildAssignment() throws 
PartitionAssignorException {
-        int totalPartitionsCount = 0;
-
+    public GroupAssignment build() throws PartitionAssignorException {
         if (subscribedTopicIds.isEmpty()) {
-            LOG.debug("The subscription list is empty, returning an empty 
assignment");
             return new GroupAssignment(Collections.emptyMap());
         }
 
-        // Check if the subscribed topicId is still valid.
-        // Update unassigned partitions based on the current target assignment
-        // and topic metadata.
+        // Compute the list of unassigned partitions.
+        int totalPartitionsCount = 0;
         for (Uuid topicId : subscribedTopicIds) {
             int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
             if (partitionCount == -1) {
@@ -144,216 +133,149 @@ public class OptimizedUniformAssignmentBuilder extends 
AbstractUniformAssignment
             }
         }
 
-        // The minimum required quota that each member needs to meet for a 
balanced assignment.
-        // This is the same for all members.
-        final int numberOfMembers = groupSpec.members().size();
-        final int minQuota = totalPartitionsCount / numberOfMembers;
+        // Compute the minimum required quota per member and the number of 
members
+        // that should receive an extra partition.
+        int numberOfMembers = groupSpec.members().size();
+        minimumMemberQuota = totalPartitionsCount / numberOfMembers;
         remainingMembersToGetAnExtraPartition = totalPartitionsCount % 
numberOfMembers;
 
-        groupSpec.members().keySet().forEach(memberId ->
-            targetAssignment.put(memberId, new MemberAssignment(new 
HashMap<>())
-        ));
-
-        potentiallyUnfilledMembers = assignStickyPartitions(minQuota);
+        // Revoke the partitions that either are not part of the member's 
subscriptions or
+        // exceed the maximum quota assigned to each member.
+        maybeRevokePartitions();
 
-        unassignedPartitionsRoundRobinAssignment();
-
-        if (!unassignedPartitions.isEmpty()) {
-            throw new PartitionAssignorException("Partitions were left 
unassigned");
-        }
+        // Assign the unassigned partitions to the members with space.
+        assignRemainingPartitions();
 
         return new GroupAssignment(targetAssignment);
     }
 
     /**
-     * Retains a set of partitions from the existing assignment and includes 
them in the target assignment.
-     * Only relevant partitions that exist in the current topic metadata and 
subscriptions are considered.
+     * Revoke the partitions that either are not part of the member's 
subscriptions or
+     * exceed the maximum quota assigned to each member.
      *
-     * <p> For each member:
-     * <ol>
-     *     <li> Find the valid current assignment considering topic 
subscriptions and metadata</li>
-     *     <li> If the current assignment exists, retain partitions up to the 
minimum quota.</li>
-     *     <li> If the current assignment size is greater than the minimum 
quota and
-     *          there are members that could get an extra partition, assign 
the next partition as well.</li>
-     *     <li> Finally, if the member's current assignment size is less than 
the minimum quota,
-     *          add them to the potentially unfilled members map and track the 
number of remaining
-     *          partitions required to meet the quota.</li>
-     * </ol>
-     * </p>
-     *
-     * @return  Members mapped to the remaining number of partitions needed to 
meet the minimum quota,
-     *          including members that are eligible to receive an extra 
partition.
+     * This method ensures that the original assignment is not copied if it is 
not
+     * altered.
      */
-    private Map<String, Integer> assignStickyPartitions(int minQuota) {
-        Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>();
-
-        groupSpec.members().forEach((memberId, assignmentMemberSpec) -> {
-            List<TopicIdPartition> validCurrentMemberAssignment = 
validCurrentMemberAssignment(
-                assignmentMemberSpec.assignedPartitions()
-            );
-
-            int currentAssignmentSize = validCurrentMemberAssignment.size();
-            // Number of partitions required to meet the minimum quota.
-            int remaining = minQuota - currentAssignmentSize;
-
-            if (currentAssignmentSize > 0) {
-                int retainedPartitionsCount = min(currentAssignmentSize, 
minQuota);
-                IntStream.range(0, retainedPartitionsCount).forEach(i -> {
-                    TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(i);
-                    addPartitionToAssignment(
-                        targetAssignment,
-                        memberId,
-                        topicIdPartition.topicId(),
-                        topicIdPartition.partitionId()
-                    );
-                });
-
-                if (remaining < 0) {
-                    // The extra partition is located at the last index from 
the previous step.
-                    if (remainingMembersToGetAnExtraPartition > 0) {
-                        TopicIdPartition topicIdPartition = 
validCurrentMemberAssignment.get(retainedPartitionsCount++);
-                        addPartitionToAssignment(
-                            targetAssignment,
-                            memberId,
-                            topicIdPartition.topicId(),
-                            topicIdPartition.partitionId()
-                        );
-                        remainingMembersToGetAnExtraPartition--;
+    private void maybeRevokePartitions() {
+        for (Map.Entry<String, AssignmentMemberSpec> entry : 
groupSpec.members().entrySet()) {
+            String memberId = entry.getKey();
+            AssignmentMemberSpec assignmentMemberSpec = entry.getValue();
+            Map<Uuid, Set<Integer>> oldAssignment = 
assignmentMemberSpec.assignedPartitions();
+            Map<Uuid, Set<Integer>> newAssignment = null;
+
+            // The assignor expects to receive the assignment as an immutable 
map. It leverages
+            // this knowledge in order to avoid having to copy all assignments.
+            if (!isImmutableMap(oldAssignment)) {
+                throw new IllegalStateException("The assignor expect an 
immutable map.");
+            }
+
+            int quota = minimumMemberQuota;
+            if (remainingMembersToGetAnExtraPartition > 0) {
+                quota++;
+                remainingMembersToGetAnExtraPartition--;
+            }
+
+            for (Map.Entry<Uuid, Set<Integer>> topicPartitions : 
oldAssignment.entrySet()) {
+                Uuid topicId = topicPartitions.getKey();
+                Set<Integer> partitions = topicPartitions.getValue();
+
+                if (subscribedTopicIds.contains(topicId)) {
+                    if (partitions.size() <= quota) {
+                        quota -= partitions.size();
+                    } else {
+                        for (Integer partition : partitions) {
+                            if (quota > 0) {
+                                quota--;
+                            } else {
+                                if (newAssignment == null) {
+                                    // If the new assignment is null, we 
create a deep copy of the
+                                    // original assignment so that we can 
alter it.
+                                    newAssignment = deepCopy(oldAssignment);
+                                }
+                                // Remove the partition from the new 
assignment.
+                                Set<Integer> parts = 
newAssignment.get(topicId);
+                                parts.remove(partition);
+                                if (parts.isEmpty()) {
+                                    newAssignment.remove(topicId);
+                                }
+                                // Add the partition to the unassigned set to 
be re-assigned later on.
+                                unassignedPartitions.add(new 
TopicIdPartition(topicId, partition));
+                            }
+                        }
                     }
-                    // Any previously owned partitions that weren't retained 
due to the quotas
-                    // are added to the unassigned partitions set.
-                    if (retainedPartitionsCount < currentAssignmentSize) {
-                        
unassignedPartitions.addAll(validCurrentMemberAssignment.subList(
-                            retainedPartitionsCount,
-                            currentAssignmentSize
-                        ));
+                } else {
+                    if (newAssignment == null) {
+                        // If the new assignment is null, we create a deep 
copy of the
+                        // original assignment so that we can alter it.
+                        newAssignment = deepCopy(oldAssignment);
                     }
+                    // Remove the entire topic.
+                    newAssignment.remove(topicId);
                 }
             }
 
-            if (remaining >= 0) {
-                potentiallyUnfilledMembers.put(memberId, remaining);
+            if (quota > 0) {
+                unfilledMembers.add(new MemberWithRemainingQuota(memberId, 
quota));
             }
-        });
 
-        return potentiallyUnfilledMembers;
-    }
-
-    /**
-     * Filters the current assignment of partitions for a given member based 
on certain criteria.
-     *
-     * Any partition that still belongs to the member's subscribed topics list 
is considered valid.
-     *
-     * @param currentMemberAssignment       The map of topics to partitions 
currently assigned to the member.
-     *
-     * @return List of valid partitions after applying the filters.
-     */
-    private List<TopicIdPartition> validCurrentMemberAssignment(
-        Map<Uuid, Set<Integer>> currentMemberAssignment
-    ) {
-        List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>();
-        currentMemberAssignment.forEach((topicId, partitions) -> {
-            if (subscribedTopicIds.contains(topicId)) {
-                partitions.forEach(partition -> {
-                    TopicIdPartition topicIdPartition = new 
TopicIdPartition(topicId, partition);
-                    validCurrentAssignmentList.add(topicIdPartition);
-                });
+            if (newAssignment == null) {
+                targetAssignment.put(memberId, new 
MemberAssignment(oldAssignment));
             } else {
-                LOG.debug("The topic " + topicId + " is no longer present in 
the subscribed topics list");
+                targetAssignment.put(memberId, new 
MemberAssignment(newAssignment));
             }
-        });
-
-        return validCurrentAssignmentList;
+        }
     }
 
     /**
-     * Allocates the unassigned partitions to unfilled members in a 
round-robin fashion.
+     * Assign the unassigned partitions to the unfilled members.
      */
-    private void unassignedPartitionsRoundRobinAssignment() {
-        Queue<String> roundRobinMembers = new 
LinkedList<>(potentiallyUnfilledMembers.keySet());
-
-        // Partitions are sorted to ensure an even topic wise distribution 
across members.
-        // This not only balances the load but also makes partition-to-member 
mapping more predictable.
-        List<TopicIdPartition> sortedPartitionsList = 
unassignedPartitions.stream()
-            
.sorted(Comparator.comparing(TopicIdPartition::topicId).thenComparing(TopicIdPartition::partitionId))
-            .collect(Collectors.toList());
-
-        for (TopicIdPartition topicIdPartition : sortedPartitionsList) {
-            boolean assigned = false;
-
-            for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) {
-                String memberId = roundRobinMembers.poll();
-                if (potentiallyUnfilledMembers.containsKey(memberId)) {
-                    assigned = maybeAssignPartitionToMember(memberId, 
topicIdPartition);
-                }
-                // Only re-add the member to the end of the queue if it's 
still available for assignment.
-                if (potentiallyUnfilledMembers.containsKey(memberId)) {
-                    roundRobinMembers.add(memberId);
-                }
+    private void assignRemainingPartitions() {
+        int unassignedPartitionIndex = 0;
+
+        for (MemberWithRemainingQuota unfilledMember : unfilledMembers) {
+            String memberId = unfilledMember.memberId;
+            int remainingQuota = unfilledMember.remainingQuota;
+
+            Map<Uuid, Set<Integer>> newAssignment = 
targetAssignment.get(memberId).targetPartitions();
+            if (isImmutableMap(newAssignment)) {
+                // If the new assignment is immutable, we must create a deep 
copy of it
+                // before altering it.
+                newAssignment = deepCopy(newAssignment);
+                targetAssignment.put(memberId, new 
MemberAssignment(newAssignment));
             }
 
-            if (assigned) {
-                unassignedPartitions.remove(topicIdPartition);
+            for (int i = 0; i < remainingQuota && unassignedPartitionIndex < 
unassignedPartitions.size(); i++) {
+                TopicIdPartition unassignedTopicIdPartition = 
unassignedPartitions.get(unassignedPartitionIndex);
+                unassignedPartitionIndex++;
+                newAssignment
+                    .computeIfAbsent(unassignedTopicIdPartition.topicId(), __ 
-> new HashSet<>())
+                    .add(unassignedTopicIdPartition.partitionId());
             }
         }
-    }
 
-    /**
-     * Assigns the specified partition to the given member and updates the 
potentially unfilled members map.
-     * Only assign extra partitions once the member has met its minimum quota 
= total partitions / total members.
-     *
-     * <ol>
-     *     <li> If the minimum quota hasn't been met aka remaining > 0 
directly assign the partition.
-     *          After assigning the partition, if the min quota has been met 
aka remaining = 0, remove the member
-     *          if there's no members left to receive an extra partition. 
Otherwise, keep it in the
-     *          potentially unfilled map. </li>
-     *     <li> If the minimum quota has been met and if there is potential to 
receive an extra partition, assign it.
-     *          Remove the member from the potentially unfilled map since it 
has already received the extra partition
-     *          and met the min quota. </li>
-     *     <li> Else, don't assign the partition. </li>
-     * </ol>
-     *
-     * @param memberId              The Id of the member to which the 
partition will be assigned.
-     * @param topicIdPartition      The topicIdPartition to be assigned.
-     * @return true if the assignment was successful, false otherwise.
-     */
-    private boolean maybeAssignPartitionToMember(String memberId, 
TopicIdPartition topicIdPartition) {
-        int remaining = potentiallyUnfilledMembers.get(memberId);
-        boolean shouldAssign = false;
-
-        // If the member hasn't met the minimum quota, set the flag for 
assignment.
-        // If member has met minimum quota and there's an extra partition 
available, set the flag for assignment.
-        if (remaining > 0) {
-            potentiallyUnfilledMembers.put(memberId, --remaining);
-            shouldAssign = true;
-
-            // If the member meets the minimum quota due to this assignment,
-            // check if any extra partitions are available.
-            // Removing the member from the list reduces an iteration for when 
remaining = 0 but there's no extras left.
-            if (remaining == 0 && remainingMembersToGetAnExtraPartition == 0) {
-                potentiallyUnfilledMembers.remove(memberId);
-            }
-        } else if (remaining == 0 && remainingMembersToGetAnExtraPartition > 
0) {
-            remainingMembersToGetAnExtraPartition--;
-            // Each member can only receive one extra partition, once they 
meet the minimum quota and receive an extra
-            // partition they can be removed from the potentially unfilled 
members map.
-            potentiallyUnfilledMembers.remove(memberId);
-            shouldAssign = true;
+        if (unassignedPartitionIndex < unassignedPartitions.size()) {
+            throw new PartitionAssignorException("Partitions were left 
unassigned");
         }
+    }
 
-        // Assign the partition if flag is set.
-        if (shouldAssign) {
-            addPartitionToAssignment(
-                targetAssignment,
-                memberId,
-                topicIdPartition.topicId(),
-                topicIdPartition.partitionId()
-            );
-            return true;
+    private static Map<Uuid, Set<Integer>> deepCopy(Map<Uuid, Set<Integer>> 
map) {
+        Map<Uuid, Set<Integer>> copy = new HashMap<>(map.size());
+        for (Map.Entry<Uuid, Set<Integer>> entry : map.entrySet()) {
+            copy.put(entry.getKey(), new HashSet<>(entry.getValue()));
         }
+        return copy;
+    }
+
+    private static class MemberWithRemainingQuota {
+        final String memberId;
+        final int remainingQuota;
 
-        // No assignment possible because the member met the minimum quota but
-        // number of members to receive an extra partition is zero.
-        return false;
+        MemberWithRemainingQuota(
+            String memberId,
+            int remainingQuota
+        ) {
+            this.memberId = memberId;
+            this.remainingQuota = remainingQuota;
+        }
     }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
index 7da7c2d8c8a..648b0161d64 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java
@@ -66,21 +66,19 @@ public class UniformAssignor implements 
ConsumerGroupPartitionAssignor {
         GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
-        AbstractUniformAssignmentBuilder assignmentBuilder;
-
         if (groupSpec.members().isEmpty())
             return new GroupAssignment(Collections.emptyMap());
 
         if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
             LOG.debug("Detected that all members are subscribed to the same 
set of topics, invoking the "
                 + "optimized assignment algorithm");
-            assignmentBuilder = new 
OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber);
+            return new OptimizedUniformAssignmentBuilder(groupSpec, 
subscribedTopicDescriber)
+                .build();
         } else {
             LOG.debug("Detected that the members are subscribed to different 
sets of topics, invoking the "
                 + "general assignment algorithm");
-            assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, 
subscribedTopicDescriber);
+            return new GeneralUniformAssignmentBuilder(groupSpec, 
subscribedTopicDescriber)
+                .buildAssignment();
         }
-
-        return assignmentBuilder.buildAssignment();
     }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
index 74bb303abd0..ffc5455ceaf 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java
@@ -22,12 +22,13 @@ import 
org.apache.kafka.coordinator.group.assignor.GroupAssignment;
 
 import java.util.AbstractMap;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
-import java.util.TreeSet;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
@@ -42,13 +43,13 @@ public class AssignmentTestUtil {
         );
     }
 
-    public static Map.Entry<Uuid, Set<Integer>> mkSortedTopicAssignment(
+    public static Map.Entry<Uuid, Set<Integer>> mkOrderedTopicAssignment(
         Uuid topicId,
         Integer... partitions
     ) {
         return new AbstractMap.SimpleEntry<>(
             topicId,
-            new TreeSet<>(Arrays.asList(partitions))
+            new LinkedHashSet<>(Arrays.asList(partitions))
         );
     }
 
@@ -56,18 +57,18 @@ public class AssignmentTestUtil {
     public static Map<Uuid, Set<Integer>> mkAssignment(Map.Entry<Uuid, 
Set<Integer>>... entries) {
         Map<Uuid, Set<Integer>> assignment = new HashMap<>();
         for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
-            assignment.put(entry.getKey(), entry.getValue());
+            assignment.put(entry.getKey(), 
Collections.unmodifiableSet(entry.getValue()));
         }
-        return assignment;
+        return Collections.unmodifiableMap(assignment);
     }
 
     @SafeVarargs
-    public static Map<Uuid, Set<Integer>> mkSortedAssignment(Map.Entry<Uuid, 
Set<Integer>>... entries) {
+    public static Map<Uuid, Set<Integer>> mkOrderedAssignment(Map.Entry<Uuid, 
Set<Integer>>... entries) {
         Map<Uuid, Set<Integer>> assignment = new LinkedHashMap<>();
         for (Map.Entry<Uuid, Set<Integer>> entry : entries) {
-            assignment.put(entry.getKey(), entry.getValue());
+            assignment.put(entry.getKey(), 
Collections.unmodifiableSet(entry.getValue()));
         }
-        return assignment;
+        return Collections.unmodifiableMap(assignment);
     }
 
     /**
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
index 77bad7a4834..900ba318393 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java
@@ -69,8 +69,8 @@ import java.util.Set;
 import java.util.stream.Stream;
 
 import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals;
-import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment;
-import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentRecord;
 import static 
org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord;
@@ -297,7 +297,7 @@ public class CoordinatorRecordHelpersTest {
         Uuid topicId1 = Uuid.randomUuid();
         Uuid topicId2 = Uuid.randomUuid();
 
-        Map<Uuid, Set<Integer>> partitions = mkSortedAssignment(
+        Map<Uuid, Set<Integer>> partitions = mkOrderedAssignment(
             mkTopicAssignment(topicId1, 11, 12, 13),
             mkTopicAssignment(topicId2, 21, 22, 23)
         );
@@ -379,14 +379,14 @@ public class CoordinatorRecordHelpersTest {
         Uuid topicId1 = Uuid.randomUuid();
         Uuid topicId2 = Uuid.randomUuid();
 
-        Map<Uuid, Set<Integer>> assigned = mkSortedAssignment(
-            mkSortedTopicAssignment(topicId1, 11, 12, 13),
-            mkSortedTopicAssignment(topicId2, 21, 22, 23)
+        Map<Uuid, Set<Integer>> assigned = mkOrderedAssignment(
+            mkOrderedTopicAssignment(topicId1, 11, 12, 13),
+            mkOrderedTopicAssignment(topicId2, 21, 22, 23)
         );
 
-        Map<Uuid, Set<Integer>> revoking = mkSortedAssignment(
-            mkSortedTopicAssignment(topicId1, 14, 15, 16),
-            mkSortedTopicAssignment(topicId2, 24, 25, 26)
+        Map<Uuid, Set<Integer>> revoking = mkOrderedAssignment(
+            mkOrderedTopicAssignment(topicId1, 14, 15, 16),
+            mkOrderedTopicAssignment(topicId2, 24, 25, 26)
         );
 
         CoordinatorRecord expectedRecord = new CoordinatorRecord(
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
index f21bd63735f..fdc4f5941fa 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java
@@ -34,6 +34,7 @@ import java.util.TreeMap;
 import static org.apache.kafka.common.utils.Utils.mkSet;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment;
+import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment;
 import static 
org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment;
 import static 
org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks;
@@ -158,12 +159,11 @@ public class OptimizedUniformAssignmentBuilderTest {
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2),
-            mkTopicAssignment(topic3Uuid, 1)
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic3Uuid, 0, 1)
         ));
         expectedAssignment.put(memberB, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic3Uuid, 0)
+            mkTopicAssignment(topic1Uuid, 1, 2)
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
@@ -295,30 +295,25 @@ public class OptimizedUniformAssignmentBuilderTest {
         ));
 
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 0, 1),
-                mkTopicAssignment(topic2Uuid, 0, 1)
-            )
-        );
+
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 0, 1),
+                mkTopicAssignment(topic2Uuid, 0, 1)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 2),
-                mkTopicAssignment(topic2Uuid, 2)
-            )
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 2),
+                mkTopicAssignment(topic2Uuid, 2)
+            )
         ));
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
@@ -366,40 +361,34 @@ public class OptimizedUniformAssignmentBuilderTest {
 
         Map<String, AssignmentMemberSpec> members = new TreeMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 0, 2),
-                mkTopicAssignment(topic2Uuid, 0)
-            )
-        );
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 0, 2),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(
-            mkAssignment(
-                mkTopicAssignment(topic1Uuid, 1),
-                mkTopicAssignment(topic2Uuid, 1, 2)
-            )
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1, 2)
+            )
         ));
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2, 3, 5),
-            mkTopicAssignment(topic2Uuid, 0, 4)
+            mkTopicAssignment(topic1Uuid, 0, 2, 3),
+            mkTopicAssignment(topic2Uuid, 0, 3, 4)
         ));
         expectedAssignment.put(memberB, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1, 4),
-            mkTopicAssignment(topic2Uuid, 1, 2, 3)
+            mkTopicAssignment(topic1Uuid, 1, 4, 5),
+            mkTopicAssignment(topic2Uuid, 1, 2)
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
@@ -436,26 +425,24 @@ public class OptimizedUniformAssignmentBuilderTest {
 
         Map<String, AssignmentMemberSpec> members = new HashMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = new 
TreeMap<>(mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2),
-            mkTopicAssignment(topic2Uuid, 0)
-        ));
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 0, 2),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = new 
TreeMap<>(mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1, 2)
-        ));
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkOrderedAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1, 2)
+            )
         ));
 
         // Add a new member to trigger a re-assignment.
@@ -512,38 +499,36 @@ public class OptimizedUniformAssignmentBuilderTest {
 
         Map<String, AssignmentMemberSpec> members = new HashMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0),
-            mkTopicAssignment(topic2Uuid, 0)
-        );
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForA
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 0),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1)
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             mkSet(topic1Uuid, topic2Uuid),
-            currentAssignmentForB
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1)
+            )
         ));
 
         // Member C was removed
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
         expectedAssignment.put(memberA, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0, 2),
-            mkTopicAssignment(topic2Uuid, 0)
+            mkTopicAssignment(topic1Uuid, 0),
+            mkTopicAssignment(topic2Uuid, 0, 2)
         ));
         expectedAssignment.put(memberB, mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1, 2)
+            mkTopicAssignment(topic1Uuid, 1, 2),
+            mkTopicAssignment(topic2Uuid, 1)
         ));
 
         GroupSpec groupSpec = new GroupSpecImpl(
@@ -581,26 +566,24 @@ public class OptimizedUniformAssignmentBuilderTest {
         // Initial subscriptions were [T1, T2]
         Map<String, AssignmentMemberSpec> members = new HashMap<>();
 
-        Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 0),
-            mkTopicAssignment(topic2Uuid, 0)
-        );
         members.put(memberA, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             Collections.singleton(topic2Uuid),
-            currentAssignmentForA
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 0),
+                mkTopicAssignment(topic2Uuid, 0)
+            )
         ));
 
-        Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment(
-            mkTopicAssignment(topic1Uuid, 1),
-            mkTopicAssignment(topic2Uuid, 1)
-        );
         members.put(memberB, new AssignmentMemberSpec(
             Optional.empty(),
             Optional.empty(),
             Collections.singleton(topic2Uuid),
-            currentAssignmentForB
+            mkAssignment(
+                mkTopicAssignment(topic1Uuid, 1),
+                mkTopicAssignment(topic2Uuid, 1)
+            )
         ));
 
         Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
index 2349350e4ba..77a38ab7f4e 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java
@@ -268,7 +268,7 @@ public class ServerSideAssignorBenchmark {
                 assignmentMemberSpec.instanceId(),
                 assignmentMemberSpec.rackId(),
                 assignmentMemberSpec.subscribedTopicIds(),
-                memberAssignment.targetPartitions()
+                
Collections.unmodifiableMap(memberAssignment.targetPartitions())
             ));
         });
 
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
index 511db01c862..7d67f07d3e6 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java
@@ -184,10 +184,7 @@ public class TargetAssignmentBuilderBenchmark {
         for (Map.Entry<String, MemberAssignment> entry : 
groupAssignment.members().entrySet()) {
             String memberId = entry.getKey();
             Map<Uuid, Set<Integer>> topicPartitions = 
entry.getValue().targetPartitions();
-
-            Assignment assignment = new Assignment(topicPartitions);
-
-            initialTargetAssignment.put(memberId, assignment);
+            initialTargetAssignment.put(memberId, new 
Assignment(topicPartitions));
         }
 
         return initialTargetAssignment;


Reply via email to