dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663741172


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,228 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        private final Uuid topicId;
+        private final int numPartitions;
+        private int numMembers;
+        private int minQuota = -1;
+        private int extraPartitions = -1;
+        private int nextRange = 0;
+
         /**
-         * Member Id.
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
          */
-        private final String memberId;
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
          */
-        private final int remaining;
+        private void maybeComputeQuota() {
+            if (minQuota != -1) return;
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+            // The minimum number of partitions each member should receive for 
a balanced assignment.
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicMetadata(topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                ')';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                     The specification required for 
group assignments.
-     * @param subscribedTopicDescriber      The metadata describer for 
subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());

Review Comment:
   It looks like copying the `subscribedTopicIds` is not necessary here because 
we only iterate on it once. What do you think?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,228 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        private final Uuid topicId;
+        private final int numPartitions;
+        private int numMembers;
+        private int minQuota = -1;
+        private int extraPartitions = -1;
+        private int nextRange = 0;
+
         /**
-         * Member Id.
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
          */
-        private final String memberId;
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
          */
-        private final int remaining;
+        private void maybeComputeQuota() {
+            if (minQuota != -1) return;
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+            // The minimum number of partitions each member should receive for 
a balanced assignment.
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicMetadata(topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                ')';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                     The specification required for 
group assignments.
-     * @param subscribedTopicDescriber      The metadata describer for 
subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());
+        List<TopicMetadata> topics = new ArrayList<>(subscribedTopics.size());
+        int numMembers = groupSpec.memberIds().size();
+
+        for (Uuid topicId : subscribedTopics) {
+            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException("Member is subscribed to 
a non-existent topic");
             }
-        } else {
-            groupSpec.memberIds().forEach(memberId -> {
-                Collection<Uuid> topics = 
groupSpec.memberSubscription(memberId).subscribedTopicIds();
-                for (Uuid topicId : topics) {
-                    if (subscribedTopicDescriber.numPartitions(topicId) == -1) 
{
-                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
-                    }
-                    membersPerTopic
-                        .computeIfAbsent(topicId, k -> new ArrayList<>())
-                        .add(memberId);
-                }
-            });
+            TopicMetadata m = new TopicMetadata(
+                topicId,
+                numPartitions,
+                numMembers
+            );
+            topics.add(m);
         }
 
-        return membersPerTopic;
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
+        int memberAssignmentInitialCapacity = (int) ((subscribedTopics.size() 
/ 0.75f) + 1);

Review Comment:
   If we do the change from my previous comment, I would use `topics.size()` 
here.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,233 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        public final Uuid topicId;
+        public final int numPartitions;
+        public int numMembers;
+
+        public int minQuota = -1;
+        public int extraPartitions = -1;
+        public int nextRange = 0;
+
+        /**
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
+         */
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
+
         /**
-         * Member Id.
+         * Factory method to create a TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
+         * @return A new TopicMetadata instance.
          */
-        private final String memberId;
+        public static TopicMetadata create(Uuid topicId, int numPartitions, 
int numMembers) {
+            return new TopicMetadata(topicId, numPartitions, numMembers);
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
          */
-        private final int remaining;
+        void maybeComputeQuota() {
+            // The minimum number of partitions each member should receive for 
a balanced assignment.
+            if (minQuota != -1) return;
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+        @Override
+        public String toString() {
+            return "TopicMetadata{" +
+                "topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                '}';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                     The specification required for 
group assignments.
-     * @param subscribedTopicDescriber      The metadata describer for 
subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());
+        List<TopicMetadata> topics = new ArrayList<>(subscribedTopics.size());
+        int numMembers = groupSpec.memberIds().size();
+
+        for (Uuid topicId : subscribedTopics) {
+            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException("Member is subscribed to 
a non-existent topic");
             }
-        } else {
-            groupSpec.memberIds().forEach(memberId -> {
-                Collection<Uuid> topics = 
groupSpec.memberSubscription(memberId).subscribedTopicIds();
-                for (Uuid topicId : topics) {
-                    if (subscribedTopicDescriber.numPartitions(topicId) == -1) 
{
-                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
-                    }
-                    membersPerTopic
-                        .computeIfAbsent(topicId, k -> new ArrayList<>())
-                        .add(memberId);
-                }
-            });
+            TopicMetadata m = TopicMetadata.create(
+                topicId,
+                numPartitions,
+                numMembers
+            );
+            topics.add(m);
         }
 
-        return membersPerTopic;
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
+
+        for (String memberId : memberIds) {
+            Map<Uuid, Set<Integer>> assignment = new HashMap<>((int) 
((subscribedTopics.size() / 0.75f) + 1));
+            for (TopicMetadata topicMetadata : topics) {
+                topicMetadata.maybeComputeQuota();
+                addPartitionsToAssignment(topicMetadata, assignment);
+            }
+            assignments.put(memberId, new MemberAssignmentImpl(assignment));
+        }
+
+        return new GroupAssignment(assignments);
     }
 
     /**
-     * The algorithm includes the following steps:
-     * <ol>
-     *      <li> Generate a map of members per topic using the given member 
subscriptions. </li>
-     *      <li> Generate a list of members called potentially unfilled 
members, which consists of members that have not
-     *           met the minimum required quota of partitions for the 
assignment AND get a list called assigned sticky
-     *           partitions for topic, which has the partitions that will be 
retained in the new assignment. </li>
-     *      <li> Generate a list of unassigned partitions by calculating the 
difference between the total partitions
-     *           for the topic and the assigned (sticky) partitions. </li>
-     *      <li> Find members from the potentially unfilled members list that 
haven't met the total required quota
-     *           i.e. minRequiredQuota + 1, if the member is designated to 
receive one of the excess partitions OR
-     *           minRequiredQuota otherwise. </li>
-     *      <li> Assign partitions to them in ranges from the unassigned 
partitions per topic
-     *           based on the remaining partitions value. </li>
-     * </ol>
+     * Assigns partitions to members of a heterogeneous group. Not all members 
are subscribed to the same topics.
      */
-    @Override
-    public GroupAssignment assign(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
+    private GroupAssignment assignHeterogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
 
-        Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();
-
-        // Step 1
-        Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
-            groupSpec,
-            subscribedTopicDescriber
-        );
-
-        membersPerTopic.forEach((topicId, membersForTopic) -> {
-            int numPartitionsForTopic = 
subscribedTopicDescriber.numPartitions(topicId);
-            int minRequiredQuota = numPartitionsForTopic / 
membersForTopic.size();
-            // Each member can get only ONE extra partition per topic after 
receiving the minimum quota.
-            int numMembersWithExtraPartition = numPartitionsForTopic % 
membersForTopic.size();
-
-            // Step 2
-            Set<Integer> assignedStickyPartitionsForTopic = new HashSet<>();
-            List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = 
new ArrayList<>();
-
-            for (String memberId : membersForTopic) {
-                Set<Integer> assignedPartitionsForTopic = groupSpec
-                    .memberAssignment(memberId)
-                    .partitions()
-                    .getOrDefault(topicId, Collections.emptySet());
-
-                int currentAssignmentSize = assignedPartitionsForTopic.size();
-                List<Integer> currentAssignmentListForTopic = new 
ArrayList<>(assignedPartitionsForTopic);
-
-                // If there were partitions from this topic that were 
previously assigned to this member, retain as many as possible.
-                // Sort the current assignment in ascending order since we 
want the same partition numbers from each topic
-                // to go to the same member, in order to facilitate joins in 
case of co-partitioned topics.
-                if (currentAssignmentSize > 0) {
-                    int retainedPartitionsCount = min(currentAssignmentSize, 
minRequiredQuota);
-                    Collections.sort(currentAssignmentListForTopic);
-                    for (int i = 0; i < retainedPartitionsCount; i++) {
-                        assignedStickyPartitionsForTopic
-                            .add(currentAssignmentListForTopic.get(i));
-                        newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                            .partitions()
-                            .computeIfAbsent(topicId, k -> new HashSet<>())
-                            .add(currentAssignmentListForTopic.get(i));
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+
+        for (String memberId : memberIds) {
+            MemberSubscription subs = groupSpec.memberSubscription(memberId);
+            for (Uuid topicId : subs.subscribedTopicIds()) {
+                TopicMetadata topicMetadata = topics.computeIfAbsent(topicId, 
__ -> {
+                    int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+                    if (numPartitions == -1) {
+                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
                     }
-                }
-
-                // Number of partitions required to meet the minRequiredQuota.
-                // There are 3 cases w.r.t the value of remaining:
-                // 1) remaining < 0: this means that the member has more than 
the min required amount.
-                // 2) If remaining = 0: member has the minimum required 
partitions, but it may get an extra partition, so it is a potentially unfilled 
member.
-                // 3) If remaining > 0: member doesn't have the minimum 
required partitions, so it should be added to potentiallyUnfilledMembers.
-                int remaining = minRequiredQuota - currentAssignmentSize;
-
-                // Retain extra partitions as well when applicable.
-                if (remaining < 0 && numMembersWithExtraPartition > 0) {
-                    numMembersWithExtraPartition--;
-                    // Since we already added the minimumRequiredQuota of 
partitions in the previous step (until minReq - 1), we just need to
-                    // add the extra partition that will be present at the 
index right after min quota was satisfied.
-                    assignedStickyPartitionsForTopic
-                        
.add(currentAssignmentListForTopic.get(minRequiredQuota));
-                    newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                        .partitions()
-                        .computeIfAbsent(topicId, k -> new HashSet<>())
-                        
.add(currentAssignmentListForTopic.get(minRequiredQuota));
-                } else {
-                    MemberWithRemainingAssignments newPair = new 
MemberWithRemainingAssignments(memberId, remaining);
-                    potentiallyUnfilledMembers.add(newPair);
-                }
+
+                    return TopicMetadata.create(
+                        topicId,
+                        numPartitions,
+                        0
+                    );
+                });
+                topicMetadata.numMembers++;
             }
+        }
 
-            // Step 3
-            // Find the difference between the total partitions per topic and 
the already assigned sticky partitions for the topic to get the unassigned 
partitions.
-            // List of unassigned partitions for topic contains the partitions 
in ascending order.
-            List<Integer> unassignedPartitionsForTopic = new ArrayList<>();
-            for (int i = 0; i < numPartitionsForTopic; i++) {
-                if (!assignedStickyPartitionsForTopic.contains(i)) {
-                    unassignedPartitionsForTopic.add(i);
-                }
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
+
+        for (String memberId : memberIds) {
+            MemberSubscription subs = groupSpec.memberSubscription(memberId);
+            Map<Uuid, Set<Integer>> assignment = new 
HashMap<>(subs.subscribedTopicIds().size());
+            for (Uuid topicId : subs.subscribedTopicIds()) {
+                TopicMetadata metadata = topics.get(topicId);
+                metadata.maybeComputeQuota();
+                addPartitionsToAssignment(metadata, assignment);
             }
+            assignments.put(memberId, new MemberAssignmentImpl(assignment));
+        }
+
+        return new GroupAssignment(assignments);
+    }
 
-            // Step 4 and Step 5
-            // Account for the extra partitions if necessary and increase the 
required quota by 1.
-            // If remaining > 0 after increasing the required quota, assign 
the remaining number of partitions from the unassigned partitions list.
-            int unassignedPartitionsListStartPointer = 0;
-            for (MemberWithRemainingAssignments pair : 
potentiallyUnfilledMembers) {
-                String memberId = pair.memberId;
-                int remaining = pair.remaining;
-                if (numMembersWithExtraPartition > 0) {
-                    remaining++;
-                    numMembersWithExtraPartition--;
-                }
-                if (remaining > 0) {
-                    List<Integer> partitionsToAssign = 
unassignedPartitionsForTopic
-                        .subList(unassignedPartitionsListStartPointer, 
unassignedPartitionsListStartPointer + remaining);
-                    unassignedPartitionsListStartPointer += remaining;
-                    newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                        .partitions()
-                        .computeIfAbsent(topicId, k -> new HashSet<>())
-                        .addAll(partitionsToAssign);
-                }
+    /**
+     * Sorts the member Ids in the group based on their instance Id if 
present, otherwise by member Id.
+     * This is done to ensure that the relative ordering of members doesn't 
change with static members
+     * thus resulting in a sticky assignment.
+     *
+     * @param groupSpec     The group specification containing the member 
information.
+     * @return a sorted list of member Ids.
+     */
+    private List<String> sortMemberIds(
+        GroupSpec groupSpec
+    ) {
+        List<String> sortedMemberIds = new ArrayList<>(groupSpec.memberIds());
+        Map<String, Optional<String>> instanceIdCache = new HashMap<>();
+
+        for (String memberId : sortedMemberIds) {
+            instanceIdCache.put(memberId, 
groupSpec.memberSubscription(memberId).instanceId());
+        }
+
+        sortedMemberIds.sort((memberId1, memberId2) -> {
+            Optional<String> instanceId1 = instanceIdCache.get(memberId1);
+            Optional<String> instanceId2 = instanceIdCache.get(memberId2);
+
+            if (instanceId1.isPresent() && instanceId2.isPresent()) {
+                return instanceId1.get().compareTo(instanceId2.get());
+            } else if (instanceId1.isPresent()) {
+                return -1;
+            } else if (instanceId2.isPresent()) {
+                return 1;
+            } else {
+                return memberId1.compareTo(memberId2);
             }
         });
+        return sortedMemberIds;
+    }
 
-        return new GroupAssignment(newTargetAssignment);
+    /**
+     * Assigns a range of partitions to the specified topic based on the 
provided metadata.
+     *
+     * @param topicMetadata         Metadata containing the topic details, 
including the number of partitions,
+     *                              the next range to assign, minQuota, and 
extra partitions.
+     * @param memberAssignment      Map from topic Id to the set of assigned 
partition Ids.
+     */
+    private void addPartitionsToAssignment(
+        TopicMetadata topicMetadata,
+        Map<Uuid, Set<Integer>> memberAssignment
+    ) {
+        if (topicMetadata.nextRange >= topicMetadata.numPartitions) {
+            memberAssignment.put(topicMetadata.topicId, 
Collections.emptySet());

Review Comment:
   No, this is not what I wanted. For instance, if a member has no assigned 
partitions, we expect him to have an empty map. Let's remove it if there is no 
reason for it.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,233 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        public final Uuid topicId;
+        public final int numPartitions;
+        public int numMembers;
+
+        public int minQuota = -1;
+        public int extraPartitions = -1;
+        public int nextRange = 0;
+
+        /**
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
+         */
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
+
         /**
-         * Member Id.
+         * Factory method to create a TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
+         * @return A new TopicMetadata instance.
          */
-        private final String memberId;
+        public static TopicMetadata create(Uuid topicId, int numPartitions, 
int numMembers) {
+            return new TopicMetadata(topicId, numPartitions, numMembers);
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
          */
-        private final int remaining;
+        void maybeComputeQuota() {
+            // The minimum number of partitions each member should receive for 
a balanced assignment.
+            if (minQuota != -1) return;
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+        @Override
+        public String toString() {
+            return "TopicMetadata{" +
+                "topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                '}';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                     The specification required for 
group assignments.
-     * @param subscribedTopicDescriber      The metadata describer for 
subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());
+        List<TopicMetadata> topics = new ArrayList<>(subscribedTopics.size());
+        int numMembers = groupSpec.memberIds().size();
+
+        for (Uuid topicId : subscribedTopics) {
+            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException("Member is subscribed to 
a non-existent topic");
             }
-        } else {
-            groupSpec.memberIds().forEach(memberId -> {
-                Collection<Uuid> topics = 
groupSpec.memberSubscription(memberId).subscribedTopicIds();
-                for (Uuid topicId : topics) {
-                    if (subscribedTopicDescriber.numPartitions(topicId) == -1) 
{
-                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
-                    }
-                    membersPerTopic
-                        .computeIfAbsent(topicId, k -> new ArrayList<>())
-                        .add(memberId);
-                }
-            });
+            TopicMetadata m = TopicMetadata.create(
+                topicId,
+                numPartitions,
+                numMembers
+            );
+            topics.add(m);
         }
 
-        return membersPerTopic;
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
+
+        for (String memberId : memberIds) {
+            Map<Uuid, Set<Integer>> assignment = new HashMap<>((int) 
((subscribedTopics.size() / 0.75f) + 1));
+            for (TopicMetadata topicMetadata : topics) {
+                topicMetadata.maybeComputeQuota();
+                addPartitionsToAssignment(topicMetadata, assignment);
+            }
+            assignments.put(memberId, new MemberAssignmentImpl(assignment));
+        }
+
+        return new GroupAssignment(assignments);
     }
 
     /**
-     * The algorithm includes the following steps:
-     * <ol>
-     *      <li> Generate a map of members per topic using the given member 
subscriptions. </li>
-     *      <li> Generate a list of members called potentially unfilled 
members, which consists of members that have not
-     *           met the minimum required quota of partitions for the 
assignment AND get a list called assigned sticky
-     *           partitions for topic, which has the partitions that will be 
retained in the new assignment. </li>
-     *      <li> Generate a list of unassigned partitions by calculating the 
difference between the total partitions
-     *           for the topic and the assigned (sticky) partitions. </li>
-     *      <li> Find members from the potentially unfilled members list that 
haven't met the total required quota
-     *           i.e. minRequiredQuota + 1, if the member is designated to 
receive one of the excess partitions OR
-     *           minRequiredQuota otherwise. </li>
-     *      <li> Assign partitions to them in ranges from the unassigned 
partitions per topic
-     *           based on the remaining partitions value. </li>
-     * </ol>
+     * Assigns partitions to members of a heterogeneous group. Not all members 
are subscribed to the same topics.
      */
-    @Override
-    public GroupAssignment assign(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
+    private GroupAssignment assignHeterogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
 
-        Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();
-
-        // Step 1
-        Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
-            groupSpec,
-            subscribedTopicDescriber
-        );
-
-        membersPerTopic.forEach((topicId, membersForTopic) -> {
-            int numPartitionsForTopic = 
subscribedTopicDescriber.numPartitions(topicId);
-            int minRequiredQuota = numPartitionsForTopic / 
membersForTopic.size();
-            // Each member can get only ONE extra partition per topic after 
receiving the minimum quota.
-            int numMembersWithExtraPartition = numPartitionsForTopic % 
membersForTopic.size();
-
-            // Step 2
-            Set<Integer> assignedStickyPartitionsForTopic = new HashSet<>();
-            List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = 
new ArrayList<>();
-
-            for (String memberId : membersForTopic) {
-                Set<Integer> assignedPartitionsForTopic = groupSpec
-                    .memberAssignment(memberId)
-                    .partitions()
-                    .getOrDefault(topicId, Collections.emptySet());
-
-                int currentAssignmentSize = assignedPartitionsForTopic.size();
-                List<Integer> currentAssignmentListForTopic = new 
ArrayList<>(assignedPartitionsForTopic);
-
-                // If there were partitions from this topic that were 
previously assigned to this member, retain as many as possible.
-                // Sort the current assignment in ascending order since we 
want the same partition numbers from each topic
-                // to go to the same member, in order to facilitate joins in 
case of co-partitioned topics.
-                if (currentAssignmentSize > 0) {
-                    int retainedPartitionsCount = min(currentAssignmentSize, 
minRequiredQuota);
-                    Collections.sort(currentAssignmentListForTopic);
-                    for (int i = 0; i < retainedPartitionsCount; i++) {
-                        assignedStickyPartitionsForTopic
-                            .add(currentAssignmentListForTopic.get(i));
-                        newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                            .partitions()
-                            .computeIfAbsent(topicId, k -> new HashSet<>())
-                            .add(currentAssignmentListForTopic.get(i));
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+
+        for (String memberId : memberIds) {
+            MemberSubscription subs = groupSpec.memberSubscription(memberId);
+            for (Uuid topicId : subs.subscribedTopicIds()) {
+                TopicMetadata topicMetadata = topics.computeIfAbsent(topicId, 
__ -> {
+                    int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+                    if (numPartitions == -1) {
+                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
                     }
-                }
-
-                // Number of partitions required to meet the minRequiredQuota.
-                // There are 3 cases w.r.t the value of remaining:
-                // 1) remaining < 0: this means that the member has more than 
the min required amount.
-                // 2) If remaining = 0: member has the minimum required 
partitions, but it may get an extra partition, so it is a potentially unfilled 
member.
-                // 3) If remaining > 0: member doesn't have the minimum 
required partitions, so it should be added to potentiallyUnfilledMembers.
-                int remaining = minRequiredQuota - currentAssignmentSize;
-
-                // Retain extra partitions as well when applicable.
-                if (remaining < 0 && numMembersWithExtraPartition > 0) {
-                    numMembersWithExtraPartition--;
-                    // Since we already added the minimumRequiredQuota of 
partitions in the previous step (until minReq - 1), we just need to
-                    // add the extra partition that will be present at the 
index right after min quota was satisfied.
-                    assignedStickyPartitionsForTopic
-                        
.add(currentAssignmentListForTopic.get(minRequiredQuota));
-                    newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                        .partitions()
-                        .computeIfAbsent(topicId, k -> new HashSet<>())
-                        
.add(currentAssignmentListForTopic.get(minRequiredQuota));
-                } else {
-                    MemberWithRemainingAssignments newPair = new 
MemberWithRemainingAssignments(memberId, remaining);
-                    potentiallyUnfilledMembers.add(newPair);
-                }
+
+                    return TopicMetadata.create(
+                        topicId,
+                        numPartitions,
+                        0
+                    );
+                });
+                topicMetadata.numMembers++;
             }
+        }
 
-            // Step 3
-            // Find the difference between the total partitions per topic and 
the already assigned sticky partitions for the topic to get the unassigned 
partitions.
-            // List of unassigned partitions for topic contains the partitions 
in ascending order.
-            List<Integer> unassignedPartitionsForTopic = new ArrayList<>();
-            for (int i = 0; i < numPartitionsForTopic; i++) {
-                if (!assignedStickyPartitionsForTopic.contains(i)) {
-                    unassignedPartitionsForTopic.add(i);
-                }
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
+
+        for (String memberId : memberIds) {
+            MemberSubscription subs = groupSpec.memberSubscription(memberId);
+            Map<Uuid, Set<Integer>> assignment = new 
HashMap<>(subs.subscribedTopicIds().size());
+            for (Uuid topicId : subs.subscribedTopicIds()) {
+                TopicMetadata metadata = topics.get(topicId);
+                metadata.maybeComputeQuota();
+                addPartitionsToAssignment(metadata, assignment);
             }
+            assignments.put(memberId, new MemberAssignmentImpl(assignment));
+        }
+
+        return new GroupAssignment(assignments);
+    }
 
-            // Step 4 and Step 5
-            // Account for the extra partitions if necessary and increase the 
required quota by 1.
-            // If remaining > 0 after increasing the required quota, assign 
the remaining number of partitions from the unassigned partitions list.
-            int unassignedPartitionsListStartPointer = 0;
-            for (MemberWithRemainingAssignments pair : 
potentiallyUnfilledMembers) {
-                String memberId = pair.memberId;
-                int remaining = pair.remaining;
-                if (numMembersWithExtraPartition > 0) {
-                    remaining++;
-                    numMembersWithExtraPartition--;
-                }
-                if (remaining > 0) {
-                    List<Integer> partitionsToAssign = 
unassignedPartitionsForTopic
-                        .subList(unassignedPartitionsListStartPointer, 
unassignedPartitionsListStartPointer + remaining);
-                    unassignedPartitionsListStartPointer += remaining;
-                    newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                        .partitions()
-                        .computeIfAbsent(topicId, k -> new HashSet<>())
-                        .addAll(partitionsToAssign);
-                }
+    /**
+     * Sorts the member Ids in the group based on their instance Id if 
present, otherwise by member Id.
+     * This is done to ensure that the relative ordering of members doesn't 
change with static members
+     * thus resulting in a sticky assignment.
+     *
+     * @param groupSpec     The group specification containing the member 
information.
+     * @return a sorted list of member Ids.
+     */
+    private List<String> sortMemberIds(
+        GroupSpec groupSpec
+    ) {
+        List<String> sortedMemberIds = new ArrayList<>(groupSpec.memberIds());
+        Map<String, Optional<String>> instanceIdCache = new HashMap<>();
+
+        for (String memberId : sortedMemberIds) {
+            instanceIdCache.put(memberId, 
groupSpec.memberSubscription(memberId).instanceId());
+        }
+
+        sortedMemberIds.sort((memberId1, memberId2) -> {
+            Optional<String> instanceId1 = instanceIdCache.get(memberId1);
+            Optional<String> instanceId2 = instanceIdCache.get(memberId2);
+
+            if (instanceId1.isPresent() && instanceId2.isPresent()) {
+                return instanceId1.get().compareTo(instanceId2.get());
+            } else if (instanceId1.isPresent()) {
+                return -1;
+            } else if (instanceId2.isPresent()) {
+                return 1;

Review Comment:
   Understood. It would be great if we could mention this in the javadoc of the 
method.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,228 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        private final Uuid topicId;
+        private final int numPartitions;
+        private int numMembers;
+        private int minQuota = -1;
+        private int extraPartitions = -1;
+        private int nextRange = 0;
+
         /**
-         * Member Id.
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
          */
-        private final String memberId;
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
          */
-        private final int remaining;
+        private void maybeComputeQuota() {
+            if (minQuota != -1) return;
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+            // The minimum number of partitions each member should receive for 
a balanced assignment.
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicMetadata(topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                ')';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                     The specification required for 
group assignments.
-     * @param subscribedTopicDescriber      The metadata describer for 
subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));

Review Comment:
   This could throw an error if the group does not have any members. Should we 
check if the group is empty in `assign` and return an empty assignment if it is?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##########
@@ -381,9 +558,12 @@ public void 
testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA
             mkTopicAssignment(topic1Uuid, 1),
             mkTopicAssignment(topic2Uuid, 1)
         ));
+        expectedAssignment.put(memberC, mkAssignment(
+            mkTopicAssignment(topic1Uuid),
+            mkTopicAssignment(topic2Uuid)
+        ));
 
-        // Consumer C shouldn't get any assignment, due to stickiness A, B 
retain their assignments
-        assertNull(computedAssignment.members().get(memberC));
+        // Consumer C shouldn't get any assignment.

Review Comment:
   Ah, I got confused by:
   ```
           expectedAssignment.put(memberC, mkAssignment(
               mkTopicAssignment(topic1Uuid),
               mkTopicAssignment(topic2Uuid)
           ));
   ```
   I think that we should really avoid this.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from      The starting value (inclusive) of the range.
+     * @param to        The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(Integer integer) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Integer> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        return "RangeSet(from=" + from + " (inclusive), to=" + to + " 
(exclusive))";
+    }
+
+    /**
+     * Compares the specified object with this set for equality.
+     * Returns {@code true} if the specified object is also a set,
+     * the two sets have the same size, and every member of the specified
+     * set is contained in this set.
+     *
+     * @param o object to be compared for equality with this set
+     * @return {@code true} if the specified object is equal to this set
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) {
+            if (!(o instanceof Set)) return false;
+        }

Review Comment:
   I am not sure about the `getClass() != o.getClass()` condition. This means 
that comparing a HashSet with a RangeSet won't work because they don't have the 
same class. Using `!(o instanceof Set)` may be better. Could you please verify 
this in unit tests?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from      The starting value (inclusive) of the range.
+     * @param to        The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }

Review Comment:
   I have noticed that many tests are failing in the build. I suspect that they 
do because those two methods are not implemented.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,228 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        private final Uuid topicId;
+        private final int numPartitions;
+        private int numMembers;
+        private int minQuota = -1;
+        private int extraPartitions = -1;
+        private int nextRange = 0;
+
         /**
-         * Member Id.
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
          */
-        private final String memberId;
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
          */
-        private final int remaining;
+        private void maybeComputeQuota() {
+            if (minQuota != -1) return;
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+            // The minimum number of partitions each member should receive for 
a balanced assignment.
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicMetadata(topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                ')';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                     The specification required for 
group assignments.
-     * @param subscribedTopicDescriber      The metadata describer for 
subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());
+        List<TopicMetadata> topics = new ArrayList<>(subscribedTopics.size());
+        int numMembers = groupSpec.memberIds().size();
+
+        for (Uuid topicId : subscribedTopics) {
+            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException("Member is subscribed to 
a non-existent topic");
             }
-        } else {
-            groupSpec.memberIds().forEach(memberId -> {
-                Collection<Uuid> topics = 
groupSpec.memberSubscription(memberId).subscribedTopicIds();
-                for (Uuid topicId : topics) {
-                    if (subscribedTopicDescriber.numPartitions(topicId) == -1) 
{
-                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
-                    }
-                    membersPerTopic
-                        .computeIfAbsent(topicId, k -> new ArrayList<>())
-                        .add(memberId);
-                }
-            });
+            TopicMetadata m = new TopicMetadata(
+                topicId,
+                numPartitions,
+                numMembers
+            );
+            topics.add(m);
         }
 
-        return membersPerTopic;
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
+        int memberAssignmentInitialCapacity = (int) ((subscribedTopics.size() 
/ 0.75f) + 1);
+
+        for (String memberId : memberIds) {
+            Map<Uuid, Set<Integer>> assignment = new 
HashMap<>(memberAssignmentInitialCapacity);
+            for (TopicMetadata topicMetadata : topics) {
+                topicMetadata.maybeComputeQuota();
+                addPartitionsToAssignment(topicMetadata, assignment);
+            }
+            assignments.put(memberId, new MemberAssignmentImpl(assignment));
+        }
+
+        return new GroupAssignment(assignments);
     }
 
     /**
-     * The algorithm includes the following steps:
-     * <ol>
-     *      <li> Generate a map of members per topic using the given member 
subscriptions. </li>
-     *      <li> Generate a list of members called potentially unfilled 
members, which consists of members that have not
-     *           met the minimum required quota of partitions for the 
assignment AND get a list called assigned sticky
-     *           partitions for topic, which has the partitions that will be 
retained in the new assignment. </li>
-     *      <li> Generate a list of unassigned partitions by calculating the 
difference between the total partitions
-     *           for the topic and the assigned (sticky) partitions. </li>
-     *      <li> Find members from the potentially unfilled members list that 
haven't met the total required quota
-     *           i.e. minRequiredQuota + 1, if the member is designated to 
receive one of the excess partitions OR
-     *           minRequiredQuota otherwise. </li>
-     *      <li> Assign partitions to them in ranges from the unassigned 
partitions per topic
-     *           based on the remaining partitions value. </li>
-     * </ol>
+     * Assigns partitions to members of a heterogeneous group. Not all members 
are subscribed to the same topics.
      */
-    @Override
-    public GroupAssignment assign(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
+    private GroupAssignment assignHeterogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
 
-        Map<String, MemberAssignment> newTargetAssignment = new HashMap<>();
-
-        // Step 1
-        Map<Uuid, Collection<String>> membersPerTopic = membersPerTopic(
-            groupSpec,
-            subscribedTopicDescriber
-        );
-
-        membersPerTopic.forEach((topicId, membersForTopic) -> {
-            int numPartitionsForTopic = 
subscribedTopicDescriber.numPartitions(topicId);
-            int minRequiredQuota = numPartitionsForTopic / 
membersForTopic.size();
-            // Each member can get only ONE extra partition per topic after 
receiving the minimum quota.
-            int numMembersWithExtraPartition = numPartitionsForTopic % 
membersForTopic.size();
-
-            // Step 2
-            Set<Integer> assignedStickyPartitionsForTopic = new HashSet<>();
-            List<MemberWithRemainingAssignments> potentiallyUnfilledMembers = 
new ArrayList<>();
-
-            for (String memberId : membersForTopic) {
-                Set<Integer> assignedPartitionsForTopic = groupSpec
-                    .memberAssignment(memberId)
-                    .partitions()
-                    .getOrDefault(topicId, Collections.emptySet());
-
-                int currentAssignmentSize = assignedPartitionsForTopic.size();
-                List<Integer> currentAssignmentListForTopic = new 
ArrayList<>(assignedPartitionsForTopic);
-
-                // If there were partitions from this topic that were 
previously assigned to this member, retain as many as possible.
-                // Sort the current assignment in ascending order since we 
want the same partition numbers from each topic
-                // to go to the same member, in order to facilitate joins in 
case of co-partitioned topics.
-                if (currentAssignmentSize > 0) {
-                    int retainedPartitionsCount = min(currentAssignmentSize, 
minRequiredQuota);
-                    Collections.sort(currentAssignmentListForTopic);
-                    for (int i = 0; i < retainedPartitionsCount; i++) {
-                        assignedStickyPartitionsForTopic
-                            .add(currentAssignmentListForTopic.get(i));
-                        newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                            .partitions()
-                            .computeIfAbsent(topicId, k -> new HashSet<>())
-                            .add(currentAssignmentListForTopic.get(i));
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        Map<Uuid, TopicMetadata> topics = new HashMap<>();
+
+        for (String memberId : memberIds) {
+            MemberSubscription subs = groupSpec.memberSubscription(memberId);
+            for (Uuid topicId : subs.subscribedTopicIds()) {
+                TopicMetadata topicMetadata = topics.computeIfAbsent(topicId, 
__ -> {
+                    int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+                    if (numPartitions == -1) {
+                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
                     }
-                }
-
-                // Number of partitions required to meet the minRequiredQuota.
-                // There are 3 cases w.r.t the value of remaining:
-                // 1) remaining < 0: this means that the member has more than 
the min required amount.
-                // 2) If remaining = 0: member has the minimum required 
partitions, but it may get an extra partition, so it is a potentially unfilled 
member.
-                // 3) If remaining > 0: member doesn't have the minimum 
required partitions, so it should be added to potentiallyUnfilledMembers.
-                int remaining = minRequiredQuota - currentAssignmentSize;
-
-                // Retain extra partitions as well when applicable.
-                if (remaining < 0 && numMembersWithExtraPartition > 0) {
-                    numMembersWithExtraPartition--;
-                    // Since we already added the minimumRequiredQuota of 
partitions in the previous step (until minReq - 1), we just need to
-                    // add the extra partition that will be present at the 
index right after min quota was satisfied.
-                    assignedStickyPartitionsForTopic
-                        
.add(currentAssignmentListForTopic.get(minRequiredQuota));
-                    newTargetAssignment.computeIfAbsent(memberId, k -> new 
MemberAssignmentImpl(new HashMap<>()))
-                        .partitions()
-                        .computeIfAbsent(topicId, k -> new HashSet<>())
-                        
.add(currentAssignmentListForTopic.get(minRequiredQuota));
-                } else {
-                    MemberWithRemainingAssignments newPair = new 
MemberWithRemainingAssignments(memberId, remaining);
-                    potentiallyUnfilledMembers.add(newPair);
-                }
+
+                    return new TopicMetadata(
+                        topicId,
+                        numPartitions,
+                        0
+                    );
+                });
+                topicMetadata.numMembers++;
             }
+        }
+
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
 
-            // Step 3
-            // Find the difference between the total partitions per topic and 
the already assigned sticky partitions for the topic to get the unassigned 
partitions.
-            // List of unassigned partitions for topic contains the partitions 
in ascending order.
-            List<Integer> unassignedPartitionsForTopic = new ArrayList<>();
-            for (int i = 0; i < numPartitionsForTopic; i++) {
-                if (!assignedStickyPartitionsForTopic.contains(i)) {
-                    unassignedPartitionsForTopic.add(i);
-                }
+        for (String memberId : memberIds) {
+            MemberSubscription subs = groupSpec.memberSubscription(memberId);
+            Map<Uuid, Set<Integer>> assignment = new HashMap<>((int) 
((subs.subscribedTopicIds().size() / 0.75f) + 1));
+            for (Uuid topicId : subs.subscribedTopicIds()) {
+                TopicMetadata metadata = topics.get(topicId);
+                metadata.maybeComputeQuota();
+                addPartitionsToAssignment(metadata, assignment);
             }
+            assignments.put(memberId, new MemberAssignmentImpl(assignment));
+        }
+
+        return new GroupAssignment(assignments);
+    }
+
+    /**
+     * Sorts the member Ids in the group based on their instance Id if 
present, otherwise by member Id.
+     * This is done to ensure that the relative ordering of members doesn't 
change with static members
+     * thus resulting in a sticky assignment.
+     *
+     * @param groupSpec     The group specification containing the member 
information.
+     * @return a sorted list of member Ids.
+     */
+    private List<String> sortMemberIds(
+        GroupSpec groupSpec
+    ) {
+        List<String> sortedMemberIds = new ArrayList<>(groupSpec.memberIds());
+        Map<String, Optional<String>> instanceIdCache = new HashMap<>();
+
+        // Caching the instanceIds improves performance.
+        for (String memberId : sortedMemberIds) {
+            instanceIdCache.put(memberId, 
groupSpec.memberSubscription(memberId).instanceId());
+        }

Review Comment:
   I ran the 10k members with and without this locally and it does not make a 
difference. Could you perhaps double check too? If it does not bring a 
significant gain, I would remove it.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##########
@@ -91,7 +96,10 @@ public void testOneConsumerNoTopic() {
             subscribedTopicMetadata
         );
 
-        assertEquals(Collections.emptyMap(), groupAssignment.members());
+        Map<String, MemberAssignment> expectedAssignment = new HashMap<>();
+        expectedAssignment.put(memberA, new 
MemberAssignmentImpl(Collections.emptyMap()));

Review Comment:
   I meant the following:
   ```
           Map<String, MemberAssignment> expectedAssignment = 
Collections.singletonMap(
               memberA,
               new MemberAssignmentImpl(Collections.emptyMap())
           );
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##########
@@ -65,191 +94,228 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        private final Uuid topicId;
+        private final int numPartitions;
+        private int numMembers;
+        private int minQuota = -1;
+        private int extraPartitions = -1;
+        private int nextRange = 0;
+
         /**
-         * Member Id.
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId           The topic Id.
+         * @param numPartitions     The number of partitions.
+         * @param numMembers        The number of subscribed members.
          */
-        private final String memberId;
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
          */
-        private final int remaining;
+        private void maybeComputeQuota() {
+            if (minQuota != -1) return;
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+            // The minimum number of partitions each member should receive for 
a balanced assignment.
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicMetadata(topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                ')';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec                     The specification required for 
group assignments.
-     * @param subscribedTopicDescriber      The metadata describer for 
subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());
+        List<TopicMetadata> topics = new ArrayList<>(subscribedTopics.size());
+        int numMembers = groupSpec.memberIds().size();
+
+        for (Uuid topicId : subscribedTopics) {
+            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException("Member is subscribed to 
a non-existent topic");
             }
-        } else {
-            groupSpec.memberIds().forEach(memberId -> {
-                Collection<Uuid> topics = 
groupSpec.memberSubscription(memberId).subscribedTopicIds();
-                for (Uuid topicId : topics) {
-                    if (subscribedTopicDescriber.numPartitions(topicId) == -1) 
{
-                        throw new PartitionAssignorException("Member is 
subscribed to a non-existent topic");
-                    }
-                    membersPerTopic
-                        .computeIfAbsent(topicId, k -> new ArrayList<>())
-                        .add(memberId);
-                }
-            });
+            TopicMetadata m = new TopicMetadata(
+                topicId,
+                numPartitions,
+                numMembers
+            );
+            topics.add(m);
         }
 
-        return membersPerTopic;
+        Map<String, MemberAssignment> assignments = new HashMap<>((int) 
((groupSpec.memberIds().size() / 0.75f) + 1));
+        int memberAssignmentInitialCapacity = (int) ((subscribedTopics.size() 
/ 0.75f) + 1);
+
+        for (String memberId : memberIds) {
+            Map<Uuid, Set<Integer>> assignment = new 
HashMap<>(memberAssignmentInitialCapacity);
+            for (TopicMetadata topicMetadata : topics) {
+                topicMetadata.maybeComputeQuota();
+                addPartitionsToAssignment(topicMetadata, assignment);
+            }
+            assignments.put(memberId, new MemberAssignmentImpl(assignment));
+        }
+
+        return new GroupAssignment(assignments);
     }
 
     /**
-     * The algorithm includes the following steps:
-     * <ol>
-     *      <li> Generate a map of members per topic using the given member 
subscriptions. </li>
-     *      <li> Generate a list of members called potentially unfilled 
members, which consists of members that have not
-     *           met the minimum required quota of partitions for the 
assignment AND get a list called assigned sticky
-     *           partitions for topic, which has the partitions that will be 
retained in the new assignment. </li>
-     *      <li> Generate a list of unassigned partitions by calculating the 
difference between the total partitions
-     *           for the topic and the assigned (sticky) partitions. </li>
-     *      <li> Find members from the potentially unfilled members list that 
haven't met the total required quota
-     *           i.e. minRequiredQuota + 1, if the member is designated to 
receive one of the excess partitions OR
-     *           minRequiredQuota otherwise. </li>
-     *      <li> Assign partitions to them in ranges from the unassigned 
partitions per topic
-     *           based on the remaining partitions value. </li>
-     * </ol>
+     * Assigns partitions to members of a heterogeneous group. Not all members 
are subscribed to the same topics.
      */
-    @Override
-    public GroupAssignment assign(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
+    private GroupAssignment assignHeterogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
     ) throws PartitionAssignorException {
 

Review Comment:
   nit: We could remove this empty line.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeSetTest.java:
##########
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+import static org.apache.kafka.common.utils.Utils.mkSet;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+class RangeSetTest {

Review Comment:
   nit: `public`. We usually keep test classes public.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from      The starting value (inclusive) of the range.
+     * @param to        The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }

Review Comment:
   From the logs: 
   ```
   java.lang.UnsupportedOperationException
        at 
org.apache.kafka.coordinator.group.assignor.RangeSet.toArray(RangeSet.java:83)
        at java.base/java.util.ArrayList.<init>(ArrayList.java:181)
        at 
org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newTargetAssignmentRecord(CoordinatorRecordHelpers.java:242)
        at 
org.apache.kafka.coordinator.group.consumer.TargetAssignmentBuilder.build(TargetAssignmentBuilder.java:368)
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from      The starting value (inclusive) of the range.
+     * @param to        The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }

Review Comment:
   The `TargetAssignmentBuilderBenchmark` results are very likely impacted by 
this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to