Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-05 Thread via GitHub


tedyu commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1666957266


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +92,225 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class MemberWithRemainingAssignments {
+private static class TopicMetadata {
+private final Uuid topicId;
+private final int numPartitions;
+private int numMembers;
+private int minQuota = -1;
+private int extraPartitions = -1;
+private int nextRange = 0;
+
 /**
- * Member Id.
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
  */
-private final String memberId;
+private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+this.topicId = topicId;
+this.numPartitions = numPartitions;
+this.numMembers = numMembers;
+}
 
 /**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
  */
-private final int remaining;
+private void maybeComputeQuota() {
+if (minQuota != -1) return;
 
-public MemberWithRemainingAssignments(String memberId, int remaining) {
-this.memberId = memberId;
-this.remaining = remaining;
+// The minimum number of partitions each member should receive for 
a balanced assignment.
+minQuota = numPartitions / numMembers;
+
+// Extra partitions to be distributed one to each member.
+extraPartitions = numPartitions % numMembers;
+}
+
+@Override
+public String toString() {
+return "TopicMetadata(topicId=" + topicId +
+", numPartitions=" + numPartitions +
+", numMembers=" + numMembers +
+", minQuota=" + minQuota +
+", extraPartitions=" + extraPartitions +
+", nextRange=" + nextRange +
+')';
 }
 }
 
 /**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for 
group assignments.
- * @param subscribedTopicDescriber  The metadata describer for 
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
  */
-private Map> membersPerTopic(
-final GroupSpec groupSpec,
-final SubscribedTopicDescriber subscribedTopicDescriber
-) {
-Map> membersPerTopic = new HashMap<>();
-
-if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-Collection allMembers = groupSpec.memberIds();
-Collection topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-.subscribedTopicIds();
-
-for (Uuid topicId : topics) {
-if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-}
-membersPerTopic.put(topicId, allMembers);
+private GroupAssignment assignHomogeneousGroup(
+GroupSpec groupSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+List memberIds = sortMemberIds(groupSpec);

Review Comment:
   It seems this sorting can be delayed. On line 159, there may be exception 
thrown.
   the sorting can be done when the loop starting on line 157 finishes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-04 Thread via GitHub


C0urante commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2209498308

   Ah, looks like a fix has been published: 
https://github.com/apache/kafka/pull/16526


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-04 Thread via GitHub


C0urante commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2209490277

   @dajac @rreddy-22 It looks like this commit broke the build since there were 
new tests added 
[here](https://github.com/apache/kafka/blob/376365d9da8099e3eb9175090cc456f55985fcb0/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignorTest.java)
 a few hours before this was merged that were not updated to use the new 
`MemberSubscriptionAndAssignmentImpl` constructor.
   
   Can we either revert this commit or publish a fix PR ASAP?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-04 Thread via GitHub


dajac merged PR #16504:
URL: https://github.com/apache/kafka/pull/16504


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-04 Thread via GitHub


dajac commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2209035197

   Here are the results of the benchmarks based on the last commit: 
   
   ```
   
   Benchmark   (assignmentType)  
(assignorType)  (isRackAware)  (memberCount)  (partitionsToMemberRatio)  
(subscriptionType)  (topicCount)  Mode  CntScoreError  Units
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false100 10 
HOMOGENEOUS   100  avgt50.052 ±  0.001  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false100 10 
HOMOGENEOUS  1000  avgt50.454 ±  0.003  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false   1000 10 
HOMOGENEOUS   100  avgt50.476 ±  0.046  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false   1000 10 
HOMOGENEOUS  1000  avgt53.102 ±  0.055  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false  1 10 
HOMOGENEOUS   100  avgt55.640 ±  0.223  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false  1 10 
HOMOGENEOUS  1000  avgt5   37.947 ±  1.000  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false100 10   
HETEROGENEOUS   100  avgt50.172 ±  0.001  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false100 10   
HETEROGENEOUS  1000  avgt51.882 ±  0.006  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false   1000 10   
HETEROGENEOUS   100  avgt51.730 ±  0.036  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false   1000 10   
HETEROGENEOUS  1000  avgt5   17.654 ±  1.160  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false  1 10   
HETEROGENEOUS   100  avgt5   18.595 ±  0.316  ms/op
   ServerSideAssignorBenchmark.doAssignment INCREMENTAL   
RANGE  false  1 10   
HETEROGENEOUS  1000  avgt5  172.398 ±  2.251  ms/op
   JMH benchmarks done
   
   Benchmark (memberCount)  
(partitionsToMemberRatio)  (topicCount)  Mode  Cnt   Score   Error  Units
   TargetAssignmentBuilderBenchmark.build  100  
   10   100  avgt5   0.071 ± 0.004  ms/op
   TargetAssignmentBuilderBenchmark.build  100  
   10  1000  avgt5   0.428 ± 0.026  ms/op
   TargetAssignmentBuilderBenchmark.build 1000  
   10   100  avgt5   0.659 ± 0.028  ms/op
   TargetAssignmentBuilderBenchmark.build 1000  
   10  1000  avgt5   3.346 ± 0.102  ms/op
   TargetAssignmentBuilderBenchmark.build1  
   10   100  avgt5   8.947 ± 0.386  ms/op
   TargetAssignmentBuilderBenchmark.build1  
   10  1000  avgt5  40.240 ± 3.113  ms/op
   JMH benchmarks done
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-03 Thread via GitHub


dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663741172


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,228 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class MemberWithRemainingAssignments {
+private static class TopicMetadata {
+private final Uuid topicId;
+private final int numPartitions;
+private int numMembers;
+private int minQuota = -1;
+private int extraPartitions = -1;
+private int nextRange = 0;
+
 /**
- * Member Id.
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
  */
-private final String memberId;
+private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+this.topicId = topicId;
+this.numPartitions = numPartitions;
+this.numMembers = numMembers;
+}
 
 /**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
  */
-private final int remaining;
+private void maybeComputeQuota() {
+if (minQuota != -1) return;
 
-public MemberWithRemainingAssignments(String memberId, int remaining) {
-this.memberId = memberId;
-this.remaining = remaining;
+// The minimum number of partitions each member should receive for 
a balanced assignment.
+minQuota = numPartitions / numMembers;
+
+// Extra partitions to be distributed one to each member.
+extraPartitions = numPartitions % numMembers;
+}
+
+@Override
+public String toString() {
+return "TopicMetadata(topicId=" + topicId +
+", numPartitions=" + numPartitions +
+", numMembers=" + numMembers +
+", minQuota=" + minQuota +
+", extraPartitions=" + extraPartitions +
+", nextRange=" + nextRange +
+')';
 }
 }
 
 /**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for 
group assignments.
- * @param subscribedTopicDescriber  The metadata describer for 
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
  */
-private Map> membersPerTopic(
-final GroupSpec groupSpec,
-final SubscribedTopicDescriber subscribedTopicDescriber
-) {
-Map> membersPerTopic = new HashMap<>();
-
-if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-Collection allMembers = groupSpec.memberIds();
-Collection topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-.subscribedTopicIds();
-
-for (Uuid topicId : topics) {
-if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-}
-membersPerTopic.put(topicId, allMembers);
+private GroupAssignment assignHomogeneousGroup(
+GroupSpec groupSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+List memberIds = sortMemberIds(groupSpec);
+
+MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+Set subscribedTopics = new HashSet<>(subs.subscribedTopicIds());

Review Comment:
   It looks like copying the `subscribedTopicIds` is not necessary here because 
we only iterate on it once. What do you think?



##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,228 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class 

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-03 Thread via GitHub


rreddy-22 commented on PR #16504:
URL: https://github.com/apache/kafka/pull/16504#issuecomment-2205291263

   ```
   TargetAssignmentBuilderBenchmark.build100
 10   100  avgt50.432 ± 0.003  ms/op
   TargetAssignmentBuilderBenchmark.build100
 10  1000  avgt54.139 ± 0.013  ms/op
   TargetAssignmentBuilderBenchmark.build   1000
 10   100  avgt54.332 ± 0.049  ms/op
   TargetAssignmentBuilderBenchmark.build   1000
 10  1000  avgt5   43.449 ± 0.058  ms/op
   TargetAssignmentBuilderBenchmark.build  1
 10   100  avgt5   47.766 ± 0.389  ms/op
   TargetAssignmentBuilderBenchmark.build  1
 10  1000  avgt5  487.833 ± 3.459  ms/op
   ```
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-03 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663544304


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -381,9 +558,12 @@ public void 
testReassignmentNumConsumersGreaterThanNumPartitionsWhenOneConsumerA
 mkTopicAssignment(topic1Uuid, 1),
 mkTopicAssignment(topic2Uuid, 1)
 ));
+expectedAssignment.put(memberC, mkAssignment(
+mkTopicAssignment(topic1Uuid),
+mkTopicAssignment(topic2Uuid)
+));
 
-// Consumer C shouldn't get any assignment, due to stickiness A, B 
retain their assignments
-assertNull(computedAssignment.members().get(memberC));
+// Consumer C shouldn't get any assignment.

Review Comment:
   It still doesn't get any assignment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-03 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663541148


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -91,7 +96,10 @@ public void testOneConsumerNoTopic() {
 subscribedTopicMetadata
 );
 
-assertEquals(Collections.emptyMap(), groupAssignment.members());
+Map expectedAssignment = new HashMap<>();
+expectedAssignment.put(memberA, new 
MemberAssignmentImpl(Collections.emptyMap()));

Review Comment:
   can you help me understand why? There's no topic right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-03 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663541148


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -91,7 +96,10 @@ public void testOneConsumerNoTopic() {
 subscribedTopicMetadata
 );
 
-assertEquals(Collections.emptyMap(), groupAssignment.members());
+Map expectedAssignment = new HashMap<>();
+expectedAssignment.put(memberA, new 
MemberAssignmentImpl(Collections.emptyMap()));

Review Comment:
   why..? There's no topic right



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663537165


##
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/assignor/RangeAssignorTest.java:
##
@@ -730,7 +922,20 @@ private void assertAssignment(
 assertEquals(expectedAssignment.size(), 
computedGroupAssignment.members().size());
 for (String memberId : computedGroupAssignment.members().keySet()) {
 Map> computedAssignmentForMember = 
computedGroupAssignment.members().get(memberId).partitions();
-assertEquals(expectedAssignment.get(memberId), 
computedAssignmentForMember);
+Map> expectedAssignmentForMember = 
expectedAssignment.get(memberId);
+
+// Compare the content of the maps by entries.
+assertEquals(expectedAssignmentForMember.size(), 
computedAssignmentForMember.size());
+for (Map.Entry> entry : 
expectedAssignmentForMember.entrySet()) {
+Set expectedSet = entry.getValue();
+Set computedSet = 
computedAssignmentForMember.get(entry.getKey());
+
+// Convert both sets to HashSet for comparison.
+Set normalizedExpectedSet = new 
HashSet<>(expectedSet);
+Set normalizedComputedSet = new 
HashSet<>(computedSet);
+
+assertEquals(normalizedExpectedSet, normalizedComputedSet);
+}

Review Comment:
   yeah, I think it didn't work initially, but changed it now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663330737


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+public class RangeSet implements Set {
+private final int from;
+private final int to;
+
+/**
+ * Constructs a {@code RangeSet} with the specified range.
+ *
+ * @param from  The starting value (inclusive) of the range.
+ * @param toThe ending value (exclusive) of the range.
+ */
+public RangeSet(int from, int to) {
+this.from = from;
+this.to = to;
+}
+
+@Override
+public int size() {
+return to - from;
+}
+
+@Override
+public boolean isEmpty() {
+return size() == 0;
+}
+
+@Override
+public boolean contains(Object o) {
+if (o instanceof Integer) {
+int value = (Integer) o;
+return value >= from && value < to;
+}
+return false;
+}
+
+@Override
+public Iterator iterator() {
+return new Iterator() {
+private int current = from;
+
+@Override
+public boolean hasNext() {
+return current < to;
+}
+
+@Override
+public Integer next() {
+if (!hasNext()) throw new NoSuchElementException();
+return current++;
+}
+};
+}
+
+@Override
+public Object[] toArray() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public  T[] toArray(T[] a) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean add(Integer integer) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean remove(Object o) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean containsAll(Collection c) {
+for (Object o : c) {
+if (!contains(o)) return false;
+}
+return true;
+}
+
+@Override
+public boolean addAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean retainAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean removeAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void clear() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public String toString() {
+StringBuilder sb = new StringBuilder();
+sb.append("[");
+for (int i = from; i < to; i++) {
+sb.append(i);
+if (i < to - 1) {
+sb.append(", ");
+}
+}
+sb.append("]");
+return sb.toString();

Review Comment:
   Okie I changed it!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663307155


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,233 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class MemberWithRemainingAssignments {
+private static class TopicMetadata {
+public final Uuid topicId;
+public final int numPartitions;
+public int numMembers;
+
+public int minQuota = -1;
+public int extraPartitions = -1;
+public int nextRange = 0;
+
+/**
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ */
+private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+this.topicId = topicId;
+this.numPartitions = numPartitions;
+this.numMembers = numMembers;
+}
+
 /**
- * Member Id.
+ * Factory method to create a TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ * @return A new TopicMetadata instance.
  */
-private final String memberId;
+public static TopicMetadata create(Uuid topicId, int numPartitions, 
int numMembers) {
+return new TopicMetadata(topicId, numPartitions, numMembers);
+}
 
 /**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
  */
-private final int remaining;
+void maybeComputeQuota() {
+// The minimum number of partitions each member should receive for 
a balanced assignment.
+if (minQuota != -1) return;
+minQuota = numPartitions / numMembers;
+
+// Extra partitions to be distributed one to each member.
+extraPartitions = numPartitions % numMembers;
+}
 
-public MemberWithRemainingAssignments(String memberId, int remaining) {
-this.memberId = memberId;
-this.remaining = remaining;
+@Override
+public String toString() {
+return "TopicMetadata{" +
+"topicId=" + topicId +
+", numPartitions=" + numPartitions +
+", numMembers=" + numMembers +
+", minQuota=" + minQuota +
+", extraPartitions=" + extraPartitions +
+", nextRange=" + nextRange +
+'}';
 }
 }
 
 /**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for 
group assignments.
- * @param subscribedTopicDescriber  The metadata describer for 
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
  */
-private Map> membersPerTopic(
-final GroupSpec groupSpec,
-final SubscribedTopicDescriber subscribedTopicDescriber
-) {
-Map> membersPerTopic = new HashMap<>();
-
-if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-Collection allMembers = groupSpec.memberIds();
-Collection topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-.subscribedTopicIds();
-
-for (Uuid topicId : topics) {
-if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-}
-membersPerTopic.put(topicId, allMembers);
+private GroupAssignment assignHomogeneousGroup(
+GroupSpec groupSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+List memberIds = sortMemberIds(groupSpec);
+
+MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+Set subscribedTopics = new 

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663277435


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+public class RangeSet implements Set {
+private final int from;
+private final int to;
+
+/**
+ * Constructs a {@code RangeSet} with the specified range.
+ *
+ * @param from  The starting value (inclusive) of the range.
+ * @param toThe ending value (exclusive) of the range.
+ */
+public RangeSet(int from, int to) {
+this.from = from;
+this.to = to;
+}
+
+@Override
+public int size() {
+return to - from;
+}
+
+@Override
+public boolean isEmpty() {
+return size() == 0;
+}
+
+@Override
+public boolean contains(Object o) {
+if (o instanceof Integer) {
+int value = (Integer) o;
+return value >= from && value < to;
+}
+return false;
+}
+
+@Override
+public Iterator iterator() {
+return new Iterator() {
+private int current = from;
+
+@Override
+public boolean hasNext() {
+return current < to;
+}
+
+@Override
+public Integer next() {
+if (!hasNext()) throw new NoSuchElementException();
+return current++;
+}
+};
+}
+
+@Override
+public Object[] toArray() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public  T[] toArray(T[] a) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean add(Integer integer) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean remove(Object o) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean containsAll(Collection c) {
+for (Object o : c) {
+if (!contains(o)) return false;
+}
+return true;
+}
+
+@Override
+public boolean addAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean retainAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean removeAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void clear() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public String toString() {
+StringBuilder sb = new StringBuilder();
+sb.append("[");
+for (int i = from; i < to; i++) {
+sb.append(i);
+if (i < to - 1) {
+sb.append(", ");
+}
+}
+sb.append("]");
+return sb.toString();
+}
+
+/**
+ * Compares the specified object with this set for equality. Returns 
{@code true}
+ * if the specified object is also a set, the two sets have the same size, 
and
+ * every member of the specified set is contained in this set.
+ *
+ * @param o object to be compared for equality with this set
+ * @return {@code true} if the specified object is equal to this set
+ */
+@Override
+public boolean equals(Object o) {
+if (this == o) return true;
+if (!(o instanceof Set)) return false;
+
+Set otherSet = (Set) o;
+if (otherSet.size() != this.size()) return false;
+
+for (int i = from; i < to; i++) {
+if (!otherSet.contains(i)) return false;
+}

Review Comment:
   yess makes sense



-- 
This is an automated 

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663251568


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,233 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class MemberWithRemainingAssignments {
+private static class TopicMetadata {
+public final Uuid topicId;
+public final int numPartitions;
+public int numMembers;
+
+public int minQuota = -1;
+public int extraPartitions = -1;
+public int nextRange = 0;
+
+/**
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ */
+private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+this.topicId = topicId;
+this.numPartitions = numPartitions;
+this.numMembers = numMembers;
+}
+
 /**
- * Member Id.
+ * Factory method to create a TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ * @return A new TopicMetadata instance.
  */
-private final String memberId;
+public static TopicMetadata create(Uuid topicId, int numPartitions, 
int numMembers) {
+return new TopicMetadata(topicId, numPartitions, numMembers);
+}
 
 /**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
  */
-private final int remaining;
+void maybeComputeQuota() {
+// The minimum number of partitions each member should receive for 
a balanced assignment.
+if (minQuota != -1) return;
+minQuota = numPartitions / numMembers;
+
+// Extra partitions to be distributed one to each member.
+extraPartitions = numPartitions % numMembers;
+}
 
-public MemberWithRemainingAssignments(String memberId, int remaining) {
-this.memberId = memberId;
-this.remaining = remaining;
+@Override
+public String toString() {
+return "TopicMetadata{" +
+"topicId=" + topicId +
+", numPartitions=" + numPartitions +
+", numMembers=" + numMembers +
+", minQuota=" + minQuota +
+", extraPartitions=" + extraPartitions +
+", nextRange=" + nextRange +
+'}';
 }
 }
 
 /**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for 
group assignments.
- * @param subscribedTopicDescriber  The metadata describer for 
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
  */
-private Map> membersPerTopic(
-final GroupSpec groupSpec,
-final SubscribedTopicDescriber subscribedTopicDescriber
-) {
-Map> membersPerTopic = new HashMap<>();
-
-if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-Collection allMembers = groupSpec.memberIds();
-Collection topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-.subscribedTopicIds();
-
-for (Uuid topicId : topics) {
-if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-}
-membersPerTopic.put(topicId, allMembers);
+private GroupAssignment assignHomogeneousGroup(
+GroupSpec groupSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+List memberIds = sortMemberIds(groupSpec);
+
+MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+Set subscribedTopics = new 

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


rreddy-22 commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1663248450


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,233 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class MemberWithRemainingAssignments {
+private static class TopicMetadata {
+public final Uuid topicId;
+public final int numPartitions;
+public int numMembers;
+
+public int minQuota = -1;
+public int extraPartitions = -1;
+public int nextRange = 0;
+
+/**
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ */
+private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+this.topicId = topicId;
+this.numPartitions = numPartitions;
+this.numMembers = numMembers;
+}
+
 /**
- * Member Id.
+ * Factory method to create a TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ * @return A new TopicMetadata instance.
  */
-private final String memberId;
+public static TopicMetadata create(Uuid topicId, int numPartitions, 
int numMembers) {
+return new TopicMetadata(topicId, numPartitions, numMembers);
+}
 
 /**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
  */
-private final int remaining;
+void maybeComputeQuota() {
+// The minimum number of partitions each member should receive for 
a balanced assignment.
+if (minQuota != -1) return;
+minQuota = numPartitions / numMembers;
+
+// Extra partitions to be distributed one to each member.
+extraPartitions = numPartitions % numMembers;
+}
 
-public MemberWithRemainingAssignments(String memberId, int remaining) {
-this.memberId = memberId;
-this.remaining = remaining;
+@Override
+public String toString() {
+return "TopicMetadata{" +
+"topicId=" + topicId +
+", numPartitions=" + numPartitions +
+", numMembers=" + numMembers +
+", minQuota=" + minQuota +
+", extraPartitions=" + extraPartitions +
+", nextRange=" + nextRange +
+'}';
 }
 }
 
 /**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for 
group assignments.
- * @param subscribedTopicDescriber  The metadata describer for 
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
  */
-private Map> membersPerTopic(
-final GroupSpec groupSpec,
-final SubscribedTopicDescriber subscribedTopicDescriber
-) {
-Map> membersPerTopic = new HashMap<>();
-
-if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-Collection allMembers = groupSpec.memberIds();
-Collection topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-.subscribedTopicIds();
-
-for (Uuid topicId : topics) {
-if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-}
-membersPerTopic.put(topicId, allMembers);
+private GroupAssignment assignHomogeneousGroup(
+GroupSpec groupSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+List memberIds = sortMemberIds(groupSpec);
+
+MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+Set subscribedTopics = new 

Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1662942561


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeSet.java:
##
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
+import java.util.Set;
+
+/**
+ * A {@code RangeSet} represents a range of integers from {@code from} 
(inclusive)
+ * to {@code to} (exclusive).
+ * This implementation provides a view over a continuous range of integers 
without actually storing them.
+ */
+public class RangeSet implements Set {
+private final int from;
+private final int to;
+
+/**
+ * Constructs a {@code RangeSet} with the specified range.
+ *
+ * @param from  The starting value (inclusive) of the range.
+ * @param toThe ending value (exclusive) of the range.
+ */
+public RangeSet(int from, int to) {
+this.from = from;
+this.to = to;
+}
+
+@Override
+public int size() {
+return to - from;
+}
+
+@Override
+public boolean isEmpty() {
+return size() == 0;
+}
+
+@Override
+public boolean contains(Object o) {
+if (o instanceof Integer) {
+int value = (Integer) o;
+return value >= from && value < to;
+}
+return false;
+}
+
+@Override
+public Iterator iterator() {
+return new Iterator() {
+private int current = from;
+
+@Override
+public boolean hasNext() {
+return current < to;
+}
+
+@Override
+public Integer next() {
+if (!hasNext()) throw new NoSuchElementException();
+return current++;
+}
+};
+}
+
+@Override
+public Object[] toArray() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public  T[] toArray(T[] a) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean add(Integer integer) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean remove(Object o) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean containsAll(Collection c) {
+for (Object o : c) {
+if (!contains(o)) return false;
+}
+return true;
+}
+
+@Override
+public boolean addAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean retainAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public boolean removeAll(Collection c) {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public void clear() {
+throw new UnsupportedOperationException();
+}
+
+@Override
+public String toString() {
+StringBuilder sb = new StringBuilder();
+sb.append("[");
+for (int i = from; i < to; i++) {
+sb.append(i);
+if (i < to - 1) {
+sb.append(", ");
+}
+}
+sb.append("]");
+return sb.toString();

Review Comment:
   The short version seems better to me but it may be a personal preference. 
I’d also like to have the type printed out.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] KAFKA-16944: Rewrite Range Assignor [kafka]

2024-07-02 Thread via GitHub


dajac commented on code in PR #16504:
URL: https://github.com/apache/kafka/pull/16504#discussion_r1662897722


##
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/RangeAssignor.java:
##
@@ -65,191 +94,233 @@ public String name() {
 }
 
 /**
- * Pair of memberId and remaining partitions to meet the quota.
+ * Metadata for a topic including partition and subscription details.
  */
-private static class MemberWithRemainingAssignments {
+private static class TopicMetadata {
+public final Uuid topicId;
+public final int numPartitions;
+public int numMembers;
+
+public int minQuota = -1;
+public int extraPartitions = -1;
+public int nextRange = 0;
+
+/**
+ * Constructs a new TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ */
+private TopicMetadata(Uuid topicId, int numPartitions, int numMembers) 
{
+this.topicId = topicId;
+this.numPartitions = numPartitions;
+this.numMembers = numMembers;
+}
+
 /**
- * Member Id.
+ * Factory method to create a TopicMetadata instance.
+ *
+ * @param topicId   The topic Id.
+ * @param numPartitions The number of partitions.
+ * @param numMembersThe number of subscribed members.
+ * @return A new TopicMetadata instance.
  */
-private final String memberId;
+public static TopicMetadata create(Uuid topicId, int numPartitions, 
int numMembers) {
+return new TopicMetadata(topicId, numPartitions, numMembers);
+}
 
 /**
- * Number of partitions required to meet the assignment quota.
+ * Computes the minimum partition quota per member and the extra 
partitions, if not already computed.
  */
-private final int remaining;
+void maybeComputeQuota() {
+// The minimum number of partitions each member should receive for 
a balanced assignment.
+if (minQuota != -1) return;
+minQuota = numPartitions / numMembers;
+
+// Extra partitions to be distributed one to each member.
+extraPartitions = numPartitions % numMembers;
+}
 
-public MemberWithRemainingAssignments(String memberId, int remaining) {
-this.memberId = memberId;
-this.remaining = remaining;
+@Override
+public String toString() {
+return "TopicMetadata{" +
+"topicId=" + topicId +
+", numPartitions=" + numPartitions +
+", numMembers=" + numMembers +
+", minQuota=" + minQuota +
+", extraPartitions=" + extraPartitions +
+", nextRange=" + nextRange +
+'}';
 }
 }
 
 /**
- * Returns a map of topic Ids to a list of members subscribed to them,
- * based on the given assignment specification and metadata.
- *
- * @param groupSpec The specification required for 
group assignments.
- * @param subscribedTopicDescriber  The metadata describer for 
subscribed topics and clusters.
- * @return A map of topic Ids to a list of member Ids subscribed to them.
- *
- * @throws PartitionAssignorException If a member is subscribed to a 
non-existent topic.
+ * Assigns partitions to members of a homogeneous group. All members are 
subscribed to the same set of topics.
+ * Assignment will be co-partitioned when all the topics have an equal 
number of partitions.
  */
-private Map> membersPerTopic(
-final GroupSpec groupSpec,
-final SubscribedTopicDescriber subscribedTopicDescriber
-) {
-Map> membersPerTopic = new HashMap<>();
-
-if (groupSpec.subscriptionType().equals(HOMOGENEOUS)) {
-Collection allMembers = groupSpec.memberIds();
-Collection topics = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next())
-.subscribedTopicIds();
-
-for (Uuid topicId : topics) {
-if (subscribedTopicDescriber.numPartitions(topicId) == -1) {
-throw new PartitionAssignorException("Member is subscribed 
to a non-existent topic");
-}
-membersPerTopic.put(topicId, allMembers);
+private GroupAssignment assignHomogeneousGroup(
+GroupSpec groupSpec,
+SubscribedTopicDescriber subscribedTopicDescriber
+) throws PartitionAssignorException {
+List memberIds = sortMemberIds(groupSpec);
+
+MemberSubscription subs = 
groupSpec.memberSubscription(memberIds.get(0));
+Set subscribedTopics = new