This is an automated email from the ASF dual-hosted git repository.
schofielaj pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4690527fabf KAFKA-19362: Finalize homogeneous simple share assignor
(#19977)
4690527fabf is described below
commit 4690527fabfebbbde0fd99501f072b453ef0f962
Author: Andrew Schofield <[email protected]>
AuthorDate: Fri Jun 20 16:10:47 2025 +0100
KAFKA-19362: Finalize homogeneous simple share assignor (#19977)
Finalise the share group SimpleAssignor for homogeneous subscriptions.
The assignor code is much more accurate about the number of partitions
assigned to each member, and the number of members assigned for each
partition. It eliminates the idea of hash-based assignment because that
has been shown to be unhelpful. The revised code is very much more
effective at assigning evenly as the number of members grows and shrinks
over time.
A future PR will address the code for heterogeneous subscriptions.
Reviewers: Apoorv Mittal <[email protected]>
---
.../group/assignor/AssignorHelpers.java | 56 +++
.../coordinator/group/assignor/SimpleAssignor.java | 222 +++---------
.../SimpleHomogeneousAssignmentBuilder.java | 385 +++++++++++++++++++++
.../group/assignor/SimpleAssignorTest.java | 331 +++++-------------
.../jmh/assignor/ShareGroupAssignorBenchmark.java | 2 +-
5 files changed, 573 insertions(+), 423 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
index 2b30b0b3ae9..5fbe65067b2 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/AssignorHelpers.java
@@ -17,10 +17,16 @@
package org.apache.kafka.coordinator.group.assignor;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import
org.apache.kafka.coordinator.group.api.assignor.PartitionAssignorException;
+import
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.server.common.TopicIdPartition;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -51,4 +57,54 @@ public final class AssignorHelpers {
}
return copy;
}
+
+ /**
+ * Computes the list of target partitions which can be assigned to
members. This list includes all partitions
+ * for the subscribed topic IDs, with the additional check that they must
be assignable.
+ * @param groupSpec The assignment spec which includes
member metadata.
+ * @param subscribedTopicIds The set of subscribed topic IDs.
+ * @param subscribedTopicDescriber The topic and partition metadata
describer.
+ * @return The list of target partitions.
+ */
+ static List<TopicIdPartition> computeTargetPartitions(
+ GroupSpec groupSpec,
+ Set<Uuid> subscribedTopicIds,
+ SubscribedTopicDescriber subscribedTopicDescriber
+ ) {
+ List<TopicIdPartition> targetPartitions = new ArrayList<>();
+ subscribedTopicIds.forEach(topicId -> {
+ int numPartitions =
subscribedTopicDescriber.numPartitions(topicId);
+ if (numPartitions == -1) {
+ throw new PartitionAssignorException(
+ "Members are subscribed to topic " + topicId + " which
doesn't exist in the topic metadata."
+ );
+ }
+
+ for (int partition = 0; partition < numPartitions; partition++) {
+ if (groupSpec.isPartitionAssignable(topicId, partition)) {
+ targetPartitions.add(new TopicIdPartition(topicId,
partition));
+ }
+ }
+ });
+
+ return targetPartitions;
+ }
+
+ /**
+ * Constructs a HashMap with a known capacity. This is equivalent to
HashMap.newHashMap which is introduced in Java 19.
+ * @param numMappings The expected number of mappings.
+ * @return The newly created map.
+ */
+ static <K, V> HashMap<K, V> newHashMap(int numMappings) {
+ return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
+ }
+
+ /**
+ * Constructs a HashSet with a known capacity. This is equivalent to
HashSet.newHashSet which is introduced in Java 19.
+ * @param numElements The expected number of elements.
+ * @return The newly created set.
+ */
+ static <K> HashSet<K> newHashSet(int numElements) {
+ return new HashSet<>((int) (((numElements + 1) / 0.75f) + 1));
+ }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
index da04257edf6..8238be7c58e 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java
@@ -27,6 +27,9 @@ import
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
import org.apache.kafka.server.common.TopicIdPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
@@ -34,7 +37,6 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.ListIterator;
import java.util.Map;
import java.util.Set;
@@ -42,7 +44,7 @@ import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
/**
* A simple partition assignor for share groups that assigns partitions of the
subscribed topics
- * based on the rules defined in KIP-932 to different members. It is not
rack-aware.
+ * to different members based on the rules defined in KIP-932. It is not
rack-aware.
* <p>
* Assignments are done according to the following principles:
* <ol>
@@ -56,51 +58,39 @@ import static
org.apache.kafka.coordinator.group.api.assignor.SubscriptionType.H
* Balance is prioritized above stickiness.
*/
public class SimpleAssignor implements ShareGroupPartitionAssignor {
-
+ private static final Logger log =
LoggerFactory.getLogger(SimpleAssignor.class);
private static final String SIMPLE_ASSIGNOR_NAME = "simple";
+ /**
+ * Unique name for this assignor.
+ */
@Override
public String name() {
return SIMPLE_ASSIGNOR_NAME;
}
+ /**
+ * Assigns partitions to group members based on the given assignment
specification and topic metadata.
+ *
+ * @param groupSpec The assignment spec which includes
member metadata.
+ * @param subscribedTopicDescriber The topic and partition metadata
describer.
+ * @return The new assignment for the group.
+ */
@Override
- public GroupAssignment assign(
- GroupSpec groupSpec,
- SubscribedTopicDescriber subscribedTopicDescriber
- ) throws PartitionAssignorException {
+ public GroupAssignment assign(GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber) throws
PartitionAssignorException {
if (groupSpec.memberIds().isEmpty())
return new GroupAssignment(Map.of());
if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
- return assignHomogeneous(groupSpec, subscribedTopicDescriber);
+ log.debug("Detected that all members are subscribed to the same
set of topics, invoking the homogeneous assignment algorithm");
+ return new SimpleHomogeneousAssignmentBuilder(groupSpec,
subscribedTopicDescriber).build();
} else {
+ log.debug("Detected that the members are subscribed to different
sets of topics, invoking the heterogeneous assignment algorithm");
return assignHeterogeneous(groupSpec, subscribedTopicDescriber);
}
}
- private GroupAssignment assignHomogeneous(
- GroupSpec groupSpec,
- SubscribedTopicDescriber subscribedTopicDescriber
- ) {
- Set<Uuid> subscribedTopicIds =
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
- .subscribedTopicIds();
- if (subscribedTopicIds.isEmpty())
- return new GroupAssignment(Map.of());
-
- // Subscribed topic partitions for the share group.
- List<TopicIdPartition> targetPartitions = computeTargetPartitions(
- groupSpec, subscribedTopicIds, subscribedTopicDescriber);
-
- // The current assignment from topic partition to members.
- Map<TopicIdPartition, List<String>> currentAssignment =
currentAssignment(groupSpec);
- return newAssignmentHomogeneous(groupSpec, subscribedTopicIds,
targetPartitions, currentAssignment);
- }
-
- private GroupAssignment assignHeterogeneous(
- GroupSpec groupSpec,
- SubscribedTopicDescriber subscribedTopicDescriber
- ) {
+ private GroupAssignment assignHeterogeneous(GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber) {
Map<String, List<TopicIdPartition>> memberToPartitionsSubscription =
new HashMap<>();
for (String memberId : groupSpec.memberIds()) {
MemberSubscription spec = groupSpec.memberSubscription(memberId);
@@ -108,22 +98,23 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
continue;
// Subscribed topic partitions for the share group member.
- List<TopicIdPartition> targetPartitions = computeTargetPartitions(
- groupSpec, spec.subscribedTopicIds(),
subscribedTopicDescriber);
+ List<TopicIdPartition> targetPartitions =
AssignorHelpers.computeTargetPartitions(groupSpec, spec.subscribedTopicIds(),
subscribedTopicDescriber);
memberToPartitionsSubscription.put(memberId, targetPartitions);
}
// The current assignment from topic partition to members.
Map<TopicIdPartition, List<String>> currentAssignment =
currentAssignment(groupSpec);
+
return newAssignmentHeterogeneous(groupSpec,
memberToPartitionsSubscription, currentAssignment);
}
/**
* Get the current assignment by topic partitions.
+ *
* @param groupSpec The group metadata specifications.
* @return the current assignment for subscribed topic partitions to
memberIds.
*/
- private Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec
groupSpec) {
+ static Map<TopicIdPartition, List<String>> currentAssignment(GroupSpec
groupSpec) {
Map<TopicIdPartition, List<String>> assignment = new HashMap<>();
for (String member : groupSpec.memberIds()) {
@@ -131,80 +122,16 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
assignedTopicPartitions.forEach((topicId, partitions) ->
partitions.forEach(
partition -> assignment.computeIfAbsent(new
TopicIdPartition(topicId, partition), k -> new ArrayList<>()).add(member)));
}
- return assignment;
- }
-
- /**
- * This function computes the new assignment for a homogeneous group.
- * @param groupSpec The group metadata specifications.
- * @param subscribedTopicIds The set of all the subscribed topic ids for
the group.
- * @param targetPartitions The list of all topic partitions that need
assignment.
- * @param currentAssignment The current assignment for subscribed topic
partitions to memberIds.
- * @return the new partition assignment for the members of the group.
- */
- private GroupAssignment newAssignmentHomogeneous(
- GroupSpec groupSpec,
- Set<Uuid> subscribedTopicIds,
- List<TopicIdPartition> targetPartitions,
- Map<TopicIdPartition, List<String>> currentAssignment
- ) {
- // For entirely balanced assignment, we would expect
(numTargetPartitions / numGroupMembers) partitions per member, rounded upwards.
- // That can be expressed as Math.ceil(numTargetPartitions /
(double) numGroupMembers)
- // Using integer arithmetic, as (numTargetPartitions +
numGroupMembers - 1) / numGroupMembers
- int numGroupMembers = groupSpec.memberIds().size();
- int numTargetPartitions = targetPartitions.size();
- int desiredAssignmentCount = (numTargetPartitions + numGroupMembers -
1) / numGroupMembers;
- Map<TopicIdPartition, List<String>> newAssignment =
newHashMap(numTargetPartitions);
-
- // Hash member IDs to topic partitions. Each member will be assigned
one partition, but some partitions
- // might have been assigned to more than one member.
- memberHashAssignment(groupSpec.memberIds(), targetPartitions,
newAssignment);
-
- // Combine current and new hashed assignments, sized to accommodate
the expected number of mappings.
- Map<String, Set<TopicIdPartition>> finalAssignment =
newHashMap(numGroupMembers);
- Map<TopicIdPartition, Set<String>> finalAssignmentByPartition =
newHashMap(numTargetPartitions);
-
- // First, take the members assigned by hashing.
- newAssignment.forEach((targetPartition, members) ->
members.forEach(member -> {
- finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition);
- finalAssignmentByPartition.computeIfAbsent(targetPartition, k ->
new HashSet<>()).add(member);
- }));
-
- // Then, take the members from the current assignment, making sure
that no member has too many assigned partitions.
- // When combining current assignment, we need to only consider the
topics in current assignment that are also being
- // subscribed in the new assignment as well.
- currentAssignment.forEach((targetPartition, members) -> {
- if (subscribedTopicIds.contains(targetPartition.topicId())) {
- members.forEach(member -> {
- if (groupSpec.memberIds().contains(member)) {
- Set<TopicIdPartition> memberPartitions =
finalAssignment.computeIfAbsent(member, k -> new HashSet<>());
- if ((memberPartitions.size() < desiredAssignmentCount)
&& !newAssignment.containsKey(targetPartition)) {
- memberPartitions.add(targetPartition);
-
finalAssignmentByPartition.computeIfAbsent(targetPartition, k -> new
HashSet<>()).add(member);
- }
- }
- });
- }
- });
-
- // Finally, round-robin assignment for unassigned partitions which do
not already have members assigned.
- // The order of steps differs slightly from KIP-932 because the
desired assignment count has been taken into
- // account when copying partitions across from the current assignment,
and this is more convenient.
- List<TopicIdPartition> unassignedPartitions = targetPartitions.stream()
- .filter(targetPartition ->
!finalAssignmentByPartition.containsKey(targetPartition))
- .toList();
-
- roundRobinAssignmentWithCount(groupSpec.memberIds(),
unassignedPartitions, finalAssignment, desiredAssignmentCount);
-
- return groupAssignment(finalAssignment, groupSpec.memberIds());
+ return assignment;
}
/**
* This function computes the new assignment for a heterogeneous group.
- * @param groupSpec The group metadata
specifications.
- * @param memberToPartitionsSubscription The member to subscribed topic
partitions map.
- * @param currentAssignment The current assignment for
subscribed topic partitions to memberIds.
+ *
+ * @param groupSpec The group metadata specifications.
+ * @param memberToPartitionsSubscription The member to subscribed topic
partitions map.
+ * @param currentAssignment The current assignment for
subscribed topic partitions to memberIds.
* @return the new partition assignment for the members of the group.
*/
private GroupAssignment newAssignmentHeterogeneous(
@@ -241,7 +168,7 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
roundRobinAssignment(topicToMemberSubscription.get(unassignedTopic),
unassignedPartitions.get(unassignedTopic), newAssignment));
// Step 3: We combine current assignment and new assignment.
- Map<String, Set<TopicIdPartition>> finalAssignment =
newHashMap(numGroupMembers);
+ Map<String, Set<TopicIdPartition>> finalAssignment =
AssignorHelpers.newHashMap(numGroupMembers);
newAssignment.forEach((targetPartition, members) ->
members.forEach(member ->
finalAssignment.computeIfAbsent(member, k -> new
HashSet<>()).add(targetPartition)));
@@ -260,10 +187,11 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
/**
* This function updates assignment by hashing the member IDs of the
members and maps the partitions assigned to the
* members based on the hash, one partition per member. This gives
approximately even balance.
- * @param memberIds The member ids to which the topic partitions
need to be assigned.
- * @param partitionsToAssign The subscribed topic partitions which needs
assignment.
- * @param assignment The existing assignment by topic partition.
We need to pass it as a parameter because this
- * method can be called multiple times for
heterogeneous assignment.
+ *
+ * @param memberIds The member ids to which the topic partitions
need to be assigned.
+ * @param partitionsToAssign The subscribed topic partitions which needs
assignment.
+ * @param assignment The existing assignment by topic partition.
We need to pass it as a parameter because this
+ * method can be called multiple times for
heterogeneous assignment.
*/
// Visible for testing
void memberHashAssignment(
@@ -282,10 +210,11 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
/**
* This functions assigns topic partitions to members by a round-robin
approach and updates the existing assignment.
- * @param memberIds The member ids to which the topic
partitions need to be assigned, should be non-empty.
- * @param partitionsToAssign The subscribed topic partitions which
needs assignment.
- * @param assignment The existing assignment by topic
partition. We need to pass it as a parameter because this
- * method can be called multiple times
for heterogeneous assignment.
+ *
+ * @param memberIds The member ids to which the topic partitions
need to be assigned, should be non-empty.
+ * @param partitionsToAssign The subscribed topic partitions which needs
assignment.
+ * @param assignment The existing assignment by topic partition.
We need to pass it as a parameter because this
+ * method can be called multiple times for
heterogeneous assignment.
*/
// Visible for testing
void roundRobinAssignment(
@@ -305,73 +234,6 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
}
}
- /**
- * This functions assigns topic partitions to members by a round-robin
approach and updates the existing assignment.
- * @param memberIds The member ids to which the topic
partitions need to be assigned, should be non-empty.
- * @param partitionsToAssign The subscribed topic partitions which
need assignment.
- * @param assignment The existing assignment by topic
partition. We need to pass it as a parameter because this
- * method can be called multiple times
for heterogeneous assignment.
- * @param desiredAssignmentCount The number of partitions which can be
assigned to each member to give even balance.
- * Note that this number can be exceeded
by one to allow for situations
- * in which we have hashing collisions.
- */
- void roundRobinAssignmentWithCount(
- Collection<String> memberIds,
- List<TopicIdPartition> partitionsToAssign,
- Map<String, Set<TopicIdPartition>> assignment,
- int desiredAssignmentCount
- ) {
- Collection<String> memberIdsCopy = new LinkedHashSet<>(memberIds);
-
- // We iterate through the target partitions which are not in the
assignment and assign a memberId to them.
- // In case we run out of members (memberIds < partitionsToAssign), we
again start from the starting index of memberIds.
- Iterator<String> memberIdIterator = memberIdsCopy.iterator();
- ListIterator<TopicIdPartition> partitionListIterator =
partitionsToAssign.listIterator();
- while (partitionListIterator.hasNext()) {
- TopicIdPartition partition = partitionListIterator.next();
- if (!memberIdIterator.hasNext()) {
- memberIdIterator = memberIdsCopy.iterator();
- if (memberIdsCopy.isEmpty()) {
- // This should never happen, but guarding against an
infinite loop
- throw new PartitionAssignorException("Inconsistent number
of member IDs");
- }
- }
- String memberId = memberIdIterator.next();
- Set<TopicIdPartition> memberPartitions =
assignment.computeIfAbsent(memberId, k -> new HashSet<>());
- // We are prepared to add one more partition, even if the desired
assignment count is already reached.
- if (memberPartitions.size() <= desiredAssignmentCount) {
- memberPartitions.add(partition);
- } else {
- memberIdIterator.remove();
- partitionListIterator.previous();
- }
- }
- }
-
- private List<TopicIdPartition> computeTargetPartitions(
- GroupSpec groupSpec,
- Set<Uuid> subscribedTopicIds,
- SubscribedTopicDescriber subscribedTopicDescriber
- ) {
- List<TopicIdPartition> targetPartitions = new ArrayList<>();
- subscribedTopicIds.forEach(topicId -> {
- int numPartitions =
subscribedTopicDescriber.numPartitions(topicId);
- if (numPartitions == -1) {
- throw new PartitionAssignorException(
- "Members are subscribed to topic " + topicId
- + " which doesn't exist in the topic metadata."
- );
- }
-
- for (int partition = 0; partition < numPartitions; partition++) {
- if (groupSpec.isPartitionAssignable(topicId, partition)) {
- targetPartitions.add(new TopicIdPartition(topicId,
partition));
- }
- }
- });
- return targetPartitions;
- }
-
private GroupAssignment groupAssignment(
Map<String, Set<TopicIdPartition>> assignmentByMember,
Collection<String> allGroupMembers
@@ -390,8 +252,4 @@ public class SimpleAssignor implements
ShareGroupPartitionAssignor {
return new GroupAssignment(members);
}
-
- private static <K, V> HashMap<K, V> newHashMap(int numMappings) {
- return new HashMap<>((int) (((numMappings + 1) / 0.75f) + 1));
- }
}
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java
new file mode 100644
index 00000000000..1d44a97cc2f
--- /dev/null
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java
@@ -0,0 +1,385 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
+import
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The homogeneous simple assignment builder is used to generate the target
assignment for a share group with
+ * all its members subscribed to the same set of topics.
+ * <p>
+ * Assignments are done according to the following principles:
+ * <ol>
+ * <li>Balance: Ensure partitions are distributed equally among all
members.
+ * The difference in assignments sizes between any two
members
+ * should not exceed one partition.</li>
+ * <li>Stickiness: Minimize partition movements among members by
retaining
+ * as much of the existing assignment as possible.</li>
+ * </ol>
+ * <p>
+ * Balance is prioritized above stickiness.
+ */
+public class SimpleHomogeneousAssignmentBuilder {
+
+ /**
+ * The list of all the topic Ids that the share group is subscribed to.
+ */
+ private final Set<Uuid> subscribedTopicIds;
+
+ /**
+ * The list of members in the share group.
+ */
+ private final List<String> memberIds;
+
+ /**
+ * Maps member ids to their indices in the memberIds list.
+ */
+ private final Map<String, Integer> memberIndices;
+
+ /**
+ * The list of all the topic-partitions assignable for the share group.
+ */
+ private final List<TopicIdPartition> targetPartitions;
+
+ /**
+ * The number of members in the share group.
+ */
+ private final int numGroupMembers;
+
+ /**
+ * The desired sharing for each target partition.
+ * For entirely balanced assignment, we would expect (numGroupMembers
/ numTargetPartitions) members per partition, rounded upwards.
+ * That can be expressed as: Math.ceil(numGroupMembers / (double)
numTargetPartitions)
+ */
+ private final int desiredSharing;
+
+ /**
+ * The desired number of assignments for each share group member.
+ * <p>
+ * Members are stored as integer indices into the memberIds array.
+ */
+ private final int[] desiredAssignmentCount;
+
+ /**
+ * The share group assignment from the group metadata specification at the
start of the assignment operation.
+ * <p>
+ * Members are stored as integer indices into the memberIds array.
+ */
+ private final Map<Integer, Map<Uuid, Set<Integer>>> oldGroupAssignment;
+
+ /**
+ * The share group assignment calculated iteratively by the assignment
operation. Entries in this map override those
+ * in the old group assignment map.
+ * <p>
+ * Members are stored as integer indices into the memberIds array.
+ */
+ private final Map<Integer, Map<Uuid, Set<Integer>>> newGroupAssignment;
+
+ /**
+ * The final assignment keyed by topic-partition mapping to member.
+ * <p>
+ * Members are stored as integer indices into the memberIds array.
+ */
+ private final Map<TopicIdPartition, Set<Integer>>
finalAssignmentByPartition;
+
+ /**
+ * The final assignment keyed by member ID mapping to topic-partitions.
+ * <p>
+ * Members are stored as integer indices into the memberIds array.
+ */
+ private final Map<Integer, Set<TopicIdPartition>> finalAssignmentByMember;
+
+ /**
+ * The set of members which have too few assigned partitions.
+ * <p>
+ * Members are stored as integer indices into the memberIds array.
+ */
+ private final Set<Integer> unfilledMembers;
+
+ /**
+ * The set of members which have too many assigned partitions.
+ * <p>
+ * Members are stored as integer indices into the memberIds array.
+ */
+ private final Set<Integer> overfilledMembers;
+
+ public SimpleHomogeneousAssignmentBuilder(GroupSpec groupSpec,
SubscribedTopicDescriber subscribedTopicDescriber) {
+ this.subscribedTopicIds =
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()).subscribedTopicIds();
+
+ // Number the members 0 to M - 1.
+ this.numGroupMembers = groupSpec.memberIds().size();
+ this.memberIds = new ArrayList<>(groupSpec.memberIds());
+ this.memberIndices = AssignorHelpers.newHashMap(numGroupMembers);
+ for (int memberIndex = 0; memberIndex < numGroupMembers;
memberIndex++) {
+ memberIndices.put(memberIds.get(memberIndex), memberIndex);
+ }
+
+ this.targetPartitions =
AssignorHelpers.computeTargetPartitions(groupSpec, subscribedTopicIds,
subscribedTopicDescriber);
+
+ int numTargetPartitions = targetPartitions.size();
+ if (numTargetPartitions == 0) {
+ this.desiredSharing = 0;
+ } else {
+ this.desiredSharing = (numGroupMembers + numTargetPartitions - 1)
/ numTargetPartitions;
+ }
+ this.desiredAssignmentCount = new int[numGroupMembers];
+ this.oldGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
+ this.newGroupAssignment = AssignorHelpers.newHashMap(numGroupMembers);
+ this.finalAssignmentByPartition =
AssignorHelpers.newHashMap(numTargetPartitions);
+ this.finalAssignmentByMember =
AssignorHelpers.newHashMap(numGroupMembers);
+ this.unfilledMembers = AssignorHelpers.newHashSet(numGroupMembers);
+ this.overfilledMembers = AssignorHelpers.newHashSet(numGroupMembers);
+
+ // Extract the old group assignment from the group metadata
specification.
+ groupSpec.memberIds().forEach(memberId -> {
+ int memberIndex = memberIndices.get(memberId);
+ oldGroupAssignment.put(memberIndex,
groupSpec.memberAssignment(memberId).partitions());
+ });
+
+ // Calculate the desired number of assignments for each member.
+ // The precise desired assignment count per member. This can
be a fractional number.
+ // It is derived from the desired sharing, which is (numGroupMembers / numTargetPartitions) members
per partition, rounded upwards.
+ // Using integer arithmetic: (numGroupMembers + numTargetPartitions -
1) / numTargetPartitions
+ double preciseDesiredAssignmentCount = desiredSharing *
numTargetPartitions / (double) numGroupMembers;
+ for (int memberIndex = 0; memberIndex < numGroupMembers;
memberIndex++) {
+ desiredAssignmentCount[memberIndex] =
+ (int) Math.ceil(preciseDesiredAssignmentCount * (double)
(memberIndex + 1)) -
+ (int) Math.ceil(preciseDesiredAssignmentCount * (double)
memberIndex);
+ }
+ }
+
+ /**
+ * Here's the step-by-step breakdown of the assignment process:
+ * <ol>
+ * <li>Revoke partitions from the existing assignment that are no longer
part of each member's subscriptions.</li>
+ * <li>Revoke partitions from members which have too many
partitions.</li>
+ * <li>Revoke any partitions which are shared more than desired.</li>
+ * <li>Assign any partitions which have insufficient members
assigned.</li>
+ * </ol>
+ */
+ public GroupAssignment build() {
+ if (subscribedTopicIds.isEmpty()) {
+ return new GroupAssignment(Map.of());
+ }
+
+ // The order of steps here is not that significant, but
assignRemainingPartitions must go last.
+ revokeUnassignablePartitions();
+
+ revokeOverfilledMembers();
+
+ revokeOversharedPartitions();
+
+ // Add in any partitions which are currently not in the assignment.
+ targetPartitions.forEach(topicPartition ->
finalAssignmentByPartition.computeIfAbsent(topicPartition, k -> new
HashSet<>()));
+
+ assignRemainingPartitions();
+
+ // Combine the old and the new group assignments to give the result.
+ Map<String, MemberAssignment> targetAssignment =
AssignorHelpers.newHashMap(numGroupMembers);
+ for (int memberIndex = 0; memberIndex < numGroupMembers;
memberIndex++) {
+ Map<Uuid, Set<Integer>> memberAssignment =
newGroupAssignment.get(memberIndex);
+ if (memberAssignment == null) {
+ targetAssignment.put(memberIds.get(memberIndex), new
MemberAssignmentImpl(oldGroupAssignment.get(memberIndex)));
+ } else {
+ targetAssignment.put(memberIds.get(memberIndex), new
MemberAssignmentImpl(memberAssignment));
+ }
+ }
+
+ return new GroupAssignment(targetAssignment);
+ }
+
+ /**
+ * Examine the members from the current assignment, making sure that no
member has too many assigned partitions.
+ * When looking at the current assignment, we need to only consider the
topics in the current assignment that are
+ * also being subscribed in the new assignment.
+ */
+ private void revokeUnassignablePartitions() {
+ for (Map.Entry<Integer, Map<Uuid, Set<Integer>>> entry :
oldGroupAssignment.entrySet()) {
+ Integer memberIndex = entry.getKey();
+ Map<Uuid, Set<Integer>> oldMemberAssignment = entry.getValue();
+ Map<Uuid, Set<Integer>> newMemberAssignment = null;
+ int memberAssignedPartitions = 0;
+ int desiredAssignmentCountForMember =
desiredAssignmentCount[memberIndex];
+
+ for (Map.Entry<Uuid, Set<Integer>> oldMemberPartitions :
oldMemberAssignment.entrySet()) {
+ Uuid topicId = oldMemberPartitions.getKey();
+ Set<Integer> assignedPartitions =
oldMemberPartitions.getValue();
+
+ if (subscribedTopicIds.contains(topicId)) {
+ for (int partition : assignedPartitions) {
+ TopicIdPartition topicPartition = new
TopicIdPartition(topicId, partition);
+ memberAssignedPartitions++;
+
finalAssignmentByPartition.computeIfAbsent(topicPartition, k -> new
HashSet<>()).add(memberIndex);
+ finalAssignmentByMember.computeIfAbsent(memberIndex, k
-> new HashSet<>()).add(topicPartition);
+ if (memberAssignedPartitions >=
desiredAssignmentCountForMember) {
+ if (newMemberAssignment == null) {
+ // If the new assignment is null, we create a
deep copy of the
+ // original assignment so that we can alter it.
+ newMemberAssignment =
AssignorHelpers.deepCopyAssignment(oldMemberAssignment);
+ }
+ }
+ }
+ } else {
+ if (newMemberAssignment == null) {
+ // If the new member assignment is null, we create a
deep copy of the
+ // original assignment so we can alter it.
+ newMemberAssignment =
AssignorHelpers.deepCopyAssignment(oldMemberAssignment);
+ }
+ // Remove the entire topic.
+ newMemberAssignment.remove(topicId);
+ }
+ }
+
+ if (newMemberAssignment != null) {
+ newGroupAssignment.put(memberIndex, newMemberAssignment);
+ }
+
+ if (memberAssignedPartitions < desiredAssignmentCountForMember) {
+ unfilledMembers.add(memberIndex);
+ } else if (memberAssignedPartitions >
desiredAssignmentCountForMember) {
+ overfilledMembers.add(memberIndex);
+ }
+ }
+ }
+
+ /**
+ * Revoke partitions from members which are overfilled.
+ */
+ private void revokeOverfilledMembers() {
+ if (overfilledMembers.isEmpty())
+ return;
+
+ overfilledMembers.forEach(memberIndex -> {
+ int memberDesiredAssignmentCount =
desiredAssignmentCount[memberIndex];
+ Set<TopicIdPartition> memberFinalAssignment =
finalAssignmentByMember.get(memberIndex);
+ if (memberFinalAssignment.size() > memberDesiredAssignmentCount) {
+ Iterator<TopicIdPartition> iterator =
memberFinalAssignment.iterator();
+ while (iterator.hasNext()) {
+ TopicIdPartition topicPartition = iterator.next();
+
newGroupAssignment.get(memberIndex).get(topicPartition.topicId()).remove(topicPartition.partitionId());
+
finalAssignmentByPartition.get(topicPartition).remove(memberIndex);
+ iterator.remove();
+ if (memberFinalAssignment.size() ==
memberDesiredAssignmentCount) {
+ break;
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Revoke any over-shared partitions.
+ */
+ private void revokeOversharedPartitions() {
+ finalAssignmentByPartition.forEach((topicPartition, assignedMembers)
-> {
+ int assignedMemberCount = assignedMembers.size();
+ if (assignedMemberCount > desiredSharing) {
+ Iterator<Integer> assignedMemberIterator =
assignedMembers.iterator();
+ while (assignedMemberIterator.hasNext()) {
+ Integer memberIndex = assignedMemberIterator.next();
+ Map<Uuid, Set<Integer>> newMemberAssignment =
newGroupAssignment.get(memberIndex);
+ if (newMemberAssignment == null) {
+ newMemberAssignment =
AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
+ newGroupAssignment.put(memberIndex,
newMemberAssignment);
+ }
+ Set<Integer> partitions =
newMemberAssignment.get(topicPartition.topicId());
+ if (partitions != null) {
+ if (partitions.remove(topicPartition.partitionId())) {
+ assignedMemberCount--;
+ assignedMemberIterator.remove();
+
finalAssignmentByMember.get(memberIndex).remove(topicPartition);
+ unfilledMembers.add(memberIndex);
+ }
+ }
+ if (assignedMemberCount <= desiredSharing) {
+ break;
+ }
+ }
+ }
+ });
+ }
+
+ /**
+ * Assign partitions to unfilled members. It repeatedly iterates through
the unfilled members while running
+ * once through the set of partitions. When a partition is found that has
insufficient sharing, it attempts to assign
+ * it to one of the unfilled members.
+ * <p>
+ * There is one tricky case here and that's where a partition wants
another assignment, but none of the unfilled
+ * members are able to take it (because they already have that partition).
In this situation, we just accept that
+ * no additional assignments for this partition could be made and carry
on. In theory, a different shuffling of the
+ * partitions would be able to achieve better balance, but it's harmless
tolerating a slight imbalance in this case.
+ * <p>
+ * Note that finalAssignmentByPartition is not maintained by this method
which is expected to be the final step in the
+ * computation.
+ */
+ private void assignRemainingPartitions() {
+ if (unfilledMembers.isEmpty())
+ return;
+
+ Iterator<Integer> memberIterator = unfilledMembers.iterator();
+ boolean partitionAssignedForThisIterator = false;
+ for (Map.Entry<TopicIdPartition, Set<Integer>> partitionAssignment :
finalAssignmentByPartition.entrySet()) {
+ TopicIdPartition topicPartition = partitionAssignment.getKey();
+ Set<Integer> membersAssigned = partitionAssignment.getValue();
+
+ if (membersAssigned.size() < desiredSharing) {
+ int assignmentsToMake = desiredSharing -
membersAssigned.size();
+ while (assignmentsToMake > 0) {
+ if (!memberIterator.hasNext()) {
+ if (!partitionAssignedForThisIterator) {
+ break;
+ }
+ memberIterator = unfilledMembers.iterator();
+ partitionAssignedForThisIterator = false;
+ }
+ int memberIndex = memberIterator.next();
+ if (!membersAssigned.contains(memberIndex)) {
+ Map<Uuid, Set<Integer>> newMemberAssignment =
newGroupAssignment.get(memberIndex);
+ if (newMemberAssignment == null) {
+ newMemberAssignment =
AssignorHelpers.deepCopyAssignment(oldGroupAssignment.get(memberIndex));
+ newGroupAssignment.put(memberIndex,
newMemberAssignment);
+ }
+
newMemberAssignment.computeIfAbsent(topicPartition.topicId(), k -> new
HashSet<>()).add(topicPartition.partitionId());
+ finalAssignmentByMember.computeIfAbsent(memberIndex, k
-> new HashSet<>()).add(topicPartition);
+ assignmentsToMake--;
+ partitionAssignedForThisIterator = true;
+ if (finalAssignmentByMember.get(memberIndex).size() >=
desiredAssignmentCount[memberIndex]) {
+ memberIterator.remove();
+ }
+ }
+ }
+ }
+
+ if (unfilledMembers.isEmpty()) {
+ break;
+ }
+ }
+ }
+}
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
index 498620248a8..7550ad400e7 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java
@@ -202,23 +202,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
- // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
- // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
- // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by
round-robin assignment.
- // Step 3 -> no new assignment gets added by current assignment since
it is empty.
- Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new
HashMap<>();
- expectedAssignment.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 2),
- mkTopicAssignment(TOPIC_3_UUID, 1)
- ));
- expectedAssignment.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 1),
- mkTopicAssignment(TOPIC_3_UUID, 0)
- ));
-
- // T1: 3 partitions + T3: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment);
- assertAssignment(expectedAssignment, computedAssignment);
}
@Test
@@ -268,23 +252,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
- // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
- // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
- // Step 2 -> T1:2, T3:1 -> MEMBER_A and T3:0 -> MEMBER_B by
round-robin assignment.
- // Step 3 -> no new assignment gets added by current assignment since
it is empty.
- Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new
HashMap<>();
- expectedAssignment.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 2),
- mkTopicAssignment(TOPIC_3_UUID, 1)
- ));
- expectedAssignment.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 1),
- mkTopicAssignment(TOPIC_3_UUID, 0)
- ));
-
- // T1: 3 partitions + T3: 2 partitions = 5 partitions
assertEveryPartitionGetsAssignment(5, computedAssignment);
- assertAssignment(expectedAssignment, computedAssignment);
}
@Test
@@ -332,17 +300,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata
);
- Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new
HashMap<>();
- expectedAssignment.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 1, 2)
- ));
- expectedAssignment.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0)
- ));
-
- // T1: 3 partitions + T3(non-assignable): 2 partitions = 3 partitions
assertEveryPartitionGetsAssignment(3, computedAssignment);
- assertAssignment(expectedAssignment, computedAssignment);
}
@Test
@@ -675,182 +633,121 @@ public class SimpleAssignorTest {
}
@Test
- public void testRoundRobinAssignmentWithCount() {
- String member1 = "member1";
- String member2 = "member2";
- List<String> members = List.of(member1, member2);
- TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
- TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
- TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
- TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
- List<TopicIdPartition> unassignedPartitions = List.of(partition2,
partition3, partition4);
+ public void testIncrementalAssignmentIncreasingMembersHomogeneous() {
+ final int numPartitions = 24;
+ final int numMembers = 101;
- Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
- assignment.put(member1, new HashSet<>(Set.of(partition1)));
- assignment.put(member2, new HashSet<>(Set.of(partition1)));
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions)
+ .build();
- assignor.roundRobinAssignmentWithCount(members, unassignedPartitions,
assignment, 2);
- Map<String, Set<TopicIdPartition>> expectedAssignment = Map.of(
- member1, Set.of(partition1, partition2, partition4),
- member2, Set.of(partition1, partition3)
+ SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(
+ metadataImage
);
- assertFinalAssignment(expectedAssignment, assignment);
- }
+ Set<Uuid> topicsSubscription = new LinkedHashSet<>();
+ topicsSubscription.add(TOPIC_1_UUID);
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = new
HashMap<>();
- @Test
- public void testRoundRobinAssignmentWithCountTooManyPartitions() {
- String member1 = "member1";
- String member2 = "member2";
- List<String> members = List.of(member1, member2);
- TopicIdPartition partition1 = new TopicIdPartition(TOPIC_1_UUID, 0);
- TopicIdPartition partition2 = new TopicIdPartition(TOPIC_2_UUID, 0);
- TopicIdPartition partition3 = new TopicIdPartition(TOPIC_3_UUID, 0);
- TopicIdPartition partition4 = new TopicIdPartition(TOPIC_4_UUID, 0);
- TopicIdPartition partition5 = new TopicIdPartition(TOPIC_4_UUID, 1);
- TopicIdPartition partition6 = new TopicIdPartition(TOPIC_4_UUID, 2);
- List<TopicIdPartition> unassignedPartitions = List.of(partition2,
partition3, partition4, partition5, partition6);
+ SimpleAssignor assignor = new SimpleAssignor();
- Map<String, Set<TopicIdPartition>> assignment = new HashMap<>();
- assignment.put(member1, new HashSet<>(Set.of(partition1)));
- assignment.put(member2, new HashSet<>(Set.of(partition1)));
+ // Increase the number of members one at a time, checking that the
partitions are assigned as expected
+ for (int member = 0; member < numMembers; member++) {
+ String newMemberId = "M" + member;
+ members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription,
+ Assignment.EMPTY
+ ));
- assertThrows(PartitionAssignorException.class,
- () -> assignor.roundRobinAssignmentWithCount(members,
unassignedPartitions, assignment, 2));
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ new HashMap<>()
+ );
+
+ GroupAssignment computedAssignment = assignor.assign(groupSpec,
subscribedTopicMetadata);
+ assertEveryPartitionGetsAssignment(numPartitions,
computedAssignment);
+
+ computedAssignment.members().forEach((memberId, partitions) ->
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription,
+ new Assignment(partitions.partitions())
+ )));
+ }
}
@Test
- public void testAssignWithCurrentAssignmentHomogeneous() {
- // Current assignment setup - Two members A, B subscribing to T1 and
T2.
- MetadataImage metadataImage1 = new MetadataImageBuilder()
- .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, 3)
- .addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
+ public void testIncrementalAssignmentDecreasingMembersHomogeneous() {
+ final int numPartitions = 24;
+ final int numMembers = 101;
+
+ MetadataImage metadataImage = new MetadataImageBuilder()
+ .addTopic(TOPIC_1_UUID, TOPIC_1_NAME, numPartitions)
.build();
- Map<String, MemberSubscriptionAndAssignmentImpl> members1 = new
HashMap<>();
+ SubscribedTopicDescriberImpl subscribedTopicMetadata = new
SubscribedTopicDescriberImpl(
+ metadataImage
+ );
- Set<Uuid> topicsSubscription1 = new LinkedHashSet<>();
- topicsSubscription1.add(TOPIC_1_UUID);
- topicsSubscription1.add(TOPIC_2_UUID);
+ Set<Uuid> topicsSubscription = new LinkedHashSet<>();
+ topicsSubscription.add(TOPIC_1_UUID);
+ Map<String, MemberSubscriptionAndAssignmentImpl> members = new
HashMap<>();
- members1.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
- Optional.empty(),
- Optional.empty(),
- topicsSubscription1,
- Assignment.EMPTY
- ));
+ SimpleAssignor assignor = new SimpleAssignor();
- members1.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
- Optional.empty(),
- Optional.empty(),
- topicsSubscription1,
- Assignment.EMPTY
- ));
+ for (int member = 0; member < numMembers; member++) {
+ String newMemberId = "M" + member;
+ members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription,
+ Assignment.EMPTY
+ ));
+ }
- GroupSpec groupSpec1 = new GroupSpecImpl(
- members1,
+ GroupSpec groupSpec = new GroupSpecImpl(
+ members,
HOMOGENEOUS,
- Map.of()
- );
- SubscribedTopicDescriberImpl subscribedTopicMetadata1 = new
SubscribedTopicDescriberImpl(
- metadataImage1
+ new HashMap<>()
);
- GroupAssignment computedAssignment1 = assignor.assign(
- groupSpec1,
- subscribedTopicMetadata1
- );
-
- // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
- // Step 1 -> T1:0 -> MEMBER_A and T1:1 -> MEMBER_B by hash assignment.
- // Step 2 -> T1:2, T2:1 -> MEMBER_A and T2:0 -> MEMBER_B by
round-robin assignment.
- // Step 3 -> no new assignment gets added by current assignment since
it is empty.
- Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new
HashMap<>();
- expectedAssignment1.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 2),
- mkTopicAssignment(TOPIC_2_UUID, 1)
- ));
- expectedAssignment1.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 1),
- mkTopicAssignment(TOPIC_2_UUID, 0)
- ));
-
- // T1: 3 partitions + T2: 2 partitions = 5 partitions
- assertEveryPartitionGetsAssignment(5, computedAssignment1);
- assertAssignment(expectedAssignment1, computedAssignment1);
-
- // New assignment setup - Three members A, B, C subscribing to T2 and
T3.
- MetadataImage metadataImage2 = new MetadataImageBuilder()
- .addTopic(TOPIC_2_UUID, TOPIC_2_NAME, 2)
- .addTopic(TOPIC_3_UUID, TOPIC_3_NAME, 3)
- .build();
-
- Map<String, MemberSubscriptionAndAssignmentImpl> members2 = new
HashMap<>();
+ GroupAssignment computedAssignment = assignor.assign(groupSpec,
subscribedTopicMetadata);
+ assertEveryPartitionGetsAssignment(numPartitions, computedAssignment);
- Set<Uuid> topicsSubscription2 = new LinkedHashSet<>();
- topicsSubscription2.add(TOPIC_2_UUID);
- topicsSubscription2.add(TOPIC_3_UUID);
-
- members2.put(MEMBER_A, new MemberSubscriptionAndAssignmentImpl(
- Optional.empty(),
- Optional.empty(),
- topicsSubscription2,
- // Utilizing the assignment from current assignment
- new Assignment(mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 2),
- mkTopicAssignment(TOPIC_2_UUID, 1)))
- ));
-
- members2.put(MEMBER_B, new MemberSubscriptionAndAssignmentImpl(
- Optional.empty(),
- Optional.empty(),
- topicsSubscription2,
- new Assignment(mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 1),
- mkTopicAssignment(TOPIC_2_UUID, 0)))
- ));
+ for (int member = 0; member < numMembers; member++) {
+ String newMemberId = "M" + member;
+ members.put(newMemberId, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription,
+ new
Assignment(computedAssignment.members().get(newMemberId).partitions()))
+ );
+ }
- members2.put(MEMBER_C, new MemberSubscriptionAndAssignmentImpl(
- Optional.empty(),
- Optional.empty(),
- topicsSubscription2,
- Assignment.EMPTY
- ));
+ // Decrease the number of members one at a time, checking that the
partitions are assigned as expected
+ for (int member = numMembers - 1; member > 0; member--) {
+ String newMemberId = "M" + member;
+ members.remove(newMemberId);
- GroupSpec groupSpec2 = new GroupSpecImpl(
- members2,
- HOMOGENEOUS,
- Map.of()
- );
- SubscribedTopicDescriberImpl subscribedTopicMetadata2 = new
SubscribedTopicDescriberImpl(
- metadataImage2
- );
-
- GroupAssignment computedAssignment2 = assignor.assign(
- groupSpec2,
- subscribedTopicMetadata2
- );
+ groupSpec = new GroupSpecImpl(
+ members,
+ HOMOGENEOUS,
+ new HashMap<>()
+ );
- // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of
MEMBER_C is 67.
- // Step 1 -> T2:0 -> MEMBER_A, T2:1 -> MEMBER_B, T3:0 -> MEMBER_C by
hash assignment
- // Step 2 -> T3:1 -> MEMBER_A, T3:2 -> MEMBER_B by round-robin
assignment
- // Step 3 -> no new addition by current assignment since T2:0 and T2:1
were already a part of new assignment.
- Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new
HashMap<>();
- expectedAssignment2.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_2_UUID, 0),
- mkTopicAssignment(TOPIC_3_UUID, 1)
- ));
- expectedAssignment2.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_2_UUID, 1),
- mkTopicAssignment(TOPIC_3_UUID, 2)
- ));
- expectedAssignment2.put(MEMBER_C, mkAssignment(
- mkTopicAssignment(TOPIC_3_UUID, 0)
- ));
+ computedAssignment = assignor.assign(groupSpec,
subscribedTopicMetadata);
+ assertEveryPartitionGetsAssignment(numPartitions,
computedAssignment);
- // T2: 2 partitions + T3: 3 partitions = 5 partitions
- assertEveryPartitionGetsAssignment(5, computedAssignment2);
- assertAssignment(expectedAssignment2, computedAssignment2);
+ computedAssignment.members().forEach((memberId, partitions) ->
members.put(memberId, new MemberSubscriptionAndAssignmentImpl(
+ Optional.empty(),
+ Optional.empty(),
+ topicsSubscription,
+ new Assignment(partitions.partitions())
+ )));
+ }
}
@Test
@@ -905,25 +802,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata1
);
- // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66. Hashcode of
MEMBER_C is 67.
- // Step 1 -> T2:2 -> member_A, T3:0 -> member_B, T2:2 -> member_C by
hash assignment.
- // Step 2 -> T1:0, T1:1, T1:2, T2:0 -> member_A, T3:1, -> member_B,
T2:1 -> member_C by round-robin assignment.
- // Step 3 -> no new assignment gets added by current assignment since
it is empty.
- Map<String, Map<Uuid, Set<Integer>>> expectedAssignment1 = new
HashMap<>();
- expectedAssignment1.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
- mkTopicAssignment(TOPIC_2_UUID, 0, 2)
- ));
- expectedAssignment1.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_3_UUID, 0, 1)
- ));
- expectedAssignment1.put(MEMBER_C, mkAssignment(
- mkTopicAssignment(TOPIC_2_UUID, 1, 2)
- ));
-
- // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions = 8
partitions
assertEveryPartitionGetsAssignment(8, computedAssignment1);
- assertAssignment(expectedAssignment1, computedAssignment1);
// New assignment setup - 2 members A - {T1, T2, T3}, B - {T3, T4}.
@@ -977,23 +856,7 @@ public class SimpleAssignorTest {
subscribedTopicMetadata2
);
- // Hashcode of MEMBER_A is 65. Hashcode of MEMBER_B is 66.
- // Step 1 -> T1:1 -> member_A, T3:0 -> member_B by hash assignment.
- // Step 2 -> T2:1 -> member_A, T4:0 -> member_B by round-robin
assignment.
- // Step 3 -> T1:0, T1:2, T2:0 -> member_A, T3:1 -> member_B by
current assignment.
- Map<String, Map<Uuid, Set<Integer>>> expectedAssignment2 = new
HashMap<>();
- expectedAssignment2.put(MEMBER_A, mkAssignment(
- mkTopicAssignment(TOPIC_1_UUID, 0, 1, 2),
- mkTopicAssignment(TOPIC_2_UUID, 0, 1, 2)
- ));
- expectedAssignment2.put(MEMBER_B, mkAssignment(
- mkTopicAssignment(TOPIC_3_UUID, 0, 1),
- mkTopicAssignment(TOPIC_4_UUID, 0)
- ));
-
- // T1: 3 partitions + T2: 3 partitions + T3: 2 partitions + T4: 1
partition = 9 partitions
assertEveryPartitionGetsAssignment(9, computedAssignment2);
- assertAssignment(expectedAssignment2, computedAssignment2);
}
private void assertAssignment(
@@ -1019,18 +882,6 @@ public class SimpleAssignorTest {
});
}
- private void assertFinalAssignment(
- Map<String, Set<TopicIdPartition>> expectedAssignment,
- Map<String, Set<TopicIdPartition>> computedAssignment
- ) {
- assertEquals(expectedAssignment.size(), computedAssignment.size());
- expectedAssignment.forEach((memberId, partitions) -> {
- Set<TopicIdPartition> computedPartitions =
computedAssignment.getOrDefault(memberId, Set.of());
- assertEquals(partitions.size(), computedPartitions.size());
- partitions.forEach(member ->
assertTrue(computedPartitions.contains(member)));
- });
- }
-
private void assertEveryPartitionGetsAssignment(
int expectedPartitions,
GroupAssignment computedGroupAssignment
diff --git
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
index 6c6db6d75b1..8febd4f0488 100644
---
a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
+++
b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ShareGroupAssignorBenchmark.java
@@ -95,7 +95,7 @@ public class ShareGroupAssignorBenchmark {
@Param({"1", "10", "100"})
private int partitionCount;
- @Param({"10", "100"})
+ @Param({"1", "10", "100"})
private int topicCount;
@Param({"HOMOGENEOUS", "HETEROGENEOUS"})