This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 278a93c45d9 KAFKA-18901: [1/N] Improved homogeneous SimpleAssignor 
(#19142)
278a93c45d9 is described below

commit 278a93c45d9622ecb30a31506420213bf70e521d
Author: Andrew Schofield <[email protected]>
AuthorDate: Tue Mar 11 10:08:31 2025 +0000

    KAFKA-18901: [1/N] Improved homogeneous SimpleAssignor (#19142)
    
    The current homogeneous SimpleAssignor for share groups is not very good
    at revoking partitions which have previously been assigned when the
    number of members increases. This PR improves the situation.
    
    It also fixes the sorting of assignments in `kafka-consumer-groups.sh`
    and `kafka-share-groups.sh` so that it sorts partition indices
    numerically instead of alphabetically. It also adds the missing number
    of partitions column for share groups.
---
 .../coordinator/group/assignor/SimpleAssignor.java | 233 ++++++++++++++-------
 .../group/assignor/SimpleAssignorTest.java         |  59 +++++-
 .../tools/consumer/group/ConsumerGroupCommand.java |   2 +-
 .../tools/consumer/group/ShareGroupCommand.java    |  16 +-
 .../consumer/group/ShareGroupCommandTest.java      |  16 +-
 5 files changed, 229 insertions(+), 97 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
index 28c89966936..16ada6c4668 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
@@ -34,13 +34,26 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
+import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 
 import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.HOMOGENEOUS;
 
 /**
- * A simple partition assignor that assigns partitions of the subscribed 
topics based on the rules defined in KIP-932 to different members.
+ * A simple partition assignor for share groups that assigns partitions of the 
subscribed topics
+ * based on the rules defined in KIP-932 to different members. It is not 
rack-aware.
+ * <p>
+ * Assignments are done according to the following principles:
+ * <ol>
+ *   <li>Balance:          Ensure partitions are distributed equally among all 
members.
+ *                         The difference in assignments sizes between any two 
members
+ *                         should not exceed one partition.</li>
+ *   <li>Stickiness:       Minimize partition movements among members by 
retaining
+ *                         as much of the existing assignment as possible.</li>
+ * </ol>
+ * <p>
+ * Balance is prioritized above stickiness.
  */
 public class SimpleAssignor implements ShareGroupPartitionAssignor {
 
@@ -60,13 +73,13 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
             return new GroupAssignment(Map.of());
 
         if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            return assignHomogenous(groupSpec, subscribedTopicDescriber);
+            return assignHomogeneous(groupSpec, subscribedTopicDescriber);
         } else {
             return assignHeterogeneous(groupSpec, subscribedTopicDescriber);
         }
     }
 
-    private GroupAssignment assignHomogenous(
+    private GroupAssignment assignHomogeneous(
         GroupSpec groupSpec,
         SubscribedTopicDescriber subscribedTopicDescriber
     ) {
@@ -107,7 +120,7 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
 
     /**
      * Get the current assignment by topic partitions.
-     * @param groupSpec - The group metadata specifications.
+     * @param groupSpec The group metadata specifications.
      * @return the current assignment for subscribed topic partitions to 
memberIds.
      */
     private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec 
groupSpec) {
@@ -123,10 +136,10 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
 
     /**
      * This function computes the new assignment for a homogeneous group.
-     * @param groupSpec - The group metadata specifications.
-     * @param subscribedTopicIds - The set of all the subscribed topic ids for 
the group.
-     * @param targetPartitions - The list of all topic partitions that need 
assignment.
-     * @param currentAssignment - The current assignment for subscribed topic 
partitions to memberIds.
+     * @param groupSpec           The group metadata specifications.
+     * @param subscribedTopicIds  The set of all the subscribed topic ids for 
the group.
+     * @param targetPartitions    The list of all topic partitions that need 
assignment.
+     * @param currentAssignment   The current assignment for subscribed topic 
partitions to memberIds.
      * @return the new partition assignment for the members of the group.
      */
     private GroupAssignment newAssignmentHomogeneous(
@@ -135,49 +148,63 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
         List<TopicIdPartition> targetPartitions,
         Map<TopicIdPartition, List<String>> currentAssignment
     ) {
-        Map<TopicIdPartition, List<String>> newAssignment = new HashMap<>();
-
-        // Step 1: Hash member IDs to topic partitions.
-        memberHashAssignment(targetPartitions, groupSpec.memberIds(), 
newAssignment);
-
-        // Step 2: Round-robin assignment for unassigned partitions which do 
not have members already assigned in the current assignment.
-        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
-            .filter(targetPartition -> 
!newAssignment.containsKey(targetPartition))
-            .filter(targetPartition -> 
!currentAssignment.containsKey(targetPartition))
-            .toList();
-
-        roundRobinAssignment(groupSpec.memberIds(), unassignedPartitions, 
newAssignment);
-
-        // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-
-        // As per the KIP, we should revoke the assignments from current 
assignment for partitions that were assigned by step 1
-        // in the new assignment and have members in current assignment by 
step 2. But we haven't implemented it to avoid the
-        // complexity in both the implementation and the run time complexity. 
This step was mentioned in the KIP to reduce
-        // the burden of certain members of the share groups. This can be 
achieved with the help of limiting the max
-        // no. of partitions assignment for every member(KAFKA-18788). Hence, 
the potential problem of burdening
-        // the share consumers will be addressed in a future PR.
+        // For entirely balanced assignment, we would expect 
(numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
+        // That can be expressed as         Math.ceil(numTargetPartitions / 
(double) numGroupMembers)
+        // Using integer arithmetic, as     (numTargetPartitions + 
numGroupMembers - 1) / numGroupMembers
+        int numGroupMembers = groupSpec.memberIds().size();
+        int numTargetPartitions = targetPartitions.size();
+        int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 
1) / numGroupMembers;
+
+        Map<TopicIdPartition, List<String>> newAssignment = 
newHashMap(numTargetPartitions);
+
+        // Hash member IDs to topic partitions. Each member will be assigned 
one partition, but some partitions
+        // might have been assigned to more than one member.
+        memberHashAssignment(groupSpec.memberIds(), targetPartitions, 
newAssignment);
+
+        // Combine current and new hashed assignments, sized to accommodate 
the expected number of mappings.
+        Map<String, Set<TopicIdPartition>> finalAssignment = 
newHashMap(numGroupMembers);
+        Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = 
newHashMap(numTargetPartitions);
+
+        // First, take the members assigned by hashing.
+        newAssignment.forEach((targetPartition, members) -> 
members.forEach(member -> {
+            finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition);
+            finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> 
new HashSet<>()).add(member);
+        }));
 
-        newAssignment.forEach((targetPartition, members) -> 
members.forEach(member ->
-            finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition)));
+        // Then, take the members from the current assignment, making sure 
that no member has too many assigned partitions.
         // When combining current assignment, we need to only consider the 
topics in current assignment that are also being
         // subscribed in the new assignment as well.
         currentAssignment.forEach((targetPartition, members) -> {
-            if (subscribedTopicIds.contains(targetPartition.topicId()))
+            if (subscribedTopicIds.contains(targetPartition.topicId())) {
                 members.forEach(member -> {
-                    if (groupSpec.memberIds().contains(member) && 
!newAssignment.containsKey(targetPartition))
-                        finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition);
+                    if (groupSpec.memberIds().contains(member)) {
+                        Set<TopicIdPartition> memberPartitions = 
finalAssignment.computeIfAbsent(member, k -> new HashSet<>());
+                        if ((memberPartitions.size() < desiredAssignmentCount) 
&& !newAssignment.containsKey(targetPartition)) {
+                            memberPartitions.add(targetPartition);
+                            
finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new 
HashSet<>()).add(member);
+                        }
+                    }
                 });
+            }
         });
 
+        // Finally, round-robin assignment for unassigned partitions which do 
not already have members assigned.
+        // The order of steps differs slightly from KIP-932 because the 
desired assignment count has been taken into
+        // account when copying partitions across from the current assignment, 
and this is more convenient.
+        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
+            .filter(targetPartition -> 
!finalAssignmentByPartition.containsKey(targetPartition))
+            .toList();
+
+        roundRobinAssignmentWithCount(groupSpec.memberIds(), 
unassignedPartitions, finalAssignment, desiredAssignmentCount);
+
         return groupAssignment(finalAssignment, groupSpec.memberIds());
     }
 
     /**
      * This function computes the new assignment for a heterogeneous group.
-     * @param groupSpec - The group metadata specifications.
-     * @param memberToPartitionsSubscription - The member to subscribed topic 
partitions map.
-     * @param currentAssignment - The current assignment for subscribed topic 
partitions to memberIds.
+     * @param groupSpec                       The group metadata 
specifications.
+     * @param memberToPartitionsSubscription  The member to subscribed topic 
partitions map.
+     * @param currentAssignment               The current assignment for 
subscribed topic partitions to memberIds.
      * @return the new partition assignment for the members of the group.
      */
     private GroupAssignment newAssignmentHeterogeneous(
@@ -185,6 +212,7 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
         Map<String, List<TopicIdPartition>> memberToPartitionsSubscription,
         Map<TopicIdPartition, List<String>> currentAssignment
     ) {
+        int numGroupMembers = groupSpec.memberIds().size();
 
         // Exhaustive set of all subscribed topic partitions.
         Set<TopicIdPartition> targetPartitions = new LinkedHashSet<>();
@@ -199,7 +227,7 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
 
         // Step 1: Hash member IDs to partitions.
         memberToPartitionsSubscription.forEach((member, partitions) ->
-            memberHashAssignment(partitions, List.of(member), newAssignment));
+            memberHashAssignment(List.of(member), partitions, newAssignment));
 
         // Step 2: Round-robin assignment for unassigned partitions which do 
not have members already assigned in the current assignment.
         Set<TopicIdPartition> assignedPartitions = new 
LinkedHashSet<>(newAssignment.keySet());
@@ -213,16 +241,11 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
             
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), 
unassignedPartitions.get(unassignedTopic), newAssignment));
 
         // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = new HashMap<>();
-        // As per the KIP, we should revoke the assignments from current 
assignment for partitions that were assigned by step 1
-        // in the new assignment and have members in current assignment by 
step 2. But we haven't implemented it to avoid the
-        // complexity in both the implementation and the run time complexity. 
This step was mentioned in the KIP to reduce
-        // the burden of certain members of the share groups. This can be 
achieved with the help of limiting the max
-        // no. of partitions assignment for every member(KAFKA-18788). Hence, 
the potential problem of burdening
-        // the share consumers will be addressed in a future PR.
+        Map<String, Set<TopicIdPartition>> finalAssignment = 
newHashMap(numGroupMembers);
 
         newAssignment.forEach((targetPartition, members) -> 
members.forEach(member ->
             finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition)));
+
         // When combining current assignment, we need to only consider the 
member topic subscription in current assignment
         // which is being subscribed in the new assignment as well.
         currentAssignment.forEach((topicIdPartition, members) -> 
members.forEach(member -> {
@@ -234,67 +257,94 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
         return groupAssignment(finalAssignment, groupSpec.memberIds());
     }
 
-    private GroupAssignment groupAssignment(
-        Map<String, Set<TopicIdPartition>> assignmentByMember,
-        Collection<String> allGroupMembers
-    ) {
-        Map<String, MemberAssignment> members = new HashMap<>();
-        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
-            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
-            entry.getValue().forEach(targetPartition -> 
targetPartitions.computeIfAbsent(targetPartition.topicId(), k -> new 
HashSet<>()).add(targetPartition.partitionId()));
-            members.put(entry.getKey(), new 
MemberAssignmentImpl(targetPartitions));
-        }
-        allGroupMembers.forEach(member -> {
-            if (!members.containsKey(member))
-                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
-        });
-
-        return new GroupAssignment(members);
-    }
-
     /**
      * This function updates assignment by hashing the member IDs of the 
members and maps the partitions assigned to the
-     * members based on the hash. This gives approximately even balance.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned.
-     * @param assignment - the existing assignment by topic partition. We need 
to pass it as a parameter because this
-     *                   function would be called multiple times for 
heterogeneous assignment.
+     * members based on the hash, one partition per member. This gives 
approximately even balance.
+     * @param memberIds           The member ids to which the topic partitions 
need to be assigned.
+     * @param partitionsToAssign  The subscribed topic partitions which needs 
assignment.
+     * @param assignment          The existing assignment by topic partition. 
We need to pass it as a parameter because this
+     *                            method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void memberHashAssignment(
-        List<TopicIdPartition> unassignedPartitions,
         Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
-        if (!unassignedPartitions.isEmpty())
+        if (!partitionsToAssign.isEmpty()) {
             for (String memberId : memberIds) {
-                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
unassignedPartitions.size());
-                TopicIdPartition topicPartition = 
unassignedPartitions.get(topicPartitionIndex);
+                int topicPartitionIndex = Math.abs(memberId.hashCode() % 
partitionsToAssign.size());
+                TopicIdPartition topicPartition = 
partitionsToAssign.get(topicPartitionIndex);
                 assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
             }
+        }
     }
 
     /**
-     * This functions assigns topic partitions to members by round-robin 
approach and updates the existing assignment.
-     * @param memberIds - the member ids to which the topic partitions need to 
be assigned, should be non-empty.
-     * @param unassignedPartitions - the subscribed topic partitions which 
needs assignment.
-     * @param assignment - the existing assignment by topic partition.
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
needs assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void roundRobinAssignment(
         Collection<String> memberIds,
-        List<TopicIdPartition> unassignedPartitions,
+        List<TopicIdPartition> partitionsToAssign,
         Map<TopicIdPartition, List<String>> assignment
     ) {
         // We iterate through the target partitions and assign a memberId to 
them. In case we run out of members (members < targetPartitions),
         // we again start from the starting index of memberIds.
         Iterator<String> memberIdIterator = memberIds.iterator();
-        for (TopicIdPartition targetPartition : unassignedPartitions) {
+        for (TopicIdPartition topicPartition : partitionsToAssign) {
             if (!memberIdIterator.hasNext()) {
                 memberIdIterator = memberIds.iterator();
             }
             String memberId = memberIdIterator.next();
-            assignment.computeIfAbsent(targetPartition, k -> new 
ArrayList<>()).add(memberId);
+            assignment.computeIfAbsent(topicPartition, k -> new 
ArrayList<>()).add(memberId);
+        }
+    }
+
+    /**
+     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
+     * @param memberIds              The member ids to which the topic 
partitions need to be assigned, should be non-empty.
+     * @param partitionsToAssign     The subscribed topic partitions which 
need assignment.
+     * @param assignment             The existing assignment by topic 
partition. We need to pass it as a parameter because this
+     *                               method can be called multiple times for 
heterogeneous assignment.
+     * @param desiredAssignmentCount The number of partitions which can be 
assigned to each member to give even balance.
+     *                               Note that this number can be exceeded by 
one to allow for situations
+     *                               in which we have hashing collisions.
+     */
+    void roundRobinAssignmentWithCount(
+        Collection<String> memberIds,
+        List<TopicIdPartition> partitionsToAssign,
+        Map<String, Set<TopicIdPartition>> assignment,
+        int desiredAssignmentCount
+    ) {
+        Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
+
+        // We iterate through the target partitions which are not in the 
assignment and assign a memberId to them.
+        // In case we run out of members (memberIds < partitionsToAssign), we 
again start from the starting index of memberIds.
+        Iterator<String> memberIdIterator = memberIdsCopy.iterator();
+        ListIterator<TopicIdPartition> partitionListIterator = 
partitionsToAssign.listIterator();
+        while (partitionListIterator.hasNext()) {
+            TopicIdPartition partition = partitionListIterator.next();
+            if (!memberIdIterator.hasNext()) {
+                memberIdIterator = memberIdsCopy.iterator();
+                if (memberIdsCopy.isEmpty()) {
+                    // This should never happen, but guarding against an 
infinite loop
+                    throw new PartitionAssignorException("Inconsistent number 
of member IDs");
+                }
+            }
+            String memberId = memberIdIterator.next();
+            Set<TopicIdPartition> memberPartitions = 
assignment.computeIfAbsent(memberId, k -> new HashSet<>());
+            // We are prepared to add one more partition, even if the desired 
assignment count is already reached.
+            if (memberPartitions.size() <= desiredAssignmentCount) {
+                memberPartitions.add(partition);
+            } else {
+                memberIdIterator.remove();
+                partitionListIterator.previous();
+            }
         }
     }
 
@@ -318,4 +368,27 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
         });
         return targetPartitions;
     }
+
+    private GroupAssignment groupAssignment(
+        Map<String, Set<TopicIdPartition>> assignmentByMember,
+        Collection<String> allGroupMembers
+    ) {
+        Map<String, MemberAssignment> members = new HashMap<>();
+        for (Map.Entry<String, Set<TopicIdPartition>> entry : 
assignmentByMember.entrySet()) {
+            Map<Uuid, Set<Integer>> targetPartitions = new HashMap<>();
+            entry.getValue().forEach(targetPartition ->
+                targetPartitions.computeIfAbsent(targetPartition.topicId(), k 
-> new HashSet<>()).add(targetPartition.partitionId()));
+            members.put(entry.getKey(), new 
MemberAssignmentImpl(targetPartitions));
+        }
+        allGroupMembers.forEach(member -> {
+            if (!members.containsKey(member))
+                members.put(member, new MemberAssignmentImpl(new HashMap<>()));
+        });
+
+        return new GroupAssignment(members);
+    }
+
+    private static <K, V> HashMap<K, V> newHashMap(int numMappings) {
+        return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
+    }
 }
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index a0b86d6fa13..8be6abbdd43 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -391,7 +391,7 @@ public class SimpleAssignorTest {
         List<TopicIdPartition> partitions = List.of(partition1, partition2, 
partition3);
 
         Map<TopicIdPartition, List<String>> computedAssignment = new 
HashMap<>();
-        assignor.memberHashAssignment(partitions, members, computedAssignment);
+        assignor.memberHashAssignment(members, partitions, computedAssignment);
 
         Map<TopicIdPartition, List<String>> expectedAssignment = new 
HashMap<>();
         expectedAssignment.put(partition1, List.of(member3));
@@ -425,6 +425,51 @@ public class SimpleAssignorTest {
         assertAssignment(expectedAssignment, assignment);
     }
 
+    @Test
+    public void testRoundRobinAssignmentWithCount() {
+        String member1 = "member1";
+        String member2 = "member2";
+        List<String> members = List.of(member1, member2);
+        TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+        TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+        TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+        TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
+        List<TopicIdPartition> unassignedPartitions = List.of(partition2, 
partition3, partition4);
+
+        Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
+        assignment.put(member1, new HashSet<>(Set.of(partition1)));
+        assignment.put(member2, new HashSet<>(Set.of(partition1)));
+
+        assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, 
assignment, 2);
+        Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of(
+            member1, Set.of(partition1, partition2, partition4),
+            member2, Set.of(partition1, partition3)
+        );
+
+        assertFinalAssignment(expectedAssignment, assignment);
+    }
+
+    @Test
+    public void testRoundRobinAssignmentWithCountTooManyPartitions() {
+        String member1 = "member1";
+        String member2 = "member2";
+        List<String> members = List.of(member1, member2);
+        TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
+        TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
+        TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
+        TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
+        TopicIdPartition partition5 = new TopicIdPartition(TOPIC_4_UUID, 1);
+        TopicIdPartition partition6 = new TopicIdPartition(TOPIC_4_UUID, 2);
+        List<TopicIdPartition> unassignedPartitions = List.of(partition2, 
partition3, partition4, partition5, partition6);
+
+        Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
+        assignment.put(member1, new HashSet<>(Set.of(partition1)));
+        assignment.put(member2, new HashSet<>(Set.of(partition1)));
+
+        assertThrows(PartitionAssignorException.class,
+            () -> assignor.roundRobinAssignmentWithCount(members, 
unassignedPartitions, assignment, 2));
+    }
+
     @Test
     public void testAssignWithCurrentAssignmentHomogeneous() {
         // Current assignment setup - Two members A, B subscribing to T1 and 
T2.
@@ -758,6 +803,18 @@ public class SimpleAssignorTest {
         });
     }
 
+    private void assertFinalAssignment(
+        Map<String, Set<TopicIdPartition>> expectedAssignment,
+        Map<String, Set<TopicIdPartition>> computedAssignment
+    ) {
+        assertEquals(expectedAssignment.size(), computedAssignment.size());
+        expectedAssignment.forEach((memberId, partitions) -> {
+            Set<TopicIdPartition> computedPartitions = 
computedAssignment.getOrDefault(memberId, Set.of());
+            assertEquals(partitions.size(), computedPartitions.size());
+            partitions.forEach(member -> 
assertTrue(computedPartitions.contains(member)));
+        });
+    }
+
     private void assertEveryPartitionGetsAssignment(
         int expectedPartitions,
         GroupAssignment computedGroupAssignment
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
index 6f9f56af077..48756b90a1b 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommand.java
@@ -531,8 +531,8 @@ public class ConsumerGroupCommand {
                 return topicPartitions
                     .stream()
                     .map(TopicPartition::partition)
-                    .map(Object::toString)
                     .sorted()
+                    .map(Object::toString)
                     .collect(Collectors.joining(",", topicName + ":", ""));
             }).sorted().collect(Collectors.joining(";"));
         }
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
index 246a5eecbf1..075d0a5b282 100644
--- 
a/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
+++ 
b/tools/src/main/java/org/apache/kafka/tools/consumer/group/ShareGroupCommand.java
@@ -431,16 +431,18 @@ public class ShareGroupCommand {
                     }
 
                     if (verbose) {
-                        String fmt = "\n%" + -groupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-13s 
%s";
-                        System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", 
"CLIENT-ID", "MEMBER-EPOCH", "ASSIGNMENT");
+                        String fmt = "\n%" + -groupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-12s 
%-13s %s";
+                        System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", 
"CLIENT-ID", "#PARTITIONS", "MEMBER-EPOCH", "ASSIGNMENT");
                         for (ShareMemberDescription member : members) {
-                            System.out.printf(fmt, groupId, 
member.consumerId(), member.host(), member.clientId(), member.memberEpoch(), 
getAssignmentString(member.assignment()));
+                            System.out.printf(fmt, groupId, 
member.consumerId(), member.host(), member.clientId(),
+                                member.assignment().topicPartitions().size(), 
member.memberEpoch(), getAssignmentString(member.assignment()));
                         }
                     } else {
-                        String fmt = "\n%" + -groupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %s";
-                        System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", 
"CLIENT-ID", "ASSIGNMENT");
+                        String fmt = "\n%" + -groupLen + "s %" + 
-maxConsumerIdLen + "s %" + -maxHostLen + "s %" + -maxClientIdLen + "s %-12s 
%s";
+                        System.out.printf(fmt, "GROUP", "CONSUMER-ID", "HOST", 
"CLIENT-ID", "#PARTITIONS", "ASSIGNMENT");
                         for (ShareMemberDescription member : members) {
-                            System.out.printf(fmt, groupId, 
member.consumerId(), member.host(), member.clientId(), 
getAssignmentString(member.assignment()));
+                            System.out.printf(fmt, groupId, 
member.consumerId(), member.host(), member.clientId(),
+                                member.assignment().topicPartitions().size(), 
getAssignmentString(member.assignment()));
                         }
                     }
                     System.out.println();
@@ -461,8 +463,8 @@ public class ShareGroupCommand {
                 return topicPartitions
                     .stream()
                     .map(TopicPartition::partition)
-                    .map(Object::toString)
                     .sorted()
+                    .map(Object::toString)
                     .collect(Collectors.joining(",", topicName + ":", ""));
             }).sorted().collect(Collectors.joining(";"));
         }
diff --git 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
index 46103e10f43..c3690d953ab 100644
--- 
a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
+++ 
b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ShareGroupCommandTest.java
@@ -475,10 +475,10 @@ public class ShareGroupCommandTest {
 
                     List<String> expectedValues1;
                     if (describeType.contains("--verbose")) {
-                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "0", "topic1:0,1;topic2:0");
+                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "3", "0", "topic1:0,1;topic2:0");
 
                     } else {
-                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "topic1:0,1;topic2:0");
+                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "3", "topic1:0,1;topic2:0");
                     }
                     return checkArgsHeaderOutput(cgcArgs, lines[0]) &&
                         
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1);
@@ -531,12 +531,12 @@ public class ShareGroupCommandTest {
                     List<String> expectedValues1;
                     List<String> expectedValues2;
                     if (describeType.contains("--verbose")) {
-                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "0", "topic1:0,1;topic2:0");
-                        expectedValues2 = List.of(secondGroup, "memid1", 
"host1", "clId1", "0", "topic1:0");
+                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "3", "0", "topic1:0,1;topic2:0");
+                        expectedValues2 = List.of(secondGroup, "memid1", 
"host1", "clId1", "1", "0", "topic1:0");
 
                     } else {
-                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "topic1:0,1;topic2:0");
-                        expectedValues2 = List.of(secondGroup, "memid1", 
"host1", "clId1", "topic1:0");
+                        expectedValues1 = List.of(firstGroup, "memid1", 
"host1", "clId1", "3", "topic1:0,1;topic2:0");
+                        expectedValues2 = List.of(secondGroup, "memid1", 
"host1", "clId1", "1", "topic1:0");
                     }
                     return checkArgsHeaderOutput(cgcArgs, lines[0]) && 
checkArgsHeaderOutput(cgcArgs, lines[3]) &&
                         
Arrays.stream(lines[1].trim().split("\\s+")).toList().equals(expectedValues1) &&
@@ -899,8 +899,8 @@ public class ShareGroupCommandTest {
 
     private boolean checkMembersArgsHeaderOutput(String output, boolean 
verbose) {
         List<String> expectedKeys = verbose ?
-            List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", 
"MEMBER-EPOCH", "ASSIGNMENT") :
-            List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", "ASSIGNMENT");
+            List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", 
"#PARTITIONS", "MEMBER-EPOCH", "ASSIGNMENT") :
+            List.of("GROUP", "CONSUMER-ID", "HOST", "CLIENT-ID", 
"#PARTITIONS", "ASSIGNMENT");
         return 
Arrays.stream(output.trim().split("\\s+")).toList().equals(expectedKeys);
     }
 


Reply via email to