Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
tedyu commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1666957266

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
```diff
@@ -65,191 +92,225 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        private final Uuid topicId;
+        private final int numPartitions;
+        private int numMembers;
+        private int minQuota = -1;
+        private int extraPartitions = -1;
+        private int nextRange = 0;
+
         /**
-         * Member Id.
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId The topic Id.
+         * @param numPartitions The number of partitions.
+         * @param numMembers The number of subscribed members.
          */
-        private final String memberId;
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) {
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra partitions, if not already computed.
          */
-        private final int remaining;
+        private void maybeComputeQuota() {
+            if (minQuota != -1) return;
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+            // The minimum number of partitions each member should receive for a balanced assignment.
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicMetadata(topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                ')';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec The specification required for group assignments.
-     * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
```

Review Comment:
It seems this sorting can be delayed. An exception may be thrown on line 159, so the sorting can be done after the loop starting on line 157 finishes.
-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
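A minimal sketch of the reordering suggested above, assuming simplified stand-ins rather than Kafka's types: topic IDs are plain `String`s, and a `Map` of partition counts plays the role of `SubscribedTopicDescriber` (a missing entry or `-1` means the topic does not exist). `ValidateThenSort`, `validateThenSortMembers` and `AssignorException` are hypothetical names. In a homogeneous group any member's subscription can be read before sorting, as the removed `membersPerTopic` code did with `memberIds().iterator().next()`, so the sort only has to run once validation has passed.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins: String topic IDs and a Map instead of SubscribedTopicDescriber.
final class ValidateThenSort {

    static final class AssignorException extends RuntimeException {
        AssignorException(String message) {
            super(message);
        }
    }

    /**
     * Validates every subscribed topic first, then sorts the member IDs, so an
     * invalid subscription fails fast without paying for the O(n log n) sort.
     */
    static List<String> validateThenSortMembers(
        Collection<String> memberIds,
        Collection<String> subscribedTopics,
        Map<String, Integer> numPartitionsByTopic
    ) {
        for (String topic : subscribedTopics) {
            Integer numPartitions = numPartitionsByTopic.get(topic);
            if (numPartitions == null || numPartitions == -1) {
                throw new AssignorException("Member is subscribed to a non-existent topic");
            }
        }
        // Only reached once the validation loop has finished.
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted);
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(validateThenSortMembers(
            List.of("m3", "m1", "m2"), List.of("t1"), Map.of("t1", 6))); // [m1, m2, m3]
    }
}
```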
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
C0urante commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2209498308

Ah, looks like a fix has been published: https://github.com/apache/kafka/pull/16526
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
C0urante commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2209490277

@dajac @rreddy-22 It looks like this commit broke the build: new tests were added [here](https://github.com/apache/kafka/blob/376365d9da8099e3eb9175090cc456f55985fcb0/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java) a few hours before this was merged, and they were not updated to use the new `MemberSubscriptionAndAssignmentImpl` constructor. Can we either revert this commit or publish a fix PR ASAP?
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
dajac merged PR #16504:
URL: https://github.com/apache/kafka/pull/16504
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
dajac commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2209035197

Here are the results of the benchmarks based on the last commit:

```
Benchmark                                 (assignmentType)  (assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  (subscriptionType)  (topicCount)  Mode  Cnt    Score   Error  Units
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS           100  avgt    5    0.052 ± 0.001  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10         HOMOGENEOUS          1000  avgt    5    0.454 ± 0.003  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS           100  avgt    5    0.476 ± 0.046  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10         HOMOGENEOUS          1000  avgt    5    3.102 ± 0.055  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false              1                         10         HOMOGENEOUS           100  avgt    5    5.640 ± 0.223  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false              1                         10         HOMOGENEOUS          1000  avgt    5   37.947 ± 1.000  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS           100  avgt    5    0.172 ± 0.001  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false            100                         10       HETEROGENEOUS          1000  avgt    5    1.882 ± 0.006  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS           100  avgt    5    1.730 ± 0.036  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false           1000                         10       HETEROGENEOUS          1000  avgt    5   17.654 ± 1.160  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false              1                         10       HETEROGENEOUS           100  avgt    5   18.595 ± 0.316  ms/op
ServerSideAssignorBenchmark.doAssignment       INCREMENTAL           RANGE          false              1                         10       HETEROGENEOUS          1000  avgt    5  172.398 ± 2.251  ms/op
JMH benchmarks done

Benchmark                               (memberCount)  (partitionsToMemberRatio)  (topicCount)  Mode  Cnt    Score   Error  Units
TargetAssignmentBuilderBenchmark.build            100                         10           100  avgt    5    0.071 ± 0.004  ms/op
TargetAssignmentBuilderBenchmark.build            100                         10          1000  avgt    5    0.428 ± 0.026  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10           100  avgt    5    0.659 ± 0.028  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10          1000  avgt    5    3.346 ± 0.102  ms/op
TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5    8.947 ± 0.386  ms/op
TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5   40.240 ± 3.113  ms/op
JMH benchmarks done
```
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663741172

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
```diff
@@ -65,191 +94,228 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        private final Uuid topicId;
+        private final int numPartitions;
+        private int numMembers;
+        private int minQuota = -1;
+        private int extraPartitions = -1;
+        private int nextRange = 0;
+
         /**
-         * Member Id.
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId The topic Id.
+         * @param numPartitions The number of partitions.
+         * @param numMembers The number of subscribed members.
          */
-        private final String memberId;
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) {
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra partitions, if not already computed.
          */
-        private final int remaining;
+        private void maybeComputeQuota() {
+            if (minQuota != -1) return;
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+            // The minimum number of partitions each member should receive for a balanced assignment.
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
+
+        @Override
+        public String toString() {
+            return "TopicMetadata(topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                ')';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec The specification required for group assignments.
-     * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopicIds());
```

Review Comment:
It looks like copying the `subscribedTopicIds` is not necessary here because we only iterate on it once. What do you think?

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
```diff
@@ -65,191 +94,228 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithR
```
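The `minQuota`/`extraPartitions` pair computed by `maybeComputeQuota()` is enough to derive a contiguous `[start, end)` range for each member. The sketch below is an illustration under stated assumptions, not the PR's implementation: members are indexed in sorted order, and the first `extraPartitions` members are assumed to take one extra partition each. `RangeQuotaSketch`, `rangeFor` and `describe` are hypothetical names. Because a range depends only on the partition count and the member index, topics with equal partition counts give each member the same partition indices, which is the co-partitioning property the `assignHomogeneousGroup` javadoc describes. The second case shows how, with more members than partitions, a member ends up with an empty range.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: turning minQuota/extraPartitions into per-member ranges.
final class RangeQuotaSketch {

    /** Returns {start, end} (end exclusive) of the range for the member at memberIndex. */
    static int[] rangeFor(int numPartitions, int numMembers, int memberIndex) {
        int minQuota = numPartitions / numMembers;
        int extraPartitions = numPartitions % numMembers;
        // Every earlier member took minQuota, plus one more if it was among the first extraPartitions.
        int start = memberIndex * minQuota + Math.min(memberIndex, extraPartitions);
        int length = minQuota + (memberIndex < extraPartitions ? 1 : 0);
        return new int[] {start, start + length};
    }

    static List<String> describe(int numPartitions, int numMembers) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < numMembers; i++) {
            int[] r = rangeFor(numPartitions, numMembers, i);
            out.add("member" + i + " -> [" + r[0] + ", " + r[1] + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 members: minQuota = 2, extraPartitions = 1.
        // Prints member0 -> [0, 3), member1 -> [3, 5), member2 -> [5, 7)
        describe(7, 3).forEach(System.out::println);
        // 2 partitions over 3 members: minQuota = 0, extraPartitions = 2.
        // Prints member0 -> [0, 1), member1 -> [1, 2), member2 -> [2, 2) (empty)
        describe(2, 3).forEach(System.out::println);
    }
}
```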
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2205291263

```
TargetAssignmentBuilderBenchmark.build            100                         10           100  avgt    5    0.432 ± 0.003  ms/op
TargetAssignmentBuilderBenchmark.build            100                         10          1000  avgt    5    4.139 ± 0.013  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10           100  avgt    5    4.332 ± 0.049  ms/op
TargetAssignmentBuilderBenchmark.build           1000                         10          1000  avgt    5   43.449 ± 0.058  ms/op
TargetAssignmentBuilderBenchmark.build              1                         10           100  avgt    5   47.766 ± 0.389  ms/op
TargetAssignmentBuilderBenchmark.build              1                         10          1000  avgt    5  487.833 ± 3.459  ms/op
```
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663544304

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
```diff
@@ -381,9 +558,12 @@ public void testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA
             mkTopicAssignment(topic1Uuid, 1),
             mkTopicAssignment(topic2Uuid, 1)
         ));
+        expectedAssignment.put(memberC, mkAssignment(
+            mkTopicAssignment(topic1Uuid),
+            mkTopicAssignment(topic2Uuid)
+        ));
 
-        // Consumer C shouldn't get any assignment, due to stickiness A, B retain their assignments
-        assertNull(computedAssignment.members().get(memberC));
+        // Consumer C shouldn't get any assignment.
```

Review Comment:
It still doesn't get any assignment.
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663541148

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
```diff
@@ -91,7 +96,10 @@ public void testOneConsumerNoTopic() {
             subscribedTopicMetadata
         );
 
-        assertEquals(Collections.emptyMap(), groupAssignment.members());
+        Map expectedAssignment = new HashMap<>();
+        expectedAssignment.put(memberA, new MemberAssignmentImpl(Collections.emptyMap()));
```

Review Comment:
Can you help me understand why? There's no topic, right?
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663541148

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
```diff
@@ -91,7 +96,10 @@ public void testOneConsumerNoTopic() {
             subscribedTopicMetadata
         );
 
-        assertEquals(Collections.emptyMap(), groupAssignment.members());
+        Map expectedAssignment = new HashMap<>();
+        expectedAssignment.put(memberA, new MemberAssignmentImpl(Collections.emptyMap()));
```

Review Comment:
Why? There's no topic, right?
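The updated `testOneConsumerNoTopic` expectation implies that the rewritten assignor returns an entry for every member, even one with nothing assigned, instead of omitting it. A minimal sketch of that pattern, assuming plain nested maps in place of `GroupAssignment` and `MemberAssignmentImpl`; `EmptyAssignmentSketch` and `initAssignment` are hypothetical names, not the PR's code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: member -> (topic -> partitions), pre-populated for every member.
final class EmptyAssignmentSketch {

    /**
     * Creates an entry for every member up front, so a member that receives no
     * partitions maps to an empty map rather than being absent (absent would
     * make a lookup for that member return null).
     */
    static Map<String, Map<String, Set<Integer>>> initAssignment(List<String> memberIds) {
        Map<String, Map<String, Set<Integer>>> assignment = new HashMap<>();
        for (String memberId : memberIds) {
            assignment.put(memberId, new HashMap<>());
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One member and no subscribed topics: nothing is assigned, yet the member is present.
        Map<String, Map<String, Set<Integer>>> result = initAssignment(List.of("memberA"));
        System.out.println(result);                                           // {memberA={}}
        System.out.println(result.get("memberA").equals(Collections.emptyMap())); // true
    }
}
```

With this shape, callers can look up any member without a null check, which is presumably why the test now expects an empty assignment for `memberA` rather than an empty map of members.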
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663537165

## group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
```diff
@@ -730,7 +922,20 @@ private void assertAssignment(
         assertEquals(expectedAssignment.size(), computedGroupAssignment.members().size());
         for (String memberId : computedGroupAssignment.members().keySet()) {
             Map<Uuid, Set<Integer>> computedAssignmentForMember = computedGroupAssignment.members().get(memberId).partitions();
-            assertEquals(expectedAssignment.get(memberId), computedAssignmentForMember);
+            Map<Uuid, Set<Integer>> expectedAssignmentForMember = expectedAssignment.get(memberId);
+
+            // Compare the content of the maps by entries.
+            assertEquals(expectedAssignmentForMember.size(), computedAssignmentForMember.size());
+            for (Map.Entry<Uuid, Set<Integer>> entry : expectedAssignmentForMember.entrySet()) {
+                Set<Integer> expectedSet = entry.getValue();
+                Set<Integer> computedSet = computedAssignmentForMember.get(entry.getKey());
+
+                // Convert both sets to HashSet for comparison.
+                Set<Integer> normalizedExpectedSet = new HashSet<>(expectedSet);
+                Set<Integer> normalizedComputedSet = new HashSet<>(computedSet);
+
+                assertEquals(normalizedExpectedSet, normalizedComputedSet);
+            }
```

Review Comment:
Yeah, I think it didn't work initially, but I changed it now.
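The entry-by-entry loop in `assertAssignment` can be condensed into a helper that copies every value into a `HashSet` and then compares whole maps. A sketch with a hypothetical `normalize` helper (not part of the PR); note that per the `java.util.Set#equals` contract, two sets with the same elements should already be equal regardless of implementation, so this normalization only matters when some set in the mix, such as a custom view like `RangeSet`, does not honor that contract.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: makes assignment equality independent of the Set implementation.
final class NormalizeAssignment {

    /** Copies each partition set into a HashSet, keeping the keys unchanged. */
    static <K> Map<K, Set<Integer>> normalize(Map<K, ? extends Set<Integer>> assignment) {
        Map<K, Set<Integer>> normalized = new HashMap<>();
        assignment.forEach((key, partitions) -> normalized.put(key, new HashSet<>(partitions)));
        return normalized;
    }

    public static void main(String[] args) {
        Map<String, Set<Integer>> expected = Map.of("topic1", Set.of(0, 1, 2));
        Map<String, TreeSet<Integer>> computed = Map.of("topic1", new TreeSet<>(Set.of(2, 1, 0)));
        // Whole-map comparison: a missing or extra topic simply makes the maps unequal.
        System.out.println(normalize(expected).equals(normalize(computed))); // true
    }
}
```

Comparing whole maps also sidesteps a `NullPointerException` in the loop above when the computed assignment has a different topic than expected, since `new HashSet<>(computedSet)` would then receive `null`.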
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663330737

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
```diff
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} (inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers without actually storing them.
+ */
+public class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from The starting value (inclusive) of the range.
+     * @param to   The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(Integer integer) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Integer> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = from; i < to; i++) {
+            sb.append(i);
+            if (i < to - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append("]");
+        return sb.toString();
```

Review Comment:
Okie I changed it!
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663307155

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
```diff
@@ -65,191 +94,233 @@ public String name() {
     }
 
     /**
-     * Pair of memberId and remaining partitions to meet the quota.
+     * Metadata for a topic including partition and subscription details.
      */
-    private static class MemberWithRemainingAssignments {
+    private static class TopicMetadata {
+        public final Uuid topicId;
+        public final int numPartitions;
+        public int numMembers;
+
+        public int minQuota = -1;
+        public int extraPartitions = -1;
+        public int nextRange = 0;
+
+        /**
+         * Constructs a new TopicMetadata instance.
+         *
+         * @param topicId The topic Id.
+         * @param numPartitions The number of partitions.
+         * @param numMembers The number of subscribed members.
+         */
+        private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) {
+            this.topicId = topicId;
+            this.numPartitions = numPartitions;
+            this.numMembers = numMembers;
+        }
+
         /**
-         * Member Id.
+         * Factory method to create a TopicMetadata instance.
+         *
+         * @param topicId The topic Id.
+         * @param numPartitions The number of partitions.
+         * @param numMembers The number of subscribed members.
+         * @return A new TopicMetadata instance.
          */
-        private final String memberId;
+        public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) {
+            return new TopicMetadata(topicId, numPartitions, numMembers);
+        }
 
         /**
-         * Number of partitions required to meet the assignment quota.
+         * Computes the minimum partition quota per member and the extra partitions, if not already computed.
          */
-        private final int remaining;
+        void maybeComputeQuota() {
+            // The minimum number of partitions each member should receive for a balanced assignment.
+            if (minQuota != -1) return;
+            minQuota = numPartitions / numMembers;
+
+            // Extra partitions to be distributed one to each member.
+            extraPartitions = numPartitions % numMembers;
+        }
 
-        public MemberWithRemainingAssignments(String memberId, int remaining) {
-            this.memberId = memberId;
-            this.remaining = remaining;
+        @Override
+        public String toString() {
+            return "TopicMetadata{" +
+                "topicId=" + topicId +
+                ", numPartitions=" + numPartitions +
+                ", numMembers=" + numMembers +
+                ", minQuota=" + minQuota +
+                ", extraPartitions=" + extraPartitions +
+                ", nextRange=" + nextRange +
+                '}';
         }
     }
 
     /**
-     * Returns a map of topic Ids to a list of members subscribed to them,
-     * based on the given assignment specification and metadata.
-     *
-     * @param groupSpec The specification required for group assignments.
-     * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters.
-     * @return A map of topic Ids to a list of member Ids subscribed to them.
-     *
-     * @throws PartitionAssignorException If a member is subscribed to a non-existent topic.
+     * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics.
+     * Assignment will be co-partitioned when all the topics have an equal number of partitions.
      */
-    private Map<Uuid, Collection<String>> membersPerTopic(
-        final GroupSpec groupSpec,
-        final SubscribedTopicDescriber subscribedTopicDescriber
-    ) {
-        Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>();
-
-        if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-            Collection<String> allMembers = groupSpec.memberIds();
-            Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-                .subscribedTopicIds();
-
-            for (Uuid topicId : topics) {
-                if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-                    throw new PartitionAssignorException("Member is subscribed to a non-existent topic");
-                }
-                membersPerTopic.put(topicId, allMembers);
+    private GroupAssignment assignHomogeneousGroup(
+        GroupSpec groupSpec,
+        SubscribedTopicDescriber subscribedTopicDescriber
+    ) throws PartitionAssignorException {
+        List<String> memberIds = sortMemberIds(groupSpec);
+
+        MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0));
+        Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribed
```
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663277435

## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
```diff
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} (inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers without actually storing them.
+ */
+public class RangeSet implements Set<Integer> {
+    private final int from;
+    private final int to;
+
+    /**
+     * Constructs a {@code RangeSet} with the specified range.
+     *
+     * @param from The starting value (inclusive) of the range.
+     * @param to   The ending value (exclusive) of the range.
+     */
+    public RangeSet(int from, int to) {
+        this.from = from;
+        this.to = to;
+    }
+
+    @Override
+    public int size() {
+        return to - from;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return size() == 0;
+    }
+
+    @Override
+    public boolean contains(Object o) {
+        if (o instanceof Integer) {
+            int value = (Integer) o;
+            return value >= from && value < to;
+        }
+        return false;
+    }
+
+    @Override
+    public Iterator<Integer> iterator() {
+        return new Iterator<Integer>() {
+            private int current = from;
+
+            @Override
+            public boolean hasNext() {
+                return current < to;
+            }
+
+            @Override
+            public Integer next() {
+                if (!hasNext()) throw new NoSuchElementException();
+                return current++;
+            }
+        };
+    }
+
+    @Override
+    public Object[] toArray() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public <T> T[] toArray(T[] a) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean add(Integer integer) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean remove(Object o) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean containsAll(Collection<?> c) {
+        for (Object o : c) {
+            if (!contains(o)) return false;
+        }
+        return true;
+    }
+
+    @Override
+    public boolean addAll(Collection<? extends Integer> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean retainAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean removeAll(Collection<?> c) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void clear() {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = from; i < to; i++) {
+            sb.append(i);
+            if (i < to - 1) {
+                sb.append(", ");
+            }
+        }
+        sb.append("]");
+        return sb.toString();
+    }
+
+    /**
+     * Compares the specified object with this set for equality. Returns {@code true}
+     * if the specified object is also a set, the two sets have the same size, and
+     * every member of the specified set is contained in this set.
+     *
+     * @param o object to be compared for equality with this set
+     * @return {@code true} if the specified object is equal to this set
+     */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof Set)) return false;
+
+        Set otherSet = (Set) o;
+        if (otherSet.size() != this.size()) return false;
+
+        for (int i = from; i < to; i++) {
+            if (!otherSet.contains(i)) return false;
+        }
```

Review Comment:
yess makes sense
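Since `RangeSet` overrides `equals`, the `java.util.Set` contract also requires `hashCode()` to be the sum of the element hash codes; otherwise a range set and an equal `HashSet` could land in different hash buckets. A sketch of one way to get both, using a hypothetical `IntRangeSet` (an illustration, not the PR's class) that extends `java.util.AbstractSet`, which supplies contract-compliant `equals` and `toString`, and computes the arithmetic-series hash in O(1) because `Integer.hashCode()` is the value itself.

```java
import java.util.AbstractSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical immutable view of the integers in [from, to); not the PR's RangeSet. */
final class IntRangeSet extends AbstractSet<Integer> {
    private final int from;
    private final int to;

    IntRangeSet(int from, int to) {
        if (to < from) throw new IllegalArgumentException("to < from");
        this.from = from;
        this.to = to;
    }

    @Override
    public int size() {
        return to - from;
    }

    @Override
    public boolean contains(Object o) {
        if (!(o instanceof Integer)) return false;
        int value = (Integer) o;
        return value >= from && value < to;
    }

    @Override
    public Iterator<Integer> iterator() {
        return new Iterator<Integer>() {
            private int current = from;

            @Override
            public boolean hasNext() {
                return current < to;
            }

            @Override
            public Integer next() {
                if (!hasNext()) throw new NoSuchElementException();
                return current++;
            }
        };
    }

    /**
     * Sum of from + ... + (to - 1) in O(1). One factor of n * (from + to - 1) is always
     * even, so halving it first keeps the product exact modulo 2^32, which matches
     * the wrapping int sum that AbstractSet.hashCode() would compute.
     */
    @Override
    public int hashCode() {
        long n = (long) to - from;
        long s = (long) from + to - 1;
        long sum = (n % 2 == 0) ? (n / 2) * s : n * (s / 2);
        return (int) sum;
    }

    public static void main(String[] args) {
        IntRangeSet range = new IntRangeSet(0, 5);
        HashSet<Integer> copy = new HashSet<>(List.of(0, 1, 2, 3, 4));
        System.out.println(range);                                     // [0, 1, 2, 3, 4]
        System.out.println(range.equals(copy) && copy.equals(range));  // true
        System.out.println(range.hashCode() == copy.hashCode());       // true
    }
}
```

The trade-off versus a hand-written `Set` implementation is less control over individual operations: for example, `AbstractSet.equals` still iterates the other set through `containsAll`.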
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1663251568 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -65,191 +94,233 @@ public String name() { } /** - * Pair of memberId and remaining partitions to meet the quota. + * Metadata for a topic including partition and subscription details. */ -private static class MemberWithRemainingAssignments { +private static class TopicMetadata { +public final Uuid topicId; +public final int numPartitions; +public int numMembers; + +public int minQuota = -1; +public int extraPartitions = -1; +public int nextRange = 0; + +/** + * Constructs a new TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembersThe number of subscribed members. + */ +private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) { +this.topicId = topicId; +this.numPartitions = numPartitions; +this.numMembers = numMembers; +} + /** - * Member Id. + * Factory method to create a TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembersThe number of subscribed members. + * @return A new TopicMetadata instance. */ -private final String memberId; +public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) { +return new TopicMetadata(topicId, numPartitions, numMembers); +} /** - * Number of partitions required to meet the assignment quota. + * Computes the minimum partition quota per member and the extra partitions, if not already computed. */ -private final int remaining; +void maybeComputeQuota() { +// The minimum number of partitions each member should receive for a balanced assignment. +if (minQuota != -1) return; +minQuota = numPartitions / numMembers; + +// Extra partitions to be distributed one to each member. 
+extraPartitions = numPartitions % numMembers; +} -public MemberWithRemainingAssignments(String memberId, int remaining) { -this.memberId = memberId; -this.remaining = remaining; +@Override +public String toString() { +return "TopicMetadata{" + +"topicId=" + topicId + +", numPartitions=" + numPartitions + +", numMembers=" + numMembers + +", minQuota=" + minQuota + +", extraPartitions=" + extraPartitions + +", nextRange=" + nextRange + +'}'; } } /** - * Returns a map of topic Ids to a list of members subscribed to them, - * based on the given assignment specification and metadata. - * - * @param groupSpec The specification required for group assignments. - * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. - * @return A map of topic Ids to a list of member Ids subscribed to them. - * - * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. + * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics. + * Assignment will be co-partitioned when all the topics have an equal number of partitions. 
 */ -private Map<Uuid, Collection<String>> membersPerTopic( -final GroupSpec groupSpec, -final SubscribedTopicDescriber subscribedTopicDescriber -) { -Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>(); - -if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { -Collection<String> allMembers = groupSpec.memberIds(); -Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) -.subscribedTopicIds(); - -for (Uuid topicId : topics) { -if (subscribedTopicDescriber.numPartitions(topicId) == -1) { -throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); -} -membersPerTopic.put(topicId, allMembers); +private GroupAssignment assignHomogeneousGroup( +GroupSpec groupSpec, +SubscribedTopicDescriber subscribedTopicDescriber +) throws PartitionAssignorException { +List<String> memberIds = sortMemberIds(groupSpec); + +MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0)); +Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribed
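`maybeComputeQuota()` splits a topic's partitions into a base quota of `numPartitions / numMembers` per member plus `numPartitions % numMembers` leftover partitions handed out one per member. A minimal sketch of how such a quota can be turned into contiguous partition ranges, assuming (this is not taken from the PR) that the first `extraPartitions` members in sorted order each receive the extra partition; the hypothetical local `start` plays the role of the `nextRange` cursor:

```java
import java.util.ArrayList;
import java.util.List;

public class RangeQuotaSketch {

    // Returns one [start, end) partition range per member for a single topic.
    // Assumption: members are already sorted, and the first `extra` members
    // receive one partition more than the others.
    static List<int[]> computeRanges(int numPartitions, int numMembers) {
        int minQuota = numPartitions / numMembers; // base share for every member
        int extra = numPartitions % numMembers;    // leftovers, at most one per member
        List<int[]> ranges = new ArrayList<>();
        int start = 0;                             // acts like nextRange
        for (int i = 0; i < numMembers; i++) {
            int size = minQuota + (i < extra ? 1 : 0);
            ranges.add(new int[] {start, start + size});
            start += size;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 7 partitions over 3 members: minQuota = 2, extraPartitions = 1.
        for (int[] r : computeRanges(7, 3)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
        // Prints [0, 3), [3, 5), [5, 7)
    }
}
```

When there are more members than partitions, `minQuota` is 0 and the trailing members get empty ranges, for example `[2, 2)` for the third member of a 2-partition topic.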
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
rreddy-22 commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1663248450 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -65,191 +94,233 @@ public String name() { } /** - * Pair of memberId and remaining partitions to meet the quota. + * Metadata for a topic including partition and subscription details. */ -private static class MemberWithRemainingAssignments { +private static class TopicMetadata { +public final Uuid topicId; +public final int numPartitions; +public int numMembers; + +public int minQuota = -1; +public int extraPartitions = -1; +public int nextRange = 0; + +/** + * Constructs a new TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembers The number of subscribed members. + */ +private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) { +this.topicId = topicId; +this.numPartitions = numPartitions; +this.numMembers = numMembers; +} + /** - * Member Id. + * Factory method to create a TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembers The number of subscribed members. + * @return A new TopicMetadata instance. */ -private final String memberId; +public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) { +return new TopicMetadata(topicId, numPartitions, numMembers); +} /** - * Number of partitions required to meet the assignment quota. + * Computes the minimum partition quota per member and the extra partitions, if not already computed. */ -private final int remaining; +void maybeComputeQuota() { +// The minimum number of partitions each member should receive for a balanced assignment. +if (minQuota != -1) return; +minQuota = numPartitions / numMembers; + +// Extra partitions to be distributed one to each member. 
+extraPartitions = numPartitions % numMembers; +} -public MemberWithRemainingAssignments(String memberId, int remaining) { -this.memberId = memberId; -this.remaining = remaining; +@Override +public String toString() { +return "TopicMetadata{" + +"topicId=" + topicId + +", numPartitions=" + numPartitions + +", numMembers=" + numMembers + +", minQuota=" + minQuota + +", extraPartitions=" + extraPartitions + +", nextRange=" + nextRange + +'}'; } } /** - * Returns a map of topic Ids to a list of members subscribed to them, - * based on the given assignment specification and metadata. - * - * @param groupSpec The specification required for group assignments. - * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. - * @return A map of topic Ids to a list of member Ids subscribed to them. - * - * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. + * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics. + * Assignment will be co-partitioned when all the topics have an equal number of partitions. 
 */ -private Map<Uuid, Collection<String>> membersPerTopic( -final GroupSpec groupSpec, -final SubscribedTopicDescriber subscribedTopicDescriber -) { -Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>(); - -if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { -Collection<String> allMembers = groupSpec.memberIds(); -Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) -.subscribedTopicIds(); - -for (Uuid topicId : topics) { -if (subscribedTopicDescriber.numPartitions(topicId) == -1) { -throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); -} -membersPerTopic.put(topicId, allMembers); +private GroupAssignment assignHomogeneousGroup( +GroupSpec groupSpec, +SubscribedTopicDescriber subscribedTopicDescriber +) throws PartitionAssignorException { +List<String> memberIds = sortMemberIds(groupSpec); + +MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0)); +Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribed
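The new Javadoc states that the homogeneous assignment is co-partitioned when all topics have the same number of partitions. A minimal sketch of why, assuming every topic is split with the same base-quota-plus-extras rule and all topics walk one shared sorted member list (the class, method, and topic names are hypothetical, and topics are keyed by plain strings instead of `Uuid`): the ranges depend only on the partition count and the member's position, so a member receives identical partition numbers on every equally sized topic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class CoPartitionSketch {

    // Returns memberId -> topic -> assigned partitions.
    static Map<String, Map<String, List<Integer>>> assign(
            List<String> memberIds, Map<String, Integer> partitionsPerTopic) {
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted); // one deterministic order shared by all topics
        Map<String, Map<String, List<Integer>>> result = new TreeMap<>();
        if (sorted.isEmpty()) return result; // avoid dividing by zero members
        for (String member : sorted) result.put(member, new TreeMap<>());

        int numMembers = sorted.size();
        for (Map.Entry<String, Integer> topic : partitionsPerTopic.entrySet()) {
            int numPartitions = topic.getValue();
            int minQuota = numPartitions / numMembers;
            int extra = numPartitions % numMembers;
            int next = 0; // first partition not yet assigned for this topic
            for (int i = 0; i < numMembers; i++) {
                int size = minQuota + (i < extra ? 1 : 0);
                List<Integer> partitions = new ArrayList<>();
                for (int p = next; p < next + size; p++) partitions.add(p);
                result.get(sorted.get(i)).put(topic.getKey(), partitions);
                next += size;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Integer> topics = new TreeMap<>(Map.of("orders", 4, "payments", 4));
        System.out.println(assign(List.of("m2", "m1"), topics));
        // {m1={orders=[0, 1], payments=[0, 1]}, m2={orders=[2, 3], payments=[2, 3]}}
    }
}
```

Sorting is what makes this work: if each topic iterated the members in a different order, equal partition counts alone would not guarantee that partition 0 of both topics lands on the same member.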
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
dajac commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1662942561 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java: ## @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.coordinator.group.assignor; + +import java.util.Collection; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Set; + +/** + * A {@code RangeSet} represents a range of integers from {@code from} (inclusive) + * to {@code to} (exclusive). + * This implementation provides a view over a continuous range of integers without actually storing them. + */ +public class RangeSet implements Set<Integer> { +private final int from; +private final int to; + +/** + * Constructs a {@code RangeSet} with the specified range. + * + * @param from The starting value (inclusive) of the range. + * @param to The ending value (exclusive) of the range. 
+ */ +public RangeSet(int from, int to) { +this.from = from; +this.to = to; +} + +@Override +public int size() { +return to - from; +} + +@Override +public boolean isEmpty() { +return size() == 0; +} + +@Override +public boolean contains(Object o) { +if (o instanceof Integer) { +int value = (Integer) o; +return value >= from && value < to; +} +return false; +} + +@Override +public Iterator<Integer> iterator() { +return new Iterator<Integer>() { +private int current = from; + +@Override +public boolean hasNext() { +return current < to; +} + +@Override +public Integer next() { +if (!hasNext()) throw new NoSuchElementException(); +return current++; +} +}; +} + +@Override +public Object[] toArray() { +throw new UnsupportedOperationException(); +} + +@Override +public <T> T[] toArray(T[] a) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean add(Integer integer) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean remove(Object o) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean containsAll(Collection<?> c) { +for (Object o : c) { +if (!contains(o)) return false; +} +return true; +} + +@Override +public boolean addAll(Collection<? extends Integer> c) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean retainAll(Collection<?> c) { +throw new UnsupportedOperationException(); +} + +@Override +public boolean removeAll(Collection<?> c) { +throw new UnsupportedOperationException(); +} + +@Override +public void clear() { +throw new UnsupportedOperationException(); +} + +@Override +public String toString() { +StringBuilder sb = new StringBuilder(); +sb.append("["); +for (int i = from; i < to; i++) { +sb.append(i); +if (i < to - 1) { +sb.append(", "); +} +} +sb.append("]"); +return sb.toString(); Review Comment: The short version seems better to me but it may be a personal preference. I’d also like to have the type printed out. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
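The review asks for a shorter `toString` that also names the type. A minimal sketch contrasting the two shapes, using hypothetical helper methods rather than the PR's final code; the exact short-form format string is an assumption. The short form prints only the type and bounds, so its length and cost stay constant however large the range is, while the long form grows with every element:

```java
public class RangeToStringSketch {

    // Long form, as in the reviewed code: enumerates every element.
    static String longForm(int from, int to) {
        StringBuilder sb = new StringBuilder("[");
        for (int i = from; i < to; i++) {
            sb.append(i);
            if (i < to - 1) sb.append(", ");
        }
        return sb.append("]").toString();
    }

    // Short form with the type name: constant size regardless of range length.
    static String shortForm(int from, int to) {
        return "RangeSet(from=" + from + " (inclusive), to=" + to + " (exclusive))";
    }

    public static void main(String[] args) {
        System.out.println(longForm(0, 5));  // [0, 1, 2, 3, 4]
        System.out.println(shortForm(0, 5)); // RangeSet(from=0 (inclusive), to=5 (exclusive))
    }
}
```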
Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]
dajac commented on code in PR #16504: URL: https://github.com/apache/kafka/pull/16504#discussion_r1662897722 ## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java: ## @@ -65,191 +94,233 @@ public String name() { } /** - * Pair of memberId and remaining partitions to meet the quota. + * Metadata for a topic including partition and subscription details. */ -private static class MemberWithRemainingAssignments { +private static class TopicMetadata { +public final Uuid topicId; +public final int numPartitions; +public int numMembers; + +public int minQuota = -1; +public int extraPartitions = -1; +public int nextRange = 0; + +/** + * Constructs a new TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembers The number of subscribed members. + */ +private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) { +this.topicId = topicId; +this.numPartitions = numPartitions; +this.numMembers = numMembers; +} + /** - * Member Id. + * Factory method to create a TopicMetadata instance. + * + * @param topicId The topic Id. + * @param numPartitions The number of partitions. + * @param numMembers The number of subscribed members. + * @return A new TopicMetadata instance. */ -private final String memberId; +public static TopicMetadata create(Uuid topicId, int numPartitions, int numMembers) { +return new TopicMetadata(topicId, numPartitions, numMembers); +} /** - * Number of partitions required to meet the assignment quota. + * Computes the minimum partition quota per member and the extra partitions, if not already computed. */ -private final int remaining; +void maybeComputeQuota() { +// The minimum number of partitions each member should receive for a balanced assignment. +if (minQuota != -1) return; +minQuota = numPartitions / numMembers; + +// Extra partitions to be distributed one to each member. 
+extraPartitions = numPartitions % numMembers; +} -public MemberWithRemainingAssignments(String memberId, int remaining) { -this.memberId = memberId; -this.remaining = remaining; +@Override +public String toString() { +return "TopicMetadata{" + +"topicId=" + topicId + +", numPartitions=" + numPartitions + +", numMembers=" + numMembers + +", minQuota=" + minQuota + +", extraPartitions=" + extraPartitions + +", nextRange=" + nextRange + +'}'; } } /** - * Returns a map of topic Ids to a list of members subscribed to them, - * based on the given assignment specification and metadata. - * - * @param groupSpec The specification required for group assignments. - * @param subscribedTopicDescriber The metadata describer for subscribed topics and clusters. - * @return A map of topic Ids to a list of member Ids subscribed to them. - * - * @throws PartitionAssignorException If a member is subscribed to a non-existent topic. + * Assigns partitions to members of a homogeneous group. All members are subscribed to the same set of topics. + * Assignment will be co-partitioned when all the topics have an equal number of partitions. 
 */ -private Map<Uuid, Collection<String>> membersPerTopic( -final GroupSpec groupSpec, -final SubscribedTopicDescriber subscribedTopicDescriber -) { -Map<Uuid, Collection<String>> membersPerTopic = new HashMap<>(); - -if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) { -Collection<String> allMembers = groupSpec.memberIds(); -Collection<Uuid> topics = groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()) -.subscribedTopicIds(); - -for (Uuid topicId : topics) { -if (subscribedTopicDescriber.numPartitions(topicId) == -1) { -throw new PartitionAssignorException("Member is subscribed to a non-existent topic"); -} -membersPerTopic.put(topicId, allMembers); +private GroupAssignment assignHomogeneousGroup( +GroupSpec groupSpec, +SubscribedTopicDescriber subscribedTopicDescriber +) throws PartitionAssignorException { +List<String> memberIds = sortMemberIds(groupSpec); + +MemberSubscription subs = groupSpec.memberSubscription(memberIds.get(0)); +Set<Uuid> subscribedTopics = new HashSet<>(subs.subscribedTopi
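The removed `membersPerTopic` code rejected a subscription to any topic whose partition count is reported as `-1`. A minimal sketch of that check for a homogeneous group, assuming hypothetical stand-ins (a `Map` in place of `SubscribedTopicDescriber`, `IllegalStateException` in place of `PartitionAssignorException`, string topic names in place of `Uuid`). It is one possible ordering, not the PR's code: it validates the shared subscription before sorting the member ids, so an invalid subscription fails without paying for the sort.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class HomogeneousValidationSketch {

    // Stand-in for numPartitions(topicId): -1 means the topic does not exist.
    static int numPartitions(Map<String, Integer> metadata, String topic) {
        return metadata.getOrDefault(topic, -1);
    }

    // Validates every subscribed topic first, then sorts the member ids.
    static List<String> validateAndSortMembers(
            Collection<String> memberIds,
            Collection<String> subscribedTopics,
            Map<String, Integer> metadata) {
        for (String topic : subscribedTopics) {
            if (numPartitions(metadata, topic) == -1) {
                throw new IllegalStateException("Member is subscribed to a non-existent topic");
            }
        }
        List<String> sorted = new ArrayList<>(memberIds);
        Collections.sort(sorted); // only reached when every topic exists
        return sorted;
    }

    public static void main(String[] args) {
        Map<String, Integer> metadata = Map.of("orders", 3);
        System.out.println(validateAndSortMembers(List.of("b", "a"), List.of("orders"), metadata)); // [a, b]
        try {
            validateAndSortMembers(List.of("b", "a"), List.of("missing"), metadata);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // Member is subscribed to a non-existent topic
        }
    }
}
```

In a homogeneous group every member shares one subscription, so a single pass over one member's topics is enough to validate the whole group.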