This is an automated email from the ASF dual-hosted git repository.

schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4690527fabf KAFKA-19362: Finalize homogeneous simple share assignor 
(#19977)
4690527fabf is described below

commit 4690527fabfebbbde0fd99501f072b453ef0f962
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Jun 20 16:10:47 2025 +0100

    KAFKA-19362: Finalize homogeneous simple share assignor (#19977)
    
    Finalise the share group SimpleAssignor for homogeneous subscriptions.
    The assignor code is much more accurate about the number of partitions
    assigned to each member, and the number of members assigned for each
    partition. It eliminates the idea of hash-based assignment because that
    has been shown to be unhelpful. The revised code is very much more
    effective at assigning evenly as the number of members grows and shrinks
    over time.
    
    A future PR will address the code for heterogeneous subscriptions.
    
    Reviewers: Apoorv Mittal <[email protected]>
---
 .../group/assignor/AssignorHelpers.java            |  56 +++
 .../coordinator/group/assignor/SimpleAssignor.java | 222 +++---------
 .../SimpleHomogeneousAssignmentBuilder.java        | 385 +++++++++++++++++++++
 .../group/assignor/SimpleAssignorTest.java         | 331 +++++-------------
 .../jmh/assignor/ShareGroupAssignorBenchmark.java  |   2 +-
 5 files changed, 573 insertions(+), 423 deletions(-)

diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
index 2b30b0b3ae9..5fbe65067b2 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
@@ -17,10 +17,16 @@
 package org.apache.kafka.coordinator.group.assignor;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import 
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
+import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.server.common.TopicIdPartition;
 
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -51,4 +57,54 @@ public final class AssignorHelpers {
         }
         return copy;
     }
+
+    /**
+     * Computes the list of target partitions which can be assigned to 
members. This list includes all partitions
+     * for the subscribed topic IDs, with the additional check that they must 
be assignable.
+     * @param groupSpec                 The assignment spec which includes 
member metadata.
+     * @param subscribedTopicIds        The set of subscribed topic IDs.
+     * @param subscribedTopicDescriber  The topic and partition metadata 
describer.
+     * @return The list of target partitions.
+     */
+    static List<TopicIdPartition> computeTargetPartitions(
+        GroupSpec groupSpec,
+        Set<Uuid> subscribedTopicIds,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) {
+        List<TopicIdPartition> targetPartitions = new ArrayList<>();
+        subscribedTopicIds.forEach(topicId -> {
+            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
+            if (numPartitions == -1) {
+                throw new PartitionAssignorException(
+                    "Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+                );
+            }
+
+            for (int partition = 0; partition < numPartitions; partition++) {
+                if (groupSpec.isPartitionAssignable(topicId, partition)) {
+                    targetPartitions.add(new TopicIdPartition(topicId, 
partition));
+                }
+            }
+        });
+
+        return targetPartitions;
+    }
+
+    /**
+     * Constructs a HashMap with a known capacity. This is equivalent to 
HashMap.newHashMap which is introduced in Java 19.
+     * @param numMappings The expected number of mappings.
+     * @return The newly created map.
+     */
+    static <K, V> HashMap<K, V> newHashMap(int numMappings) {
+        return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
+    }
+
+    /**
+     * Constructs a HashSet with a known capacity. This is equivalent to 
HashSet.newHashSet which is introduced in Java 19.
+     * @param numElements The expected number of elements.
+     * @return The newly created set.
+     */
+    static <K> HashSet<K> newHashSet(int numElements) {
+        return new HashSet<>((int) (((numElements + 1) / 0.75f) + 1));
+    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
index da04257edf6..8238be7c58e 100644
--- 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
@@ -27,6 +27,9 @@ import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
 import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
 import org.apache.kafka.server.common.TopicIdPartition;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
@@ -34,7 +37,6 @@ import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
-import java.util.ListIterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -42,7 +44,7 @@ import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
 
 /**
  * A simple partition assignor for share groups that assigns partitions of the 
subscribed topics
- * based on the rules defined in KIP-932 to different members. It is not 
rack-aware.
+ * to different members based on the rules defined in KIP-932. It is not 
rack-aware.
  * <p>
  * Assignments are done according to the following principles:
  * <ol>
@@ -56,51 +58,39 @@ import static 
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
  * Balance is prioritized above stickiness.
  */
 public class SimpleAssignor implements ShareGroupPartitionAssignor {
-
+    private static final Logger log = 
LoggerFactory.getLogger(SimpleAssignor.class);
     private static final String SIMPLE_ASSIGNOR_NAME = "simple";
 
+    /**
+     * Unique name for this assignor.
+     */
     @Override
     public String name() {
         return SIMPLE_ASSIGNOR_NAME;
     }
 
+    /**
+     * Assigns partitions to group members based on the given assignment 
specification and topic metadata.
+     *
+     * @param groupSpec                The assignment spec which includes 
member metadata.
+     * @param subscribedTopicDescriber The topic and partition metadata 
describer.
+     * @return The new assignment for the group.
+     */
     @Override
-    public GroupAssignment assign(
-        GroupSpec groupSpec,
-        SubscribedTopicDescriber subscribedTopicDescriber
-    ) throws PartitionAssignorException {
+    public GroupAssignment assign(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) throws 
PartitionAssignorException {
         if (groupSpec.memberIds().isEmpty())
             return new GroupAssignment(Map.of());
 
         if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            return assignHomogeneous(groupSpec, subscribedTopicDescriber);
+            log.debug("Detected that all members are subscribed to the same 
set of topics, invoking the homogeneous assignment algorithm");
+            return new SimpleHomogeneousAssignmentBuilder(groupSpec, 
subscribedTopicDescriber).build();
         } else {
+            log.debug("Detected that the members are subscribed to different 
sets of topics, invoking the heterogeneous assignment algorithm");
             return assignHeterogeneous(groupSpec, subscribedTopicDescriber);
         }
     }
 
-    private GroupAssignment assignHomogeneous(
-        GroupSpec groupSpec,
-        SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Set<Uuid> subscribedTopicIds = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-            .subscribedTopicIds();
-        if (subscribedTopicIds.isEmpty())
-            return new GroupAssignment(Map.of());
-
-        // Subscribed topic partitions for the share group.
-        List<TopicIdPartition> targetPartitions = computeTargetPartitions(
-            groupSpec, subscribedTopicIds, subscribedTopicDescriber);
-
-        // The current assignment from topic partition to members.
-        Map<TopicIdPartition, List<String>> currentAssignment = 
currentAssignment(groupSpec);
-        return newAssignmentHomogeneous(groupSpec, subscribedTopicIds, 
targetPartitions, currentAssignment);
-    }
-
-    private GroupAssignment assignHeterogeneous(
-        GroupSpec groupSpec,
-        SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
+    private GroupAssignment assignHeterogeneous(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
         Map<String, List<TopicIdPartition>> memberToPartitionsSubscription = 
new HashMap<>();
         for (String memberId : groupSpec.memberIds()) {
             MemberSubscription spec = groupSpec.memberSubscription(memberId);
@@ -108,22 +98,23 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
                 continue;
 
             // Subscribed topic partitions for the share group member.
-            List<TopicIdPartition> targetPartitions = computeTargetPartitions(
-                groupSpec, spec.subscribedTopicIds(), 
subscribedTopicDescriber);
+            List<TopicIdPartition> targetPartitions = 
AssignorHelpers.computeTargetPartitions(groupSpec, spec.subscribedTopicIds(), 
subscribedTopicDescriber);
             memberToPartitionsSubscription.put(memberId, targetPartitions);
         }
 
         // The current assignment from topic partition to members.
         Map<TopicIdPartition, List<String>> currentAssignment = 
currentAssignment(groupSpec);
+
         return newAssignmentHeterogeneous(groupSpec, 
memberToPartitionsSubscription, currentAssignment);
     }
 
     /**
      * Get the current assignment by topic partitions.
+     *
      * @param groupSpec The group metadata specifications.
      * @return the current assignment for subscribed topic partitions to 
memberIds.
      */
-    private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec 
groupSpec) {
+    static Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec 
groupSpec) {
         Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
 
         for (String member : groupSpec.memberIds()) {
@@ -131,80 +122,16 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
             assignedTopicPartitions.forEach((topicId, partitions) -> 
partitions.forEach(
                 partition -> assignment.computeIfAbsent(new 
TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member)));
         }
-        return assignment;
-    }
-
-    /**
-     * This function computes the new assignment for a homogeneous group.
-     * @param groupSpec           The group metadata specifications.
-     * @param subscribedTopicIds  The set of all the subscribed topic ids for 
the group.
-     * @param targetPartitions    The list of all topic partitions that need 
assignment.
-     * @param currentAssignment   The current assignment for subscribed topic 
partitions to memberIds.
-     * @return the new partition assignment for the members of the group.
-     */
-    private GroupAssignment newAssignmentHomogeneous(
-        GroupSpec groupSpec,
-        Set<Uuid> subscribedTopicIds,
-        List<TopicIdPartition> targetPartitions,
-        Map<TopicIdPartition, List<String>> currentAssignment
-    ) {
-        // For entirely balanced assignment, we would expect 
(numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
-        // That can be expressed as         Math.ceil(numTargetPartitions / 
(double) numGroupMembers)
-        // Using integer arithmetic, as     (numTargetPartitions + 
numGroupMembers - 1) / numGroupMembers
-        int numGroupMembers = groupSpec.memberIds().size();
-        int numTargetPartitions = targetPartitions.size();
-        int desiredAssignmentCount = (numTargetPartitions + numGroupMembers - 
1) / numGroupMembers;
 
-        Map<TopicIdPartition, List<String>> newAssignment = 
newHashMap(numTargetPartitions);
-
-        // Hash member IDs to topic partitions. Each member will be assigned 
one partition, but some partitions
-        // might have been assigned to more than one member.
-        memberHashAssignment(groupSpec.memberIds(), targetPartitions, 
newAssignment);
-
-        // Combine current and new hashed assignments, sized to accommodate 
the expected number of mappings.
-        Map<String, Set<TopicIdPartition>> finalAssignment = 
newHashMap(numGroupMembers);
-        Map<TopicIdPartition, Set<String>> finalAssignmentByPartition = 
newHashMap(numTargetPartitions);
-
-        // First, take the members assigned by hashing.
-        newAssignment.forEach((targetPartition, members) -> 
members.forEach(member -> {
-            finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition);
-            finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> 
new HashSet<>()).add(member);
-        }));
-
-        // Then, take the members from the current assignment, making sure 
that no member has too many assigned partitions.
-        // When combining current assignment, we need to only consider the 
topics in current assignment that are also being
-        // subscribed in the new assignment as well.
-        currentAssignment.forEach((targetPartition, members) -> {
-            if (subscribedTopicIds.contains(targetPartition.topicId())) {
-                members.forEach(member -> {
-                    if (groupSpec.memberIds().contains(member)) {
-                        Set<TopicIdPartition> memberPartitions = 
finalAssignment.computeIfAbsent(member, k -> new HashSet<>());
-                        if ((memberPartitions.size() < desiredAssignmentCount) 
&& !newAssignment.containsKey(targetPartition)) {
-                            memberPartitions.add(targetPartition);
-                            
finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new 
HashSet<>()).add(member);
-                        }
-                    }
-                });
-            }
-        });
-
-        // Finally, round-robin assignment for unassigned partitions which do 
not already have members assigned.
-        // The order of steps differs slightly from KIP-932 because the 
desired assignment count has been taken into
-        // account when copying partitions across from the current assignment, 
and this is more convenient.
-        List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
-            .filter(targetPartition -> 
!finalAssignmentByPartition.containsKey(targetPartition))
-            .toList();
-
-        roundRobinAssignmentWithCount(groupSpec.memberIds(), 
unassignedPartitions, finalAssignment, desiredAssignmentCount);
-
-        return groupAssignment(finalAssignment, groupSpec.memberIds());
+        return assignment;
     }
 
     /**
      * This function computes the new assignment for a heterogeneous group.
-     * @param groupSpec                       The group metadata 
specifications.
-     * @param memberToPartitionsSubscription  The member to subscribed topic 
partitions map.
-     * @param currentAssignment               The current assignment for 
subscribed topic partitions to memberIds.
+     *
+     * @param groupSpec                      The group metadata specifications.
+     * @param memberToPartitionsSubscription The member to subscribed topic 
partitions map.
+     * @param currentAssignment              The current assignment for 
subscribed topic partitions to memberIds.
      * @return the new partition assignment for the members of the group.
      */
     private GroupAssignment newAssignmentHeterogeneous(
@@ -241,7 +168,7 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
             
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic), 
unassignedPartitions.get(unassignedTopic), newAssignment));
 
         // Step 3: We combine current assignment and new assignment.
-        Map<String, Set<TopicIdPartition>> finalAssignment = 
newHashMap(numGroupMembers);
+        Map<String, Set<TopicIdPartition>> finalAssignment = 
AssignorHelpers.newHashMap(numGroupMembers);
 
         newAssignment.forEach((targetPartition, members) -> 
members.forEach(member ->
             finalAssignment.computeIfAbsent(member, k -> new 
HashSet<>()).add(targetPartition)));
@@ -260,10 +187,11 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
     /**
      * This function updates assignment by hashing the member IDs of the 
members and maps the partitions assigned to the
      * members based on the hash, one partition per member. This gives 
approximately even balance.
-     * @param memberIds           The member ids to which the topic partitions 
need to be assigned.
-     * @param partitionsToAssign  The subscribed topic partitions which needs 
assignment.
-     * @param assignment          The existing assignment by topic partition. 
We need to pass it as a parameter because this
-     *                            method can be called multiple times for 
heterogeneous assignment.
+     *
+     * @param memberIds          The member ids to which the topic partitions 
need to be assigned.
+     * @param partitionsToAssign The subscribed topic partitions which needs 
assignment.
+     * @param assignment         The existing assignment by topic partition. 
We need to pass it as a parameter because this
+     *                           method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void memberHashAssignment(
@@ -282,10 +210,11 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
 
     /**
      * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
-     * @param memberIds                 The member ids to which the topic 
partitions need to be assigned, should be non-empty.
-     * @param partitionsToAssign        The subscribed topic partitions which 
needs assignment.
-     * @param assignment                The existing assignment by topic 
partition. We need to pass it as a parameter because this
-     *                                  method can be called multiple times 
for heterogeneous assignment.
+     *
+     * @param memberIds          The member ids to which the topic partitions 
need to be assigned, should be non-empty.
+     * @param partitionsToAssign The subscribed topic partitions which needs 
assignment.
+     * @param assignment         The existing assignment by topic partition. 
We need to pass it as a parameter because this
+     *                           method can be called multiple times for 
heterogeneous assignment.
      */
     // Visible for testing
     void roundRobinAssignment(
@@ -305,73 +234,6 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
         }
     }
 
-    /**
-     * This functions assigns topic partitions to members by a round-robin 
approach and updates the existing assignment.
-     * @param memberIds                 The member ids to which the topic 
partitions need to be assigned, should be non-empty.
-     * @param partitionsToAssign        The subscribed topic partitions which 
need assignment.
-     * @param assignment                The existing assignment by topic 
partition. We need to pass it as a parameter because this
-     *                                  method can be called multiple times 
for heterogeneous assignment.
-     * @param desiredAssignmentCount    The number of partitions which can be 
assigned to each member to give even balance.
-     *                                  Note that this number can be exceeded 
by one to allow for situations
-     *                                  in which we have hashing collisions.
-     */
-    void roundRobinAssignmentWithCount(
-        Collection<String> memberIds,
-        List<TopicIdPartition> partitionsToAssign,
-        Map<String, Set<TopicIdPartition>> assignment,
-        int desiredAssignmentCount
-    ) {
-        Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
-
-        // We iterate through the target partitions which are not in the 
assignment and assign a memberId to them.
-        // In case we run out of members (memberIds < partitionsToAssign), we 
again start from the starting index of memberIds.
-        Iterator<String> memberIdIterator = memberIdsCopy.iterator();
-        ListIterator<TopicIdPartition> partitionListIterator = 
partitionsToAssign.listIterator();
-        while (partitionListIterator.hasNext()) {
-            TopicIdPartition partition = partitionListIterator.next();
-            if (!memberIdIterator.hasNext()) {
-                memberIdIterator = memberIdsCopy.iterator();
-                if (memberIdsCopy.isEmpty()) {
-                    // This should never happen, but guarding against an 
infinite loop
-                    throw new PartitionAssignorException("Inconsistent number 
of member IDs");
-                }
-            }
-            String memberId = memberIdIterator.next();
-            Set<TopicIdPartition> memberPartitions = 
assignment.computeIfAbsent(memberId, k -> new HashSet<>());
-            // We are prepared to add one more partition, even if the desired 
assignment count is already reached.
-            if (memberPartitions.size() <= desiredAssignmentCount) {
-                memberPartitions.add(partition);
-            } else {
-                memberIdIterator.remove();
-                partitionListIterator.previous();
-            }
-        }
-    }
-
-    private List<TopicIdPartition> computeTargetPartitions(
-        GroupSpec groupSpec,
-        Set<Uuid> subscribedTopicIds,
-        SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        List<TopicIdPartition> targetPartitions = new ArrayList<>();
-        subscribedTopicIds.forEach(topicId -> {
-            int numPartitions = 
subscribedTopicDescriber.numPartitions(topicId);
-            if (numPartitions == -1) {
-                throw new PartitionAssignorException(
-                    "Members are subscribed to topic " + topicId
-                        + " which doesn't exist in the topic metadata."
-                );
-            }
-
-            for (int partition = 0; partition < numPartitions; partition++) {
-                if (groupSpec.isPartitionAssignable(topicId, partition)) {
-                    targetPartitions.add(new TopicIdPartition(topicId, 
partition));
-                }
-            }
-        });
-        return targetPartitions;
-    }
-
     private GroupAssignment groupAssignment(
         Map<String, Set<TopicIdPartition>> assignmentByMember,
         Collection<String> allGroupMembers
@@ -390,8 +252,4 @@ public class SimpleAssignor implements 
ShareGroupPartitionAssignor {
 
         return new GroupAssignment(members);
     }
-
-    private static <K, V> HashMap<K, V> newHashMap(int numMappings) {
-        return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
-    }
 }
diff --git 
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java
 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java
new file mode 100644
index 00000000000..1d44a97cc2f
--- /dev/null
+++ 
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
+import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The homogeneous simple assignment builder is used to generate the target 
assignment for a share group with
+ * all its members subscribed to the same set of topics.
+ * <p>
+ * Assignments are done according to the following principles:
+ * <ol>
+ *   <li>Balance:          Ensure partitions are distributed equally among all 
members.
+ *                         The difference in assignments sizes between any two 
members
+ *                         should not exceed one partition.</li>
+ *   <li>Stickiness:       Minimize partition movements among members by 
retaining
+ *                         as much of the existing assignment as possible.</li>
+ * </ol>
+ * <p>
+ * Balance is prioritized above stickiness.
+ */
+public class SimpleHomogeneousAssignmentBuilder {
+
+    /**
+     * The set of all the topic IDs that the share group is subscribed to.
+     */
+    private final Set<Uuid> subscribedTopicIds;
+
+    /**
+     * The list of members in the share group.
+     */
+    private final List<String> memberIds;
+
+    /**
+     * Maps member ids to their indices in the memberIds list.
+     */
+    private final Map<String, Integer> memberIndices;
+
+    /**
+     * The list of all the topic-partitions assignable for the share group.
+     */
+    private final List<TopicIdPartition> targetPartitions;
+
+    /**
+     * The number of members in the share group.
+     */
+    private final int numGroupMembers;
+
+    /**
+     * The desired sharing for each target partition.
+     * For entirely balanced assignment, we would expect (numGroupMembers 
/ numTargetPartitions) members per partition, rounded upwards.
+     * That can be expressed as:  Math.ceil(numGroupMembers / (double) 
numTargetPartitions)
+     */
+    private final int desiredSharing;
+
+    /**
+     * The desired number of assignments for each share group member.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final int[] desiredAssignmentCount;
+
+    /**
+     * The share group assignment from the group metadata specification at the 
start of the assignment operation.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<Integer, Map<Uuid, Set<Integer>>> oldGroupAssignment;
+
+    /**
+     * The share group assignment calculated iteratively by the assignment 
operation. Entries in this map override those
+     * in the old group assignment map.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<Integer, Map<Uuid, Set<Integer>>> newGroupAssignment;
+
+    /**
+     * The final assignment keyed by topic-partition mapping to member.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<TopicIdPartition, Set<Integer>> 
finalAssignmentByPartition;
+
+    /**
+     * The final assignment keyed by member ID mapping to topic-partitions.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<Integer, Set<TopicIdPartition>> finalAssignmentByMember;
+
+    /**
+     * The set of members which have too few assigned partitions.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Set<Integer> unfilledMembers;
+
+    /**
+     * The set of members which have too many assigned partitions.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Set<Integer> overfilledMembers;
+
+    public SimpleHomogeneousAssignmentBuilder(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.subscribedTopicIds = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()).subscribedTopicIds();
+
+        // Number the members 0 to M - 1.
+        this.numGroupMembers = groupSpec.memberIds().size();
+        this.memberIds = new ArrayList<>(groupSpec.memberIds());
+        this.memberIndices = AssignorHelpers.newHashMap(numGroupMembers);
+        for (int memberIndex = 0; memberIndex < numGroupMembers; 
memberIndex++) {
+            memberIndices.put(memberIds.get(memberIndex), memberIndex);
+        }
+
+        this.targetPartitions = 
AssignorHelpers.computeTargetPartitions(groupSpec, subscribedTopicIds, 
subscribedTopicDescriber);
+
+        int numTargetPartitions = targetPartitions.size();
+        if (numTargetPartitions == 0) {
+            this.desiredSharing = 0;
+        } else {
+            this.desiredSharing = (numGroupMembers + numTargetPartitions - 1) 
/ numTargetPartitions;
+        }
+        this.desiredAssignmentCount = new int[numGroupMembers];
+        this.oldGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
+        this.newGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
+        this.finalAssignmentByPartition = 
AssignorHelpers.newHashMap(numTargetPartitions);
+        this.finalAssignmentByMember = 
AssignorHelpers.newHashMap(numGroupMembers);
+        this.unfilledMembers = AssignorHelpers.newHashSet(numGroupMembers);
+        this.overfilledMembers = AssignorHelpers.newHashSet(numGroupMembers);
+
+        // Extract the old group assignment from the group metadata 
specification.
+        groupSpec.memberIds().forEach(memberId -> {
+            int memberIndex = memberIndices.get(memberId);
+            oldGroupAssignment.put(memberIndex, 
groupSpec.memberAssignment(memberId).partitions());
+        });
+
+        // Calculate the desired number of assignments for each member.
+        // The precise desired assignment count per member. This can 
be a fractional number.
+        // With (numGroupMembers / numTargetPartitions) members per partition, 
rounded upwards (desiredSharing), there are
+        // desiredSharing * numTargetPartitions assignments in total, which 
are spread across the numGroupMembers members.
+        double preciseDesiredAssignmentCount = desiredSharing * 
numTargetPartitions / (double) numGroupMembers;
+        for (int memberIndex = 0; memberIndex < numGroupMembers; 
memberIndex++) {
+            desiredAssignmentCount[memberIndex] =
+                (int) Math.ceil(preciseDesiredAssignmentCount * (double) 
(memberIndex + 1)) -
+                    (int) Math.ceil(preciseDesiredAssignmentCount * (double) 
memberIndex);
+        }
+    }
+
+    /**
+     * Here's the step-by-step breakdown of the assignment process:
+     * <ol>
+     *   <li>Revoke partitions from the existing assignment that are no longer 
part of each member's subscriptions.</li>
+     *   <li>Revoke partitions from members which have too many 
partitions.</li>
+     *   <li>Revoke any partitions which are shared more than desired.</li>
+     *   <li>Assign any partitions which have insufficient members 
assigned.</li>
+     * </ol>
+     */
+    public GroupAssignment build() {
+        if (subscribedTopicIds.isEmpty()) {
+            return new GroupAssignment(Map.of());
+        }
+
+        // The order of steps here is not that significant, but 
assignRemainingPartitions must go last.
+        revokeUnassignablePartitions();
+
+        revokeOverfilledMembers();
+
+        revokeOversharedPartitions();
+
+        // Add in any partitions which are currently not in the assignment.
+        targetPartitions.forEach(topicPartition -> 
finalAssignmentByPartition.computeIfAbsent(topicPartition, k -> new 
HashSet<>()));
+
+        assignRemainingPartitions();
+
+        // Combine the old and the new group assignments to give the result.
+        Map<String, MemberAssignment> targetAssignment = 
AssignorHelpers.newHashMap(numGroupMembers);
+        for (int memberIndex = 0; memberIndex < numGroupMembers; 
memberIndex++) {
+            Map<Uuid, Set<Integer>> memberAssignment = 
newGroupAssignment.get(memberIndex);
+            if (memberAssignment == null) {
+                targetAssignment.put(memberIds.get(memberIndex), new 
MemberAssignmentImpl(oldGroupAssignment.get(memberIndex)));
+            } else {
+                targetAssignment.put(memberIds.get(memberIndex), new 
MemberAssignmentImpl(memberAssignment));
+            }
+        }
+
+        return new GroupAssignment(targetAssignment);
+    }
+
+    /**
+     * Examine the members from the current assignment, noting which 
members have too few or too many assigned partitions.
+     * When looking at the current assignment, we need to only consider the 
topics in the current assignment that are
+     * also being subscribed in the new assignment.
+     */
+    private void revokeUnassignablePartitions() {
+        for (Map.Entry<Integer, Map<Uuid, Set<Integer>>> entry : 
oldGroupAssignment.entrySet()) {
+            Integer memberIndex = entry.getKey();
+            Map<Uuid, Set<Integer>> oldMemberAssignment = entry.getValue();
+            Map<Uuid, Set<Integer>> newMemberAssignment = null;
+            int memberAssignedPartitions = 0;
+            int desiredAssignmentCountForMember = 
desiredAssignmentCount[memberIndex];
+
+            for (Map.Entry<Uuid, Set<Integer>> oldMemberPartitions : 
oldMemberAssignment.entrySet()) {
+                Uuid topicId = oldMemberPartitions.getKey();
+                Set<Integer> assignedPartitions = 
oldMemberPartitions.getValue();
+
+                if (subscribedTopicIds.contains(topicId)) {
+                    for (int partition : assignedPartitions) {
+                        TopicIdPartition topicPartition = new 
TopicIdPartition(topicId, partition);
+                        memberAssignedPartitions++;
+                        
finalAssignmentByPartition.computeIfAbsent(topicPartition, k -> new 
HashSet<>()).add(memberIndex);
+                        finalAssignmentByMember.computeIfAbsent(memberIndex, k 
-> new HashSet<>()).add(topicPartition);
+                        if (memberAssignedPartitions >= 
desiredAssignmentCountForMember) {
+                            if (newMemberAssignment == null) {
+                                // If the new assignment is null, we create a 
deep copy of the
+                                // original assignment so that we can alter it.
+                                newMemberAssignment = 
AssignorHelpers.deepCopyAssignment(oldMemberAssignment);
+                            }
+                        }
+                    }
+                } else {
+                    if (newMemberAssignment == null) {
+                        // If the new member assignment is null, we create a 
deep copy of the
+                        // original assignment so we can alter it.
+                        newMemberAssignment = 
AssignorHelpers.deepCopyAssignment(oldMemberAssignment);
+                    }
+                    // Remove the entire topic.
+                    newMemberAssignment.remove(topicId);
+                }
+            }
+
+            if (newMemberAssignment != null) {
+                newGroupAssignment.put(memberIndex, newMemberAssignment);
+            }
+
+            if (memberAssignedPartitions < desiredAssignmentCountForMember) {
+                unfilledMembers.add(memberIndex);
+            } else if (memberAssignedPartitions > 
desiredAssignmentCountForMember) {
+                overfilledMembers.add(memberIndex);
+            }
+        }
+    }
+
+    /**
+     * Revoke partitions from members which are overfilled.
+     */
+    private void revokeOverfilledMembers() {
+        if (overfilledMembers.isEmpty())
+            return;
+
+        overfilledMembers.forEach(memberIndex -> {
+            int memberDesiredAssignmentCount = 
desiredAssignmentCount[memberIndex];
+            Set<TopicIdPartition> memberFinalAssignment = 
finalAssignmentByMember.get(memberIndex);
+            if (memberFinalAssignment.size() > memberDesiredAssignmentCount) {
+                Iterator<TopicIdPartition> iterator = 
memberFinalAssignment.iterator();
+                while (iterator.hasNext()) {
+                    TopicIdPartition topicPartition = iterator.next();
+                    
newGroupAssignment.get(memberIndex).get(topicPartition.topicId()).remove(topicPartition.partitionId());
+                    
finalAssignmentByPartition.get(topicPartition).remove(memberIndex);
+                    iterator.remove();
+                    if (memberFinalAssignment.size() == 
memberDesiredAssignmentCount) {
+                        break;
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Revoke any over-shared partitions.
+     */
+    private void revokeOversharedPartitions() {
+        finalAssignmentByPartition.forEach((topicPartition, assignedMembers) 
-> {
+            int assignedMemberCount = assignedMembers.size();
+            if (assignedMemberCount > desiredSharing) {
+                Iterator<Integer> assignedMemberIterator = 
assignedMembers.iterator();
+                while (assignedMemberIterator.hasNext()) {
+                    Integer memberIndex = assignedMemberIterator.next();
+                    Map<Uuid, Set<Integer>> newMemberAssignment = 
newGroupAssignment.get(memberIndex);
+                    if (newMemberAssignment == null) {
+                        newMemberAssignment = 
AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
+                        newGroupAssignment.put(memberIndex, 
newMemberAssignment);
+                    }
+                    Set<Integer> partitions = 
newMemberAssignment.get(topicPartition.topicId());
+                    if (partitions != null) {
+                        if (partitions.remove(topicPartition.partitionId())) {
+                            assignedMemberCount--;
+                            assignedMemberIterator.remove();
+                            
finalAssignmentByMember.get(memberIndex).remove(topicPartition);
+                            unfilledMembers.add(memberIndex);
+                        }
+                    }
+                    if (assignedMemberCount <= desiredSharing) {
+                        break;
+                    }
+                }
+            }
+        });
+    }
+
+    /**
+     * Assign partitions to unfilled members. It repeatedly iterates through 
the unfilled members while running
+     * once through the set of partitions. When a partition is found that has 
insufficient sharing, it attempts to assign
+     * it to one of the unfilled members.
+     * <p>
+     * There is one tricky case here and that's where a partition wants 
another assignment, but none of the unfilled
+     * members are able to take it (because they already have that partition). 
In this situation, we just accept that
+     * no additional assignments for this partition could be made and carry 
on. In theory, a different shuffling of the
+     * partitions would be able to achieve better balance, but it's harmless 
tolerating a slight imbalance in this case.
+     * <p>
+     * Note that finalAssignmentByPartition is not maintained by this method 
which is expected to be the final step in the
+     * computation.
+     */
+    private void assignRemainingPartitions() {
+        if (unfilledMembers.isEmpty())
+            return;
+
+        Iterator<Integer> memberIterator = unfilledMembers.iterator();
+        boolean partitionAssignedForThisIterator = false;
+        for (Map.Entry<TopicIdPartition, Set<Integer>> partitionAssignment : 
finalAssignmentByPartition.entrySet()) {
+            TopicIdPartition topicPartition = partitionAssignment.getKey();
+            Set<Integer> membersAssigned = partitionAssignment.getValue();
+
+            if (membersAssigned.size() < desiredSharing) {
+                int assignmentsToMake = desiredSharing - 
membersAssigned.size();
+                while (assignmentsToMake > 0) {
+                    if (!memberIterator.hasNext()) {
+                        if (!partitionAssignedForThisIterator) {
+                            break;
+                        }
+                        memberIterator = unfilledMembers.iterator();
+                        partitionAssignedForThisIterator = false;
+                    }
+                    int memberIndex = memberIterator.next();
+                    if (!membersAssigned.contains(memberIndex)) {
+                        Map<Uuid, Set<Integer>> newMemberAssignment = 
newGroupAssignment.get(memberIndex);
+                        if (newMemberAssignment == null) {
+                            newMemberAssignment = 
AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
+                            newGroupAssignment.put(memberIndex, 
newMemberAssignment);
+                        }
+                        
newMemberAssignment.computeIfAbsent(topicPartition.topicId(), k -> new 
HashSet<>()).add(topicPartition.partitionId());
+                        finalAssignmentByMember.computeIfAbsent(memberIndex, k 
-> new HashSet<>()).add(topicPartition);
+                        assignmentsToMake--;
+                        partitionAssignedForThisIterator = true;
+                        if (finalAssignmentByMember.get(memberIndex).size() >= 
desiredAssignmentCount[memberIndex]) {
+                            memberIterator.remove();
+                        }
+                    }
+                }
+            }
+
+            if (unfilledMembers.isEmpty()) {
+                break;
+            }
+        }
+    }
+}
diff --git 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index 498620248a8..7550ad400e7 100644
--- 
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++ 
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -202,23 +202,7 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata
         );
 
-        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
-        // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
-        // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by 
round-robin assignment.
-        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
-        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
-        expectedAssignment.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0, 2),
-            mkTopicAssignment(TOPIC_3_UUID, 1)
-        ));
-        expectedAssignment.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 1),
-            mkTopicAssignment(TOPIC_3_UUID, 0)
-        ));
-
-        // T1: 3 partitions + T3: 2 partitions = 5 partitions
         assertEveryPartitionGetsAssignment(5, computedAssignment);
-        assertAssignment(expectedAssignment, computedAssignment);
     }
 
     @Test
@@ -268,23 +252,7 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata
         );
 
-        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
-        // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
-        // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by 
round-robin assignment.
-        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
-        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
-        expectedAssignment.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0, 2),
-            mkTopicAssignment(TOPIC_3_UUID, 1)
-        ));
-        expectedAssignment.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 1),
-            mkTopicAssignment(TOPIC_3_UUID, 0)
-        ));
-
-        // T1: 3 partitions + T3: 2 partitions = 5 partitions
         assertEveryPartitionGetsAssignment(5, computedAssignment);
-        assertAssignment(expectedAssignment, computedAssignment);
     }
 
     @Test
@@ -332,17 +300,7 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata
         );
 
-        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new 
HashMap<>();
-        expectedAssignment.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 1, 2)
-        ));
-        expectedAssignment.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0)
-        ));
-
-        // T1: 3 partitions + T3(non-assignable): 2 partitions = 3 partitions
         assertEveryPartitionGetsAssignment(3, computedAssignment);
-        assertAssignment(expectedAssignment, computedAssignment);
     }
 
     @Test
@@ -675,182 +633,121 @@ public class SimpleAssignorTest {
     }
 
     @Test
-    public void testRoundRobinAssignmentWithCount() {
-        String member1 = "member1";
-        String member2 = "member2";
-        List<String> members = List.of(member1, member2);
-        TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
-        TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
-        TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
-        TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
-        List<TopicIdPartition> unassignedPartitions = List.of(partition2, 
partition3, partition4);
+    public void testIncrementalAssignmentIncreasingMembersHomogeneous() {
+        final int numPartitions = 24;
+        final int numMembers = 101;
 
-        Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
-        assignment.put(member1, new HashSet<>(Set.of(partition1)));
-        assignment.put(member2, new HashSet<>(Set.of(partition1)));
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions)
+            .build();
 
-        assignor.roundRobinAssignmentWithCount(members, unassignedPartitions, 
assignment, 2);
-        Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of(
-            member1, Set.of(partition1, partition2, partition4),
-            member2, Set.of(partition1, partition3)
+        SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(
+            metadataImage
         );
 
-        assertFinalAssignment(expectedAssignment, assignment);
-    }
+        Set<Uuid> topicsSubscription = new LinkedHashSet<>();
+        topicsSubscription.add(TOPIC_1_UUID);
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
HashMap<>();
 
-    @Test
-    public void testRoundRobinAssignmentWithCountTooManyPartitions() {
-        String member1 = "member1";
-        String member2 = "member2";
-        List<String> members = List.of(member1, member2);
-        TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
-        TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
-        TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
-        TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
-        TopicIdPartition partition5 = new TopicIdPartition(TOPIC_4_UUID, 1);
-        TopicIdPartition partition6 = new TopicIdPartition(TOPIC_4_UUID, 2);
-        List<TopicIdPartition> unassignedPartitions = List.of(partition2, 
partition3, partition4, partition5, partition6);
+        SimpleAssignor assignor = new SimpleAssignor();
 
-        Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
-        assignment.put(member1, new HashSet<>(Set.of(partition1)));
-        assignment.put(member2, new HashSet<>(Set.of(partition1)));
+        // Increase the number of members one at a time, checking that the 
partitions are assigned as expected
+        for (int member = 0; member < numMembers; member++) {
+            String newMemberId = "M" + member;
+            members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
+                Optional.empty(),
+                Optional.empty(),
+                topicsSubscription,
+                Assignment.EMPTY
+            ));
 
-        assertThrows(PartitionAssignorException.class,
-            () -> assignor.roundRobinAssignmentWithCount(members, 
unassignedPartitions, assignment, 2));
+            GroupSpec groupSpec = new GroupSpecImpl(
+                members,
+                HOMOGENEOUS,
+                new HashMap<>()
+            );
+
+            GroupAssignment computedAssignment = assignor.assign(groupSpec, 
subscribedTopicMetadata);
+            assertEveryPartitionGetsAssignment(numPartitions, 
computedAssignment);
+
+            computedAssignment.members().forEach((memberId, partitions) -> 
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
+                Optional.empty(),
+                Optional.empty(),
+                topicsSubscription,
+                new Assignment(partitions.partitions())
+            )));
+        }
     }
 
     @Test
-    public void testAssignWithCurrentAssignmentHomogeneous() {
-        // Current assignment setup - Two members A, B subscribing to T1 and 
T2.
-        MetadataImage metadataImage1 = new MetadataImageBuilder()
-            .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
-            .addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
+    public void testIncrementalAssignmentDecreasingMembersHomogeneous() {
+        final int numPartitions = 24;
+        final int numMembers = 101;
+
+        MetadataImage metadataImage = new MetadataImageBuilder()
+            .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions)
             .build();
 
-        Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new 
HashMap<>();
+        SubscribedTopicDescriberImpl subscribedTopicMetadata = new 
SubscribedTopicDescriberImpl(
+            metadataImage
+        );
 
-        Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
-        topicsSubscription1.add(TOPIC_1_UUID);
-        topicsSubscription1.add(TOPIC_2_UUID);
+        Set<Uuid> topicsSubscription = new LinkedHashSet<>();
+        topicsSubscription.add(TOPIC_1_UUID);
+        Map<String, MemberSubscriptionAndAssignmentImpl> members = new 
HashMap<>();
 
-        members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
-            Optional.empty(),
-            Optional.empty(),
-            topicsSubscription1,
-            Assignment.EMPTY
-        ));
+        SimpleAssignor assignor = new SimpleAssignor();
 
-        members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
-            Optional.empty(),
-            Optional.empty(),
-            topicsSubscription1,
-            Assignment.EMPTY
-        ));
+        for (int member = 0; member < numMembers; member++) {
+            String newMemberId = "M" + member;
+            members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
+                Optional.empty(),
+                Optional.empty(),
+                topicsSubscription,
+                Assignment.EMPTY
+            ));
+        }
 
-        GroupSpec groupSpec1 = new GroupSpecImpl(
-            members1,
+        GroupSpec groupSpec = new GroupSpecImpl(
+            members,
             HOMOGENEOUS,
-            Map.of()
-        );
-        SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new 
SubscribedTopicDescriberImpl(
-            metadataImage1
+            new HashMap<>()
         );
 
-        GroupAssignment computedAssignment1 = assignor.assign(
-            groupSpec1,
-            subscribedTopicMetadata1
-        );
-
-        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
-        // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
-        // Step 2 -> T1:2, T2:1 -> MEMBER_A and T2:0 -> MEMBER_B by 
round-robin assignment.
-        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
-        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new 
HashMap<>();
-        expectedAssignment1.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0, 2),
-            mkTopicAssignment(TOPIC_2_UUID, 1)
-        ));
-        expectedAssignment1.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 1),
-            mkTopicAssignment(TOPIC_2_UUID, 0)
-        ));
-
-        // T1: 3 partitions + T2: 2 partitions = 5 partitions
-        assertEveryPartitionGetsAssignment(5, computedAssignment1);
-        assertAssignment(expectedAssignment1, computedAssignment1);
-
-        // New assignment setup - Three members A, B, C subscribing to T2 and 
T3.
-        MetadataImage metadataImage2 = new MetadataImageBuilder()
-            .addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
-            .addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 3)
-            .build();
-
-        Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new 
HashMap<>();
+        GroupAssignment computedAssignment = assignor.assign(groupSpec, 
subscribedTopicMetadata);
+        assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
 
-        Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
-        topicsSubscription2.add(TOPIC_2_UUID);
-        topicsSubscription2.add(TOPIC_3_UUID);
-
-        members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
-            Optional.empty(),
-            Optional.empty(),
-            topicsSubscription2,
-            // Utilizing the assignment from current assignment
-            new Assignment(mkAssignment(
-                mkTopicAssignment(TOPIC_1_UUID, 0, 2),
-                mkTopicAssignment(TOPIC_2_UUID, 1)))
-        ));
-
-        members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
-            Optional.empty(),
-            Optional.empty(),
-            topicsSubscription2,
-            new Assignment(mkAssignment(
-                mkTopicAssignment(TOPIC_1_UUID, 1),
-                mkTopicAssignment(TOPIC_2_UUID, 0)))
-        ));
+        for (int member = 0; member < numMembers; member++) {
+            String newMemberId = "M" + member;
+            members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
+                Optional.empty(),
+                Optional.empty(),
+                topicsSubscription,
+                new 
Assignment(computedAssignment.members().get(newMemberId).partitions()))
+            );
+        }
 
-        members2.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
-            Optional.empty(),
-            Optional.empty(),
-            topicsSubscription2,
-            Assignment.EMPTY
-        ));
+        // Decrease the number of members one at a time, checking that the 
partitions are assigned as expected
+        for (int member = numMembers - 1; member > 0; member--) {
+            String newMemberId = "M" + member;
+            members.remove(newMemberId);
 
-        GroupSpec groupSpec2 = new GroupSpecImpl(
-            members2,
-            HOMOGENEOUS,
-            Map.of()
-        );
-        SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new 
SubscribedTopicDescriberImpl(
-            metadataImage2
-        );
-
-        GroupAssignment computedAssignment2 = assignor.assign(
-            groupSpec2,
-            subscribedTopicMetadata2
-        );
+            groupSpec = new GroupSpecImpl(
+                members,
+                HOMOGENEOUS,
+                new HashMap<>()
+            );
 
-        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of 
MEMBER_C is 67.
-        // Step 1 -> T2:0 -> MEMBER_A, T2:1 -> MEMBER_B, T3:0 -> MEMBER_C by 
hash assignment
-        // Step 2 -> T3:1 -> MEMBER_A, T3:2 -> MEMBER_B by round-robin 
assignment
-        // Step 3 -> no new addition by current assignment since T2:0 and T2:1 
were already a part of new assignment.
-        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new 
HashMap<>();
-        expectedAssignment2.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_2_UUID, 0),
-            mkTopicAssignment(TOPIC_3_UUID, 1)
-        ));
-        expectedAssignment2.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_2_UUID, 1),
-            mkTopicAssignment(TOPIC_3_UUID, 2)
-        ));
-        expectedAssignment2.put(MEMBER_C, mkAssignment(
-            mkTopicAssignment(TOPIC_3_UUID, 0)
-        ));
+            computedAssignment = assignor.assign(groupSpec, 
subscribedTopicMetadata);
+            assertEveryPartitionGetsAssignment(numPartitions, 
computedAssignment);
 
-        // T2: 2 partitions + T3: 3 partitions = 5 partitions
-        assertEveryPartitionGetsAssignment(5, computedAssignment2);
-        assertAssignment(expectedAssignment2, computedAssignment2);
+            computedAssignment.members().forEach((memberId, partitions) -> 
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
+                Optional.empty(),
+                Optional.empty(),
+                topicsSubscription,
+                new Assignment(partitions.partitions())
+            )));
+        }
     }
 
     @Test
@@ -905,25 +802,7 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata1
         );
 
-        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of 
MEMBER_C is 67.
-        // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by 
hash assignment.
-        // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B, 
T2:1 -> member_C by round-robin assignment.
-        // Step 3 -> no new assignment gets added by current assignment since 
it is empty.
-        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new 
HashMap<>();
-        expectedAssignment1.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
-            mkTopicAssignment(TOPIC_2_UUID, 0, 2)
-        ));
-        expectedAssignment1.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_3_UUID, 0, 1)
-        ));
-        expectedAssignment1.put(MEMBER_C, mkAssignment(
-            mkTopicAssignment(TOPIC_2_UUID, 1, 2)
-        ));
-
-        // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8 
partitions
         assertEveryPartitionGetsAssignment(8, computedAssignment1);
-        assertAssignment(expectedAssignment1, computedAssignment1);
 
         // New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}.
 
@@ -977,23 +856,7 @@ public class SimpleAssignorTest {
             subscribedTopicMetadata2
         );
 
-        // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
-        // Step 1 -> T1:1 -> member_A, T3:0 -> member_B by hash assignment.
-        // Step 2 -> T2:1 -> member_A, T4:0 -> member_B by round-robin 
assignment.
-        // Step 3 -> T1:0, T1:2, T2:0 -> member_A,  T3:1 -> member_B by 
current assignment.
-        Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new 
HashMap<>();
-        expectedAssignment2.put(MEMBER_A, mkAssignment(
-            mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
-            mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
-        ));
-        expectedAssignment2.put(MEMBER_B, mkAssignment(
-            mkTopicAssignment(TOPIC_3_UUID, 0, 1),
-            mkTopicAssignment(TOPIC_4_UUID, 0)
-        ));
-
-        // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions + T4: 1 
partition = 9 partitions
         assertEveryPartitionGetsAssignment(9, computedAssignment2);
-        assertAssignment(expectedAssignment2, computedAssignment2);
     }
 
     private void assertAssignment(
@@ -1019,18 +882,6 @@ public class SimpleAssignorTest {
         });
     }
 
-    private void assertFinalAssignment(
-        Map<String, Set<TopicIdPartition>> expectedAssignment,
-        Map<String, Set<TopicIdPartition>> computedAssignment
-    ) {
-        assertEquals(expectedAssignment.size(), computedAssignment.size());
-        expectedAssignment.forEach((memberId, partitions) -> {
-            Set<TopicIdPartition> computedPartitions = 
computedAssignment.getOrDefault(memberId, Set.of());
-            assertEquals(partitions.size(), computedPartitions.size());
-            partitions.forEach(member -> 
assertTrue(computedPartitions.contains(member)));
-        });
-    }
-
     private void assertEveryPartitionGetsAssignment(
         int expectedPartitions,
         GroupAssignment computedGroupAssignment
diff --git 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
index 6c6db6d75b1..8febd4f0488 100644
--- 
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
+++ 
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
@@ -95,7 +95,7 @@ public class ShareGroupAssignorBenchmark {
     @Param({"1", "10", "100"})
     private int partitionCount;
 
-    @Param({"10", "100"})
+    @Param({"1", "10", "100"})
     private int topicCount;
 
     @Param({"HOMOGENEOUS", "HETEROGENEOUS"})

Reply via email to