rreddy-22 commented on code in PR #14481:
URL: https://github.com/apache/kafka/pull/14481#discussion_r1384454036


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/GeneralUniformAssignmentBuilder.java:
##########
@@ -14,17 +14,828 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.kafka.coordinator.group.assignor;
 
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.common.TopicIdPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.stream.Collectors;
+
 public class GeneralUniformAssignmentBuilder extends 
AbstractUniformAssignmentBuilder {
     private static final Logger LOG = 
LoggerFactory.getLogger(GeneralUniformAssignmentBuilder.class);
 
+    /**
+     * The member metadata obtained from the assignment specification.
+     */
+    private final Map<String, AssignmentMemberSpec> members;
+
+    /**
+     * The topic and partition metadata describer.
+     */
+    private final SubscribedTopicDescriber subscribedTopicDescriber;
+
+    /**
+     * The list of all the topic Ids that the consumer group is subscribed to.
+     */
+    private final Set<Uuid> subscriptionIds;
+
+    /**
+     * Rack information.
+     */
+    private final RackInfo rackInfo;
+
+    /**
+     * List of subscribed members for each topic.
+     */
+    private final Map<Uuid, List<String>> membersPerTopic;
+
+    /**
+     * List of all members sorted by their assignment sizes.
+     */
+    private final TreeSet<String> sortedMembersByAssignmentSize;
+
+    /**
+     * The partitions that still need to be assigned.
+     */
+    private final Set<TopicIdPartition> unassignedPartitions;
+
+    /**
+     * All the partitions that are retained from the existing assignment.
+     */
+    private final Set<TopicIdPartition> assignedStickyPartitions;
+
+    /**
+     * Maintains a sorted set of consumers based on how many topic partitions 
are already assigned to them.
+     */
+    private final AssignmentManager assignmentManager;
+
+    /**
+     * Tracks the owner of each partition in the existing assignment on the 
client side.
+     *
+     * Only populated with partitions that weren't retained due to a rack 
mismatch when rack aware strategy is used.
+     */
+    private final Map<TopicIdPartition, String> currentPartitionOwners;
+
+    /**
+     * Tracks the owner of each partition in the target assignment.
+     */
+    private final Map<TopicIdPartition, String> 
partitionOwnerInTargetAssignment;
+
+    /**
+     * Handles all operations related to partition movements during a 
reassignment for balancing the target assignment.
+     */
+    private PartitionMovements partitionMovements;
+
+    /**
+     * The new assignment that will be returned.
+     */
+    private final Map<String, MemberAssignment> targetAssignment;
+
+    public GeneralUniformAssignmentBuilder(AssignmentSpec assignmentSpec, 
SubscribedTopicDescriber subscribedTopicDescriber) {
+        this.members = assignmentSpec.members();
+        this.subscribedTopicDescriber = subscribedTopicDescriber;
+        this.subscriptionIds = assignmentSpec.members().values().stream()
+            .flatMap(memberSpec -> memberSpec.subscribedTopicIds().stream())
+            .peek(topicId -> {
+                int partitionCount = 
subscribedTopicDescriber.numPartitions(topicId);
+                if (partitionCount == -1) {
+                    throw new PartitionAssignorException(
+                        "Members are subscribed to topic " + topicId + " which 
doesn't exist in the topic metadata."
+                    );
+                }
+            })
+            .collect(Collectors.toSet());
+        this.rackInfo = new RackInfo(assignmentSpec, subscribedTopicDescriber, 
subscriptionIds);
+        this.membersPerTopic = new HashMap<>();
+        members.forEach((memberId, memberMetadata) -> {
+            Collection<Uuid> topics = memberMetadata.subscribedTopicIds();
+            topics.forEach(topicId ->
+                membersPerTopic.computeIfAbsent(topicId, k -> new 
ArrayList<>()).add(memberId)
+            );
+        });
+        this.unassignedPartitions = new 
HashSet<>(allTopicIdPartitions(subscriptionIds, subscribedTopicDescriber));
+        this.assignedStickyPartitions = new HashSet<>();
+        this.assignmentManager = new AssignmentManager();
+        this.sortedMembersByAssignmentSize = 
assignmentManager.getSortedMembersByAssignmentSize(members.keySet());
+        this.currentPartitionOwners = new HashMap<>();
+        this.partitionOwnerInTargetAssignment = new HashMap<>();
+        this.targetAssignment = new HashMap<>();
+    }
+
     @Override
     protected GroupAssignment buildAssignment() {
-        return null;
+        if (subscriptionIds.isEmpty()) {
+            LOG.info("The subscription list is empty, returning an empty 
assignment");
+            return new GroupAssignment(Collections.emptyMap());
+        }
+
+        members.keySet().forEach(memberId -> targetAssignment.put(memberId, 
new MemberAssignment(new HashMap<>())));

Review Comment:
   nit: Could you give me a suggestion for the formatting of this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to