apoorvmittal10 commented on code in PR #19977:
URL: https://github.com/apache/kafka/pull/19977#discussion_r2155592583


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java:
##########
@@ -196,10 +196,10 @@ public GroupAssignment build() {
 
         revokeUnassignablePartitions();
 
-        revokeOversharedPartitions();
-
         revokeOverfilledMembers();
 
+        revokeOversharedPartitions();

Review Comment:
   Does the order matter, if yes then shall we please write in comments so 
future refactoring can consider that? The reason I asked is beacuse I noticed 
that you moved the calls from last commit, hence wanted to be sure.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -56,155 +58,80 @@
  * Balance is prioritized above stickiness.
  */
 public class SimpleAssignor implements ShareGroupPartitionAssignor {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(SimpleAssignor.class);

Review Comment:
   nit: almost all files in Kafka uses `log`.
   ```suggestion
       private static final Logger log = 
LoggerFactory.getLogger(SimpleAssignor.class);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleAssignor.java:
##########
@@ -56,155 +58,80 @@
  * Balance is prioritized above stickiness.
  */
 public class SimpleAssignor implements ShareGroupPartitionAssignor {
-
+    private static final Logger LOG = 
LoggerFactory.getLogger(SimpleAssignor.class);
     private static final String SIMPLE_ASSIGNOR_NAME = "simple";
 
+    /**
+     * Unique name for this assignor.
+     */
     @Override
     public String name() {
         return SIMPLE_ASSIGNOR_NAME;
     }
 
+    /**
+     * Assigns partitions to group members based on the given assignment 
specification and topic metadata.
+     *
+     * @param groupSpec                The assignment spec which includes 
member metadata.
+     * @param subscribedTopicDescriber The topic and partition metadata 
describer.
+     * @return The new assignment for the group.
+     */

Review Comment:
   Seems same as defined in `PartitionAssignor`, do we require this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java:
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
+import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.assignor.SimpleAssignor.computeTargetPartitions;
+import static 
org.apache.kafka.coordinator.group.assignor.SimpleAssignor.newHashMap;
+import static 
org.apache.kafka.coordinator.group.assignor.SimpleAssignor.newHashSet;

Review Comment:
   nit: static methods reference back to tha caller. May be better to put these 
helper static methods in AssignorHelpers.java.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SimpleHomogeneousAssignmentBuilder.java:
##########
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.api.assignor.GroupAssignment;
+import org.apache.kafka.coordinator.group.api.assignor.GroupSpec;
+import org.apache.kafka.coordinator.group.api.assignor.MemberAssignment;
+import 
org.apache.kafka.coordinator.group.api.assignor.SubscribedTopicDescriber;
+import org.apache.kafka.coordinator.group.modern.MemberAssignmentImpl;
+import org.apache.kafka.server.common.TopicIdPartition;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static 
org.apache.kafka.coordinator.group.assignor.SimpleAssignor.computeTargetPartitions;
+import static 
org.apache.kafka.coordinator.group.assignor.SimpleAssignor.newHashMap;
+import static 
org.apache.kafka.coordinator.group.assignor.SimpleAssignor.newHashSet;
+
+/**
+ * The homogeneous simple assignment builder is used to generate the target 
assignment for a share group with
+ * all its members subscribed to the same set of topics.
+ * <p>
+ * Assignments are done according to the following principles:
+ * <ol>
+ *   <li>Balance:          Ensure partitions are distributed equally among all 
members.
+ *                         The difference in assignments sizes between any two 
members
+ *                         should not exceed one partition.</li>
+ *   <li>Stickiness:       Minimize partition movements among members by 
retaining
+ *                         as much of the existing assignment as possible.</li>
+ * </ol>
+ * <p>
+ * Balance is prioritized above stickiness.
+ */
+public class SimpleHomogeneousAssignmentBuilder {
+
+    /**
+     * The list of all the topic Ids that the share group is subscribed to.
+     */
+    private final Set<Uuid> subscribedTopicIds;
+
+    /**
+     * The list of members in the consumer group.
+     */
+    private final List<String> memberIds;
+
+    /**
+     * Maps member ids to their indices in the memberIds list.
+     */
+    private final Map<String, Integer> memberIndices;
+
+    /**
+     * The list of all the topic-partitions assignable for the share group.
+     */
+    private final List<TopicIdPartition> targetPartitions;
+
+    /**
+     * The number of members in the share group.
+     */
+    private final int numGroupMembers;
+
+    /**
+     * The desired sharing for each target partition.
+     * For entirely balanced assignment, we would expect (numTargetPartitions 
/ numGroupMembers) partitions per member, rounded upwards.
+     * That can be expressed as:  Math.ceil(numTargetPartitions / (double) 
numGroupMembers)
+     */
+    private final int desiredSharing;
+
+    /**
+     * The desired number of assignments for each share group member.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final int[] desiredAssignmentCount;
+
+    /**
+     * The share group assignment from the group metadata specification at the 
start of the assignment operation.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<Integer, Map<Uuid, Set<Integer>>> oldGroupAssignment;
+
+    /**
+     * The share group assignment calculated iteratively by the assignment 
operation. Entries in this map override those
+     * in the old group assignment map.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<Integer, Map<Uuid, Set<Integer>>> newGroupAssignment;
+
+    /**
+     * The final assignment keyed by topic-partition mapping to member.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<TopicIdPartition, Set<Integer>> 
finalAssignmentByPartition;
+
+    /**
+     * The final assignment keyed by member ID mapping to topic-partitions.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Map<Integer, Set<TopicIdPartition>> finalAssignmentByMember;
+
+    /**
+     * The set of members which have too few assigned partitions.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Set<Integer> unfilledMembers;
+
+    /**
+     * The set of members which have too many assigned partitions.
+     * <p>
+     * Members are stored as integer indices into the memberIds array.
+     */
+    private final Set<Integer> overfilledMembers;
+
+    public SimpleHomogeneousAssignmentBuilder(GroupSpec groupSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.subscribedTopicIds = 
groupSpec.memberSubscription(groupSpec.memberIds().iterator().next()).subscribedTopicIds();
+

Review Comment:
   Do we need a check here as we had earlier?
   
   ```
   if (subscribedTopicIds.isEmpty())
               return new GroupAssignment(Map.of());



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to