This is an automated email from the ASF dual-hosted git repository. dajac pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push: new 92ed1ed5869 KAFKA-16864; Optimize uniform (homogenous) assignor (#16088) 92ed1ed5869 is described below commit 92ed1ed58694d1df9df386a286bb856dbefb683a Author: David Jacot <dja...@confluent.io> AuthorDate: Fri May 31 22:17:59 2024 +0200 KAFKA-16864; Optimize uniform (homogenous) assignor (#16088) This patch optimizes uniform (homogenous) assignor by avoiding creating a copy of all the assignments. Instead, the assignor creates a copy only if the assignment is updated. It is a sort of copy-on-write. This change reduces the overhead of the TargetAssignmentBuilder when ran with the uniform (homogenous) assignor. Trunk: ``` Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 24.535 ± 1.583 ms/op TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 24.094 ± 0.223 ms/op JMH benchmarks done ``` ``` Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode Cnt Score Error Units ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 100 avgt 5 14.697 ± 0.133 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 1000 avgt 5 15.073 ± 0.135 ms/op JMH benchmarks done ``` Patch: ``` Benchmark (memberCount) (partitionsToMemberRatio) (topicCount) Mode Cnt Score Error Units TargetAssignmentBuilderBenchmark.build 10000 10 100 avgt 5 3.376 ± 0.577 ms/op TargetAssignmentBuilderBenchmark.build 10000 10 1000 avgt 5 3.731 ± 0.359 ms/op JMH benchmarks done ``` ``` Benchmark (assignmentType) (assignorType) (isRackAware) (memberCount) (partitionsToMemberRatio) (subscriptionType) (topicCount) Mode Cnt Score Error Units ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 100 avgt 5 1.975 ± 0.086 ms/op ServerSideAssignorBenchmark.doAssignment INCREMENTAL UNIFORM false 10000 10 HOMOGENEOUS 1000 avgt 5 2.026 ± 0.190 ms/op JMH benchmarks done ``` Reviewers: Ritika Reddy <rre...@confluent.io>, Jeff Kim <jeff....@confluent.io>, Justine Olshan <jols...@confluent.io> --- .../OptimizedUniformAssignmentBuilder.java | 370 ++++++++------------- .../group/assignor/UniformAssignor.java | 10 +- .../coordinator/group/AssignmentTestUtil.java | 17 +- .../group/CoordinatorRecordHelpersTest.java | 18 +- .../OptimizedUniformAssignmentBuilderTest.java | 123 +++---- .../jmh/assignor/ServerSideAssignorBenchmark.java | 2 +- .../assignor/TargetAssignmentBuilderBenchmark.java | 5 +- 7 files changed, 223 insertions(+), 322 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java index 3ea1361d699..34e82566520 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilder.java @@ -18,29 +18,18 @@ package org.apache.kafka.coordinator.group.assignor; import org.apache.kafka.common.Uuid; import org.apache.kafka.server.common.TopicIdPartition; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; -import java.util.Queue; import java.util.Set; -import java.util.stream.Collectors; -import java.util.stream.IntStream; - -import static java.lang.Math.min; /** - * The optimized uniform assignment builder is used to generate the target assignment for a consumer group with + * The homogenous uniform assignment builder is used to generate the target assignment for a consumer group with * all its members subscribed to the same set of topics. - * It is optimized since the assignment can be done in fewer, less complicated steps compared to when - * the subscriptions are different across the members. * * Assignments are done according to the following principles: * @@ -53,8 +42,17 @@ import static java.lang.Math.min; * The assignment builder prioritizes the properties in the following order: * Balance > Stickiness. */ -public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignmentBuilder { - private static final Logger LOG = LoggerFactory.getLogger(OptimizedUniformAssignmentBuilder.class); +public class OptimizedUniformAssignmentBuilder { + private static final Class<?> UNMODIFIABLE_MAP_CLASS = Collections.unmodifiableMap(new HashMap<>()).getClass(); + private static final Class<?> EMPTY_MAP_CLASS = Collections.emptyMap().getClass(); + + /** + * @return True if the provided map is an UnmodifiableMap or EmptyMap. Those classes are not + * public hence we cannot use the `instanceof` operator. + */ + private static boolean isImmutableMap(Map<?, ?> map) { + return UNMODIFIABLE_MAP_CLASS.isInstance(map) || EMPTY_MAP_CLASS.isInstance(map); + } /** * The assignment specification which includes member metadata. @@ -72,62 +70,53 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment private final Set<Uuid> subscribedTopicIds; /** - * The number of members to receive an extra partition beyond the minimum quota. - * Minimum Quota = Total Partitions / Total Members - * Example: If there are 11 partitions to be distributed among 3 members, - * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. + * The members that are below their quota. */ - private int remainingMembersToGetAnExtraPartition; - - /** - * Members mapped to the remaining number of partitions needed to meet the minimum quota. - * Minimum quota = total partitions / total members. - */ - private Map<String, Integer> potentiallyUnfilledMembers; + private final List<MemberWithRemainingQuota> unfilledMembers; /** * The partitions that still need to be assigned. * Initially this contains all the subscribed topics' partitions. */ - private final Set<TopicIdPartition> unassignedPartitions; + private final List<TopicIdPartition> unassignedPartitions; /** * The target assignment. */ private final Map<String, MemberAssignment> targetAssignment; + /** + * The minimum number of partitions that a member must have. + * Minimum quota = total partitions / total members. + */ + private int minimumMemberQuota; + + /** + * The number of members to receive an extra partition beyond the minimum quota. + * Example: If there are 11 partitions to be distributed among 3 members, + * each member gets 3 (11 / 3) [minQuota] partitions and 2 (11 % 3) members get an extra partition. + */ + private int remainingMembersToGetAnExtraPartition; + OptimizedUniformAssignmentBuilder(GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber) { this.groupSpec = groupSpec; this.subscribedTopicDescriber = subscribedTopicDescriber; this.subscribedTopicIds = new HashSet<>(groupSpec.members().values().iterator().next().subscribedTopicIds()); - this.potentiallyUnfilledMembers = new HashMap<>(); - this.unassignedPartitions = new HashSet<>(); + this.unfilledMembers = new ArrayList<>(); + this.unassignedPartitions = new ArrayList<>(); this.targetAssignment = new HashMap<>(); } /** - * Here's the step-by-step breakdown of the assignment process: - * - * <li> Compute the quotas of partitions for each member based on the total partitions and member count.</li> - * <li> Initialize unassigned partitions with all the topic partitions that aren't present in the - * current target assignment.</li> - * <li> For existing assignments, retain partitions based on the determined quota. Add extras to unassigned partitions.</li> - * <li> Identify members that haven't fulfilled their partition quota or are eligible to receive extra partitions.</li> - * <li> Proceed with a round-robin assignment according to quotas. - * For each unassigned partition, locate the first compatible member from the potentially unfilled list.</li> + * Compute the new assignment for the group. */ - @Override - protected GroupAssignment buildAssignment() throws PartitionAssignorException { - int totalPartitionsCount = 0; - + public GroupAssignment build() throws PartitionAssignorException { if (subscribedTopicIds.isEmpty()) { - LOG.debug("The subscription list is empty, returning an empty assignment"); return new GroupAssignment(Collections.emptyMap()); } - // Check if the subscribed topicId is still valid. - // Update unassigned partitions based on the current target assignment - // and topic metadata. + // Compute the list of unassigned partitions. + int totalPartitionsCount = 0; for (Uuid topicId : subscribedTopicIds) { int partitionCount = subscribedTopicDescriber.numPartitions(topicId); if (partitionCount == -1) { @@ -144,216 +133,149 @@ public class OptimizedUniformAssignmentBuilder extends AbstractUniformAssignment } } - // The minimum required quota that each member needs to meet for a balanced assignment. - // This is the same for all members. - final int numberOfMembers = groupSpec.members().size(); - final int minQuota = totalPartitionsCount / numberOfMembers; + // Compute the minimum required quota per member and the number of members + // that should receive an extra partition. + int numberOfMembers = groupSpec.members().size(); + minimumMemberQuota = totalPartitionsCount / numberOfMembers; remainingMembersToGetAnExtraPartition = totalPartitionsCount % numberOfMembers; - groupSpec.members().keySet().forEach(memberId -> - targetAssignment.put(memberId, new MemberAssignment(new HashMap<>()) - )); - - potentiallyUnfilledMembers = assignStickyPartitions(minQuota); + // Revoke the partitions that either are not part of the member's subscriptions or + // exceed the maximum quota assigned to each member. + maybeRevokePartitions(); - unassignedPartitionsRoundRobinAssignment(); - - if (!unassignedPartitions.isEmpty()) { - throw new PartitionAssignorException("Partitions were left unassigned"); - } + // Assign the unassigned partitions to the members with space. + assignRemainingPartitions(); return new GroupAssignment(targetAssignment); } /** - * Retains a set of partitions from the existing assignment and includes them in the target assignment. - * Only relevant partitions that exist in the current topic metadata and subscriptions are considered. + * Revoke the partitions that either are not part of the member's subscriptions or + * exceed the maximum quota assigned to each member. * - * <p> For each member: - * <ol> - * <li> Find the valid current assignment considering topic subscriptions and metadata</li> - * <li> If the current assignment exists, retain partitions up to the minimum quota.</li> - * <li> If the current assignment size is greater than the minimum quota and - * there are members that could get an extra partition, assign the next partition as well.</li> - * <li> Finally, if the member's current assignment size is less than the minimum quota, - * add them to the potentially unfilled members map and track the number of remaining - * partitions required to meet the quota.</li> - * </ol> - * </p> - * - * @return Members mapped to the remaining number of partitions needed to meet the minimum quota, - * including members that are eligible to receive an extra partition. + * This method ensures that the original assignment is not copied if it is not + * altered. */ - private Map<String, Integer> assignStickyPartitions(int minQuota) { - Map<String, Integer> potentiallyUnfilledMembers = new HashMap<>(); - - groupSpec.members().forEach((memberId, assignmentMemberSpec) -> { - List<TopicIdPartition> validCurrentMemberAssignment = validCurrentMemberAssignment( - assignmentMemberSpec.assignedPartitions() - ); - - int currentAssignmentSize = validCurrentMemberAssignment.size(); - // Number of partitions required to meet the minimum quota. - int remaining = minQuota - currentAssignmentSize; - - if (currentAssignmentSize > 0) { - int retainedPartitionsCount = min(currentAssignmentSize, minQuota); - IntStream.range(0, retainedPartitionsCount).forEach(i -> { - TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(i); - addPartitionToAssignment( - targetAssignment, - memberId, - topicIdPartition.topicId(), - topicIdPartition.partitionId() - ); - }); - - if (remaining < 0) { - // The extra partition is located at the last index from the previous step. - if (remainingMembersToGetAnExtraPartition > 0) { - TopicIdPartition topicIdPartition = validCurrentMemberAssignment.get(retainedPartitionsCount++); - addPartitionToAssignment( - targetAssignment, - memberId, - topicIdPartition.topicId(), - topicIdPartition.partitionId() - ); - remainingMembersToGetAnExtraPartition--; + private void maybeRevokePartitions() { + for (Map.Entry<String, AssignmentMemberSpec> entry : groupSpec.members().entrySet()) { + String memberId = entry.getKey(); + AssignmentMemberSpec assignmentMemberSpec = entry.getValue(); + Map<Uuid, Set<Integer>> oldAssignment = assignmentMemberSpec.assignedPartitions(); + Map<Uuid, Set<Integer>> newAssignment = null; + + // The assignor expects to receive the assignment as an immutable map. It leverages + // this knowledge in order to avoid having to copy all assignments. + if (!isImmutableMap(oldAssignment)) { + throw new IllegalStateException("The assignor expect an immutable map."); + } + + int quota = minimumMemberQuota; + if (remainingMembersToGetAnExtraPartition > 0) { + quota++; + remainingMembersToGetAnExtraPartition--; + } + + for (Map.Entry<Uuid, Set<Integer>> topicPartitions : oldAssignment.entrySet()) { + Uuid topicId = topicPartitions.getKey(); + Set<Integer> partitions = topicPartitions.getValue(); + + if (subscribedTopicIds.contains(topicId)) { + if (partitions.size() <= quota) { + quota -= partitions.size(); + } else { + for (Integer partition : partitions) { + if (quota > 0) { + quota--; + } else { + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. + newAssignment = deepCopy(oldAssignment); + } + // Remove the partition from the new assignment. + Set<Integer> parts = newAssignment.get(topicId); + parts.remove(partition); + if (parts.isEmpty()) { + newAssignment.remove(topicId); + } + // Add the partition to the unassigned set to be re-assigned later on. + unassignedPartitions.add(new TopicIdPartition(topicId, partition)); + } + } } - // Any previously owned partitions that weren't retained due to the quotas - // are added to the unassigned partitions set. - if (retainedPartitionsCount < currentAssignmentSize) { - unassignedPartitions.addAll(validCurrentMemberAssignment.subList( - retainedPartitionsCount, - currentAssignmentSize - )); + } else { + if (newAssignment == null) { + // If the new assignment is null, we create a deep copy of the + // original assignment so that we can alter it. + newAssignment = deepCopy(oldAssignment); } + // Remove the entire topic. + newAssignment.remove(topicId); } } - if (remaining >= 0) { - potentiallyUnfilledMembers.put(memberId, remaining); + if (quota > 0) { + unfilledMembers.add(new MemberWithRemainingQuota(memberId, quota)); } - }); - return potentiallyUnfilledMembers; - } - - /** - * Filters the current assignment of partitions for a given member based on certain criteria. - * - * Any partition that still belongs to the member's subscribed topics list is considered valid. - * - * @param currentMemberAssignment The map of topics to partitions currently assigned to the member. - * - * @return List of valid partitions after applying the filters. - */ - private List<TopicIdPartition> validCurrentMemberAssignment( - Map<Uuid, Set<Integer>> currentMemberAssignment - ) { - List<TopicIdPartition> validCurrentAssignmentList = new ArrayList<>(); - currentMemberAssignment.forEach((topicId, partitions) -> { - if (subscribedTopicIds.contains(topicId)) { - partitions.forEach(partition -> { - TopicIdPartition topicIdPartition = new TopicIdPartition(topicId, partition); - validCurrentAssignmentList.add(topicIdPartition); - }); + if (newAssignment == null) { + targetAssignment.put(memberId, new MemberAssignment(oldAssignment)); } else { - LOG.debug("The topic " + topicId + " is no longer present in the subscribed topics list"); + targetAssignment.put(memberId, new MemberAssignment(newAssignment)); } - }); - - return validCurrentAssignmentList; + } } /** - * Allocates the unassigned partitions to unfilled members in a round-robin fashion. + * Assign the unassigned partitions to the unfilled members. */ - private void unassignedPartitionsRoundRobinAssignment() { - Queue<String> roundRobinMembers = new LinkedList<>(potentiallyUnfilledMembers.keySet()); - - // Partitions are sorted to ensure an even topic wise distribution across members. - // This not only balances the load but also makes partition-to-member mapping more predictable. - List<TopicIdPartition> sortedPartitionsList = unassignedPartitions.stream() - .sorted(Comparator.comparing(TopicIdPartition::topicId).thenComparing(TopicIdPartition::partitionId)) - .collect(Collectors.toList()); - - for (TopicIdPartition topicIdPartition : sortedPartitionsList) { - boolean assigned = false; - - for (int i = 0; i < roundRobinMembers.size() && !assigned; i++) { - String memberId = roundRobinMembers.poll(); - if (potentiallyUnfilledMembers.containsKey(memberId)) { - assigned = maybeAssignPartitionToMember(memberId, topicIdPartition); - } - // Only re-add the member to the end of the queue if it's still available for assignment. - if (potentiallyUnfilledMembers.containsKey(memberId)) { - roundRobinMembers.add(memberId); - } + private void assignRemainingPartitions() { + int unassignedPartitionIndex = 0; + + for (MemberWithRemainingQuota unfilledMember : unfilledMembers) { + String memberId = unfilledMember.memberId; + int remainingQuota = unfilledMember.remainingQuota; + + Map<Uuid, Set<Integer>> newAssignment = targetAssignment.get(memberId).targetPartitions(); + if (isImmutableMap(newAssignment)) { + // If the new assignment is immutable, we must create a deep copy of it + // before altering it. + newAssignment = deepCopy(newAssignment); + targetAssignment.put(memberId, new MemberAssignment(newAssignment)); } - if (assigned) { - unassignedPartitions.remove(topicIdPartition); + for (int i = 0; i < remainingQuota && unassignedPartitionIndex < unassignedPartitions.size(); i++) { + TopicIdPartition unassignedTopicIdPartition = unassignedPartitions.get(unassignedPartitionIndex); + unassignedPartitionIndex++; + newAssignment + .computeIfAbsent(unassignedTopicIdPartition.topicId(), __ -> new HashSet<>()) + .add(unassignedTopicIdPartition.partitionId()); } } - } - /** - * Assigns the specified partition to the given member and updates the potentially unfilled members map. - * Only assign extra partitions once the member has met its minimum quota = total partitions / total members. - * - * <ol> - * <li> If the minimum quota hasn't been met aka remaining > 0 directly assign the partition. - * After assigning the partition, if the min quota has been met aka remaining = 0, remove the member - * if there's no members left to receive an extra partition. Otherwise, keep it in the - * potentially unfilled map. </li> - * <li> If the minimum quota has been met and if there is potential to receive an extra partition, assign it. - * Remove the member from the potentially unfilled map since it has already received the extra partition - * and met the min quota. </li> - * <li> Else, don't assign the partition. </li> - * </ol> - * - * @param memberId The Id of the member to which the partition will be assigned. - * @param topicIdPartition The topicIdPartition to be assigned. - * @return true if the assignment was successful, false otherwise. - */ - private boolean maybeAssignPartitionToMember(String memberId, TopicIdPartition topicIdPartition) { - int remaining = potentiallyUnfilledMembers.get(memberId); - boolean shouldAssign = false; - - // If the member hasn't met the minimum quota, set the flag for assignment. - // If member has met minimum quota and there's an extra partition available, set the flag for assignment. - if (remaining > 0) { - potentiallyUnfilledMembers.put(memberId, --remaining); - shouldAssign = true; - - // If the member meets the minimum quota due to this assignment, - // check if any extra partitions are available. - // Removing the member from the list reduces an iteration for when remaining = 0 but there's no extras left. - if (remaining == 0 && remainingMembersToGetAnExtraPartition == 0) { - potentiallyUnfilledMembers.remove(memberId); - } - } else if (remaining == 0 && remainingMembersToGetAnExtraPartition > 0) { - remainingMembersToGetAnExtraPartition--; - // Each member can only receive one extra partition, once they meet the minimum quota and receive an extra - // partition they can be removed from the potentially unfilled members map. - potentiallyUnfilledMembers.remove(memberId); - shouldAssign = true; + if (unassignedPartitionIndex < unassignedPartitions.size()) { + throw new PartitionAssignorException("Partitions were left unassigned"); } + } - // Assign the partition if flag is set. - if (shouldAssign) { - addPartitionToAssignment( - targetAssignment, - memberId, - topicIdPartition.topicId(), - topicIdPartition.partitionId() - ); - return true; + private static Map<Uuid, Set<Integer>> deepCopy(Map<Uuid, Set<Integer>> map) { + Map<Uuid, Set<Integer>> copy = new HashMap<>(map.size()); + for (Map.Entry<Uuid, Set<Integer>> entry : map.entrySet()) { + copy.put(entry.getKey(), new HashSet<>(entry.getValue())); } + return copy; + } + + private static class MemberWithRemainingQuota { + final String memberId; + final int remainingQuota; - // No assignment possible because the member met the minimum quota but - // number of members to receive an extra partition is zero. - return false; + MemberWithRemainingQuota( + String memberId, + int remainingQuota + ) { + this.memberId = memberId; + this.remainingQuota = remainingQuota; + } } } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java index 7da7c2d8c8a..648b0161d64 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/UniformAssignor.java @@ -66,21 +66,19 @@ public class UniformAssignor implements ConsumerGroupPartitionAssignor { GroupSpec groupSpec, SubscribedTopicDescriber subscribedTopicDescriber ) throws PartitionAssignorException { - AbstractUniformAssignmentBuilder assignmentBuilder; - if (groupSpec.members().isEmpty()) return new GroupAssignment(Collections.emptyMap()); if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { LOG.debug("Detected that all members are subscribed to the same set of topics, invoking the " + "optimized assignment algorithm"); - assignmentBuilder = new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber); + return new OptimizedUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) + .build(); } else { LOG.debug("Detected that the members are subscribed to different sets of topics, invoking the " + "general assignment algorithm"); - assignmentBuilder = new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber); + return new GeneralUniformAssignmentBuilder(groupSpec, subscribedTopicDescriber) + .buildAssignment(); } - - return assignmentBuilder.buildAssignment(); } } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java index 74bb303abd0..ffc5455ceaf 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/AssignmentTestUtil.java @@ -22,12 +22,13 @@ import org.apache.kafka.coordinator.group.assignor.GroupAssignment; import java.util.AbstractMap; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; -import java.util.TreeSet; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -42,13 +43,13 @@ public class AssignmentTestUtil { ); } - public static Map.Entry<Uuid, Set<Integer>> mkSortedTopicAssignment( + public static Map.Entry<Uuid, Set<Integer>> mkOrderedTopicAssignment( Uuid topicId, Integer... partitions ) { return new AbstractMap.SimpleEntry<>( topicId, - new TreeSet<>(Arrays.asList(partitions)) + new LinkedHashSet<>(Arrays.asList(partitions)) ); } @@ -56,18 +57,18 @@ public class AssignmentTestUtil { public static Map<Uuid, Set<Integer>> mkAssignment(Map.Entry<Uuid, Set<Integer>>... entries) { Map<Uuid, Set<Integer>> assignment = new HashMap<>(); for (Map.Entry<Uuid, Set<Integer>> entry : entries) { - assignment.put(entry.getKey(), entry.getValue()); + assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue())); } - return assignment; + return Collections.unmodifiableMap(assignment); } @SafeVarargs - public static Map<Uuid, Set<Integer>> mkSortedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) { + public static Map<Uuid, Set<Integer>> mkOrderedAssignment(Map.Entry<Uuid, Set<Integer>>... entries) { Map<Uuid, Set<Integer>> assignment = new LinkedHashMap<>(); for (Map.Entry<Uuid, Set<Integer>> entry : entries) { - assignment.put(entry.getKey(), entry.getValue()); + assignment.put(entry.getKey(), Collections.unmodifiableSet(entry.getValue())); } - return assignment; + return Collections.unmodifiableMap(assignment); } /** diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java index 77bad7a4834..900ba318393 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/CoordinatorRecordHelpersTest.java @@ -69,8 +69,8 @@ import java.util.Set; import java.util.stream.Stream; import static org.apache.kafka.coordinator.group.Assertions.assertRecordEquals; -import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedAssignment; -import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkSortedTopicAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentRecord; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpers.newCurrentAssignmentTombstoneRecord; @@ -297,7 +297,7 @@ public class CoordinatorRecordHelpersTest { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); - Map<Uuid, Set<Integer>> partitions = mkSortedAssignment( + Map<Uuid, Set<Integer>> partitions = mkOrderedAssignment( mkTopicAssignment(topicId1, 11, 12, 13), mkTopicAssignment(topicId2, 21, 22, 23) ); @@ -379,14 +379,14 @@ public class CoordinatorRecordHelpersTest { Uuid topicId1 = Uuid.randomUuid(); Uuid topicId2 = Uuid.randomUuid(); - Map<Uuid, Set<Integer>> assigned = mkSortedAssignment( - mkSortedTopicAssignment(topicId1, 11, 12, 13), - mkSortedTopicAssignment(topicId2, 21, 22, 23) + Map<Uuid, Set<Integer>> assigned = mkOrderedAssignment( + mkOrderedTopicAssignment(topicId1, 11, 12, 13), + mkOrderedTopicAssignment(topicId2, 21, 22, 23) ); - Map<Uuid, Set<Integer>> revoking = mkSortedAssignment( - mkSortedTopicAssignment(topicId1, 14, 15, 16), - mkSortedTopicAssignment(topicId2, 24, 25, 26) + Map<Uuid, Set<Integer>> revoking = mkOrderedAssignment( + mkOrderedTopicAssignment(topicId1, 14, 15, 16), + mkOrderedTopicAssignment(topicId2, 24, 25, 26) ); CoordinatorRecord expectedRecord = new CoordinatorRecord( diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java index f21bd63735f..fdc4f5941fa 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/OptimizedUniformAssignmentBuilderTest.java @@ -34,6 +34,7 @@ import java.util.TreeMap; import static org.apache.kafka.common.utils.Utils.mkSet; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.assertAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkAssignment; +import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkOrderedAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.mkTopicAssignment; import static org.apache.kafka.coordinator.group.AssignmentTestUtil.invertedTargetAssignment; import static org.apache.kafka.coordinator.group.CoordinatorRecordHelpersTest.mkMapOfPartitionRacks; @@ -158,12 +159,11 @@ public class OptimizedUniformAssignmentBuilderTest { Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2), - mkTopicAssignment(topic3Uuid, 1) + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic3Uuid, 0, 1) )); expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic3Uuid, 0) + mkTopicAssignment(topic1Uuid, 1, 2) )); GroupSpec groupSpec = new GroupSpecImpl( @@ -295,30 +295,25 @@ public class OptimizedUniformAssignmentBuilderTest { )); Map<String, AssignmentMemberSpec> members = new TreeMap<>(); - Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 1), - mkTopicAssignment(topic2Uuid, 0, 1) - ) - ); + members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForA + mkOrderedAssignment( + mkTopicAssignment(topic1Uuid, 0, 1), + mkTopicAssignment(topic2Uuid, 0, 1) + ) )); - Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 2), - mkTopicAssignment(topic2Uuid, 2) - ) - ); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForB + mkOrderedAssignment( + mkTopicAssignment(topic1Uuid, 2), + mkTopicAssignment(topic2Uuid, 2) + ) )); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); @@ -366,40 +361,34 @@ public class OptimizedUniformAssignmentBuilderTest { Map<String, AssignmentMemberSpec> members = new TreeMap<>(); - Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2), - mkTopicAssignment(topic2Uuid, 0) - ) - ); members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForA + mkOrderedAssignment( + mkTopicAssignment(topic1Uuid, 0, 2), + mkTopicAssignment(topic2Uuid, 0) + ) )); - Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>( - mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic2Uuid, 1, 2) - ) - ); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForB + mkOrderedAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1, 2) + ) )); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2, 3, 5), - mkTopicAssignment(topic2Uuid, 0, 4) + mkTopicAssignment(topic1Uuid, 0, 2, 3), + mkTopicAssignment(topic2Uuid, 0, 3, 4) )); expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1, 4), - mkTopicAssignment(topic2Uuid, 1, 2, 3) + mkTopicAssignment(topic1Uuid, 1, 4, 5), + mkTopicAssignment(topic2Uuid, 1, 2) )); GroupSpec groupSpec = new GroupSpecImpl( @@ -436,26 +425,24 @@ public class OptimizedUniformAssignmentBuilderTest { Map<String, AssignmentMemberSpec> members = new HashMap<>(); - Map<Uuid, Set<Integer>> currentAssignmentForA = new TreeMap<>(mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2), - mkTopicAssignment(topic2Uuid, 0) - )); members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForA + mkOrderedAssignment( + mkTopicAssignment(topic1Uuid, 0, 2), + mkTopicAssignment(topic2Uuid, 0) + ) )); - Map<Uuid, Set<Integer>> currentAssignmentForB = new TreeMap<>(mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic2Uuid, 1, 2) - )); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForB + mkOrderedAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1, 2) + ) )); // Add a new member to trigger a re-assignment. @@ -512,38 +499,36 @@ public class OptimizedUniformAssignmentBuilderTest { Map<String, AssignmentMemberSpec> members = new HashMap<>(); - Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0) - ); members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForA + mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0) + ) )); - Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic2Uuid, 1) - ); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), mkSet(topic1Uuid, topic2Uuid), - currentAssignmentForB + mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1) + ) )); // Member C was removed Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); expectedAssignment.put(memberA, mkAssignment( - mkTopicAssignment(topic1Uuid, 0, 2), - mkTopicAssignment(topic2Uuid, 0) + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0, 2) )); expectedAssignment.put(memberB, mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic2Uuid, 1, 2) + mkTopicAssignment(topic1Uuid, 1, 2), + mkTopicAssignment(topic2Uuid, 1) )); GroupSpec groupSpec = new GroupSpecImpl( @@ -581,26 +566,24 @@ public class OptimizedUniformAssignmentBuilderTest { // Initial subscriptions were [T1, T2] Map<String, AssignmentMemberSpec> members = new HashMap<>(); - Map<Uuid, Set<Integer>> currentAssignmentForA = mkAssignment( - mkTopicAssignment(topic1Uuid, 0), - mkTopicAssignment(topic2Uuid, 0) - ); members.put(memberA, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), Collections.singleton(topic2Uuid), - currentAssignmentForA + mkAssignment( + mkTopicAssignment(topic1Uuid, 0), + mkTopicAssignment(topic2Uuid, 0) + ) )); - Map<Uuid, Set<Integer>> currentAssignmentForB = mkAssignment( - mkTopicAssignment(topic1Uuid, 1), - mkTopicAssignment(topic2Uuid, 1) - ); members.put(memberB, new AssignmentMemberSpec( Optional.empty(), Optional.empty(), Collections.singleton(topic2Uuid), - currentAssignmentForB + mkAssignment( + mkTopicAssignment(topic1Uuid, 1), + mkTopicAssignment(topic2Uuid, 1) + ) )); Map<String, Map<Uuid, Set<Integer>>> expectedAssignment = new HashMap<>(); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java index 2349350e4ba..77a38ab7f4e 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/ServerSideAssignorBenchmark.java @@ -268,7 +268,7 @@ public class ServerSideAssignorBenchmark { assignmentMemberSpec.instanceId(), assignmentMemberSpec.rackId(), assignmentMemberSpec.subscribedTopicIds(), - memberAssignment.targetPartitions() + Collections.unmodifiableMap(memberAssignment.targetPartitions()) )); }); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java index 511db01c862..7d67f07d3e6 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/assignor/TargetAssignmentBuilderBenchmark.java @@ -184,10 +184,7 @@ public class TargetAssignmentBuilderBenchmark { for (Map.Entry<String, MemberAssignment> entry : groupAssignment.members().entrySet()) { String memberId = entry.getKey(); Map<Uuid, Set<Integer>> topicPartitions = entry.getValue().targetPartitions(); - - Assignment assignment = new Assignment(topicPartitions); - - initialTargetAssignment.put(memberId, assignment); + initialTargetAssignment.put(memberId, new Assignment(topicPartitions)); } return initialTargetAssignment;