dajac commented on code in PR #14099:
URL: https://github.com/apache/kafka/pull/14099#discussion_r1274915017


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -756,4 +759,24 @@ public void testNewOffsetCommitTombstoneRecord() {
         Record record = 
RecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo", 1);
         assertEquals(expectedRecord, record);
     }
+

Review Comment:
   Should we add a test which verifies that partitions with no racks are not put 
into the record?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord(
         Map<String, TopicMetadata> newSubscriptionMetadata
     ) {
         ConsumerGroupPartitionMetadataValue value = new 
ConsumerGroupPartitionMetadataValue();
-        newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
+        newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
+            List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) -> {
+                    partitionMetadata.add(new 
ConsumerGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(new ArrayList<>(racks))
+                    );
+                });
+            }
+            // If partition rack information is empty, store an empty list in 
the record.

Review Comment:
   nit: Would it make sense to put this comment before the `if` statement?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/RecordHelpersTest.java:
##########
@@ -612,15 +616,14 @@ public void 
testNewGroupMetadataRecordThrowsWhenEmptyAssignment() {
                 MetadataVersion.IBP_3_5_IV2
             ));
     }
-

Review Comment:
   nit: Let's revert this.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/RecordHelpers.java:
##########
@@ -131,13 +131,24 @@ public static Record newGroupSubscriptionMetadataRecord(
         Map<String, TopicMetadata> newSubscriptionMetadata
     ) {
         ConsumerGroupPartitionMetadataValue value = new 
ConsumerGroupPartitionMetadataValue();
-        newSubscriptionMetadata.forEach((topicName, topicMetadata) ->
+        newSubscriptionMetadata.forEach((topicName, topicMetadata) -> {
+            List<ConsumerGroupPartitionMetadataValue.PartitionMetadata> 
partitionMetadata = new ArrayList<>();
+            if (!topicMetadata.partitionRacks().isEmpty()) {
+                topicMetadata.partitionRacks().forEach((partition, racks) -> {
+                    partitionMetadata.add(new 
ConsumerGroupPartitionMetadataValue.PartitionMetadata()
+                        .setPartition(partition)
+                        .setRacks(new ArrayList<>(racks))
+                    );
+                });
+            }
+            // If partition rack information is empty, store an empty list in 
the record.
             value.topics().add(new 
ConsumerGroupPartitionMetadataValue.TopicMetadata()
                 .setTopicId(topicMetadata.id())
                 .setTopicName(topicMetadata.name())
                 .setNumPartitions(topicMetadata.numPartitions())
-            )
-        );
+                .setPartitionMetadata(partitionMetadata.isEmpty() ? 
Collections.emptyList() : partitionMetadata)

Review Comment:
   nit: Why can't we use `partitionMetadata` if it is empty? That seems just 
fine.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##########
@@ -26,18 +26,17 @@
  */
 @InterfaceStability.Unstable
 public interface PartitionAssignor {
-
     /**
      * Unique name for this assignor.
      */
     String name();
 
     /**
-     * Perform the group assignment given the current members and
-     * topic metadata.
+     * Assigns partitions to group members based on the given assignment 
specification and topic metadata.
      *
-     * @param assignmentSpec The assignment spec.
+     * @param assignmentSpec           The assignment spec which included 
member metadata.

Review Comment:
   nit: `includes`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The subscribed topic metadata class is used by the {@link 
PartitionAssignor} to obtain
+ * topic and partition metadata for the topics that the consumer group is 
subscribed to.
+ */
+public class SubscribedTopicMetadata implements SubscribedTopicDescriber {
+    /**
+     * The topic IDs mapped to their corresponding {@link TopicMetadata}
+     * object, which contains topic and partition metadata.
+     */
+    Map<Uuid, TopicMetadata> topicMetadata;
+
+    public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) {
+        this.topicMetadata = topicMetadata;
+    }
+
+    /**
+     * The number of partitions for the given topic ID.
+     *
+     * @param topicId   Uuid corresponding to the topic.
+     * @return The number of partitions corresponding to the given topic ID,
+     *         or -1 if the topic ID does not exist.
+     */
+    @Override
+    public int numPartitions(Uuid topicId) {
+        return this.topicMetadata.containsKey(topicId) ? 
this.topicMetadata.get(topicId).numPartitions() : -1;

Review Comment:
   nit: It is usually better to write this as follows:
   
   ```
   TopicMetadata topic = this.topicMetadata.get(topicId);
   return topic == null ? -1 : topic.numPartitions();
   ```
   
   This way you do only one lookup.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/SubscribedTopicDescriber.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.assignor;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.annotation.InterfaceStability;
+
+import java.util.Set;
+
+/**
+ * The subscribed topic describer is used by the {@link PartitionAssignor}
+ * to obtain topic and partition metadata of the subscribed topics.
+ *
+ * The interface is kept in an internal module until KIP-848 is fully
+ * implemented and ready to be released.
+ */
+@InterfaceStability.Unstable
+public interface SubscribedTopicDescriber {
+    /**
+     * The number of partitions for the given topic ID.

Review Comment:
   nit: In AssignmentSpec, you used `member Id`. Should we use `Id` in this 
file as well?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java:
##########
@@ -40,23 +45,31 @@ public class TopicMetadata {
      */
     private final int numPartitions;
 
+    /**
+     * Map of every partition to a set of its rackIds.

Review Comment:
   nit: `rack Ids`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java:
##########
@@ -40,23 +45,31 @@ public class TopicMetadata {
      */
     private final int numPartitions;
 
+    /**
+     * Map of every partition to a set of its rackIds.
+     * If the rack information is unavailable, this is an empty map.

Review Comment:
   nit: Should we say that the map contains a mapping for a partition only if 
the partition has racks, or something like this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The subscribed topic metadata class is used by the {@link 
PartitionAssignor} to obtain
+ * topic and partition metadata for the topics that the consumer group is 
subscribed to.
+ */
+public class SubscribedTopicMetadata implements SubscribedTopicDescriber {
+    /**
+     * The topic IDs mapped to their corresponding {@link TopicMetadata}
+     * object, which contains topic and partition metadata.
+     */
+    Map<Uuid, TopicMetadata> topicMetadata;
+
+    public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) {
+        this.topicMetadata = topicMetadata;
+    }
+
+    /**
+     * The number of partitions for the given topic ID.
+     *
+     * @param topicId   Uuid corresponding to the topic.
+     * @return The number of partitions corresponding to the given topic ID,
+     *         or -1 if the topic ID does not exist.
+     */
+    @Override
+    public int numPartitions(Uuid topicId) {
+        return this.topicMetadata.containsKey(topicId) ? 
this.topicMetadata.get(topicId).numPartitions() : -1;
+    }
+
+    /**
+     * Returns all the available racks associated with the replicas of the 
given partition.
+     *
+     * @param topicId   Uuid corresponding to the partition's topic.
+     * @param partition Partition number within topic.

Review Comment:
   nit: `Partition Id` or `Partition Index`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The subscribed topic metadata class is used by the {@link 
PartitionAssignor} to obtain
+ * topic and partition metadata for the topics that the consumer group is 
subscribed to.
+ */
+public class SubscribedTopicMetadata implements SubscribedTopicDescriber {
+    /**
+     * The topic IDs mapped to their corresponding {@link TopicMetadata}
+     * object, which contains topic and partition metadata.
+     */
+    Map<Uuid, TopicMetadata> topicMetadata;
+
+    public SubscribedTopicMetadata(Map<Uuid, TopicMetadata> topicMetadata) {
+        this.topicMetadata = topicMetadata;
+    }
+
+    /**
+     * The number of partitions for the given topic ID.
+     *
+     * @param topicId   Uuid corresponding to the topic.
+     * @return The number of partitions corresponding to the given topic ID,
+     *         or -1 if the topic ID does not exist.
+     */
+    @Override
+    public int numPartitions(Uuid topicId) {
+        return this.topicMetadata.containsKey(topicId) ? 
this.topicMetadata.get(topicId).numPartitions() : -1;
+    }
+
+    /**
+     * Returns all the available racks associated with the replicas of the 
given partition.
+     *
+     * @param topicId   Uuid corresponding to the partition's topic.
+     * @param partition Partition number within topic.
+     * @return The set of racks corresponding to the replicas of the topics 
partition.
+     *         If the topic ID does not exist, an empty set is returned
+     */
+    @Override
+    public Set<String> racksForPartition(Uuid topicId, int partition) {
+        return this.topicMetadata.containsKey(topicId) ?
+            this.topicMetadata.get(topicId).partitionRacks().get(partition) :
+            Collections.emptySet();

Review Comment:
   nit: Same comment as before.
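   
   A minimal sketch of the single-lookup form applied to `racksForPartition`. 
`TopicRacks` and `SingleLookupSketch` are hypothetical stand-ins, and 
`java.util.UUID` replaces Kafka's `Uuid` so the snippet compiles on its own. 
Returning an empty set for a partition that has no entry is an assumption, not 
something the PR states.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for TopicMetadata; only the rack map is modelled.
final class TopicRacks {
    final Map<Integer, Set<String>> partitionRacks;

    TopicRacks(Map<Integer, Set<String>> partitionRacks) {
        this.partitionRacks = partitionRacks;
    }
}

// Hypothetical stand-in for SubscribedTopicMetadata, keyed by java.util.UUID
// instead of Kafka's Uuid.
final class SingleLookupSketch {
    private final Map<UUID, TopicRacks> topicMetadata;

    SingleLookupSketch(Map<UUID, TopicRacks> topicMetadata) {
        this.topicMetadata = topicMetadata;
    }

    Set<String> racksForPartition(UUID topicId, int partition) {
        // One map lookup instead of containsKey() followed by get().
        TopicRacks topic = topicMetadata.get(topicId);
        if (topic == null) {
            return Collections.emptySet();
        }
        // Assumption: a partition without an entry has no known racks, so
        // return an empty set rather than null.
        Set<String> racks = topic.partitionRacks.get(partition);
        return racks == null ? Collections.emptySet() : racks;
    }
}
```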



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java:
##########
@@ -40,23 +45,31 @@ public class TopicMetadata {
      */
     private final int numPartitions;
 
+    /**
+     * Map of every partition to a set of its rackIds.
+     * If the rack information is unavailable, this is an empty map.
+     */
+    private final Map<Integer, Set<String>> partitionRacks;
+
     public TopicMetadata(
         Uuid id,
         String name,
-        int numPartitions
+        int numPartitions,
+        Map<Integer, Set<String>> partitionRacks
     ) {
-        this.id = Objects.requireNonNull(id);
-        if (Uuid.ZERO_UUID.equals(id)) {
-            throw new IllegalArgumentException("Topic id cannot be 
ZERO_UUID.");
-        }
-        this.name = Objects.requireNonNull(name);
-        if (name.isEmpty()) {
-            throw new IllegalArgumentException("Topic name cannot be empty.");
-        }
-        this.numPartitions = numPartitions;
-        if (numPartitions < 0) {
-            throw new IllegalArgumentException("Number of partitions cannot be 
negative.");
-        }
+            this.id = Objects.requireNonNull(id);
+            this.partitionRacks = Objects.requireNonNull(partitionRacks);

Review Comment:
   nit: Could we put this one last in the constructor? The lines before and 
after it go together.
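   
   A rough sketch of the ordering, with each assignment kept next to its own 
validation and the new field assigned last. `TopicMetadataCtorSketch` is a 
hypothetical stand-in; `java.util.UUID` and a local `ZERO` constant replace 
Kafka's `Uuid` and `Uuid.ZERO_UUID` so it compiles on its own.

```java
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for TopicMetadata.
final class TopicMetadataCtorSketch {
    // Stand-in for Uuid.ZERO_UUID.
    private static final UUID ZERO = new UUID(0L, 0L);

    private final UUID id;
    private final String name;
    private final int numPartitions;
    private final Map<Integer, Set<String>> partitionRacks;

    TopicMetadataCtorSketch(
        UUID id,
        String name,
        int numPartitions,
        Map<Integer, Set<String>> partitionRacks
    ) {
        // The id assignment and its zero check stay adjacent.
        this.id = Objects.requireNonNull(id);
        if (ZERO.equals(id)) {
            throw new IllegalArgumentException("Topic id cannot be ZERO_UUID.");
        }
        this.name = Objects.requireNonNull(name);
        if (name.isEmpty()) {
            throw new IllegalArgumentException("Topic name cannot be empty.");
        }
        this.numPartitions = numPartitions;
        if (numPartitions < 0) {
            throw new IllegalArgumentException("Number of partitions cannot be negative.");
        }
        // The new field goes last so it does not split the pairs above.
        this.partitionRacks = Objects.requireNonNull(partitionRacks);
    }

    UUID id() {
        return id;
    }

    String name() {
        return name;
    }

    int numPartitions() {
        return numPartitions;
    }

    Map<Integer, Set<String>> partitionRacks() {
        return partitionRacks;
    }
}
```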



##########
group-coordinator/src/main/resources/common/message/ConsumerGroupPartitionMetadataValue.json:
##########
@@ -29,7 +29,14 @@
       { "name": "TopicName", "versions": "0+", "type": "string",
         "about": "The topic name." },
       { "name": "NumPartitions", "versions": "0+", "type": "int32",
-        "about": "The number of partitions of the topic." }
+        "about": "The number of partitions of the topic." },
+      { "name": "PartitionMetadata", "versions": "0+", "type": 
"[]PartitionMetadata",
+        "about": "Partitions mapped to a set of racks.", "fields": [

Review Comment:
   nit: Could we update the doc here to explain that we only store partition 
metadata if racks is non-empty?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/TopicMetadata.java:
##########
@@ -106,16 +129,26 @@ public String toString() {
             "id=" + id +
             ", name=" + name +
             ", numPartitions=" + numPartitions +
+            ", partitionRacks=" + partitionRacks +
             ')';
     }
 
     public static TopicMetadata fromRecord(
         ConsumerGroupPartitionMetadataValue.TopicMetadata record
     ) {
+        // Converting the data type from a list stored in the record to a map.
+        Map<Integer, Set<String>> partitionRacks = new HashMap<>();
+        for (ConsumerGroupPartitionMetadataValue.PartitionMetadata 
partitionMetadata : record.partitionMetadata()) {
+            partitionRacks.put(
+                partitionMetadata.partition(),
+                Collections.unmodifiableSet(new 
HashSet<>(partitionMetadata.racks()))
+            );
+        }
+
         return new TopicMetadata(
             record.topicId(),
             record.topicName(),
-            record.numPartitions()
-        );
+            record.numPartitions(),
+            partitionRacks.isEmpty() ? Collections.emptyMap() : 
partitionRacks);

Review Comment:
   nit: You can use `partitionRacks` in all cases here.
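   
   A small sketch of the list-to-map conversion without the `isEmpty()` special 
case. `PartitionEntry` is a hypothetical stand-in for the generated 
`PartitionMetadata` type, and `toPartitionRacks` is an illustrative helper 
rather than a method in the PR.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for ConsumerGroupPartitionMetadataValue.PartitionMetadata.
final class PartitionEntry {
    final int partition;
    final List<String> racks;

    PartitionEntry(int partition, List<String> racks) {
        this.partition = partition;
        this.racks = racks;
    }
}

final class FromRecordSketch {
    // Converts the list stored in the record into a partition-to-racks map.
    static Map<Integer, Set<String>> toPartitionRacks(List<PartitionEntry> entries) {
        Map<Integer, Set<String>> partitionRacks = new HashMap<>();
        for (PartitionEntry entry : entries) {
            partitionRacks.put(
                entry.partition,
                Collections.unmodifiableSet(new HashSet<>(entry.racks))
            );
        }
        // No special case needed: an empty HashMap equals Collections.emptyMap().
        return partitionRacks;
    }
}
```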



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -145,13 +147,13 @@ public TargetAssignmentBuilder.TargetAssignmentResult 
build() {
             Map<String, AssignmentMemberSpec> memberSpecs = new HashMap<>();
 
             // All the existing members are prepared.
-            members.forEach((memberId, member) -> {
-                memberSpecs.put(memberId, createAssignmentMemberSpec(
+            members.forEach((memberId, member) -> memberSpecs.put(memberId,
+                createAssignmentMemberSpec(
                     member,
                     targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
                     subscriptionMetadata
-                ));
-            });
+                )

Review Comment:
   nit: It would be better like this if we really want to bring 
`memberSpecs.put` on the previous line. Otherwise, we can just keep the old 
format.
   
   ```
   members.forEach((memberId, member) -> memberSpecs.put(memberId, 
createAssignmentMemberSpec(
       member,
       targetAssignment.getOrDefault(memberId, Assignment.EMPTY),
       subscriptionMetadata
   ));
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupTest.java:
##########
@@ -417,19 +417,23 @@ public void testUpdateSubscriptionMetadata() {
             consumerGroup.computeSubscriptionMetadata(
                 null,
                 null,
-                image.topics()
+                image.topics(),
+                image.cluster()
             )
         );
 
         // Compute while taking into account member 1.
         assertEquals(
             mkMap(
-                mkEntry("foo", new TopicMetadata(fooTopicId, "foo", 1))
+                mkEntry("foo",
+                    new TopicMetadata(fooTopicId, "foo", 1, 
Collections.emptyMap())
+                )

Review Comment:
   nit: This format is weird.
   
   It should be as follows:
   ```
   mkEntry(
       "foo",
       new TopicMetadata(fooTopicId, "foo", 1, Collections.emptyMap())
   )
   ```
   
   or kept on one line. One line seems fine here.
   
   There are many other cases in this file. I won't mention them.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -168,20 +170,24 @@ public TargetAssignmentBuilder.TargetAssignmentResult 
build() {
             });
 
             // Prepare the expected topic metadata.
-            Map<Uuid, AssignmentTopicMetadata> topicMetadata = new HashMap<>();
-            subscriptionMetadata.forEach((topicName, metadata) -> {
-                topicMetadata.put(metadata.id(), new 
AssignmentTopicMetadata(metadata.numPartitions()));
-            });
+            Map<Uuid, TopicMetadata> topicMetadataMap = new HashMap<>();
+            subscriptionMetadata.forEach((topicName, topicMetadata) ->
+                topicMetadataMap.put(topicMetadata.id(), topicMetadata));
 
             // Prepare the expected assignment spec.
             AssignmentSpec assignmentSpec = new AssignmentSpec(
-                memberSpecs,
-                topicMetadata
+                memberSpecs
+            );
+
+            SubscribedTopicMetadata assignmentTopicMetadata = new 
SubscribedTopicMetadata(
+                topicMetadataMap
             );

Review Comment:
   nit: Could you put this right after the `topicMetadataMap` preparation? They 
seem to go together.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -222,8 +231,25 @@ public void testCreateAssignmentMemberSpec() {
 
         Map<String, TopicMetadata> subscriptionMetadata = new HashMap<String, 
TopicMetadata>() {
             {
-                put("foo", new TopicMetadata(fooTopicId, "foo", 5));
-                put("bar", new TopicMetadata(barTopicId, "bar", 5));
+                put("foo", new TopicMetadata(fooTopicId, "foo", 5,
+                    mkMap(
+                        mkEntry(0, Collections.emptySet()),
+                        mkEntry(1, Collections.emptySet()),
+                        mkEntry(2, Collections.emptySet()),
+                        mkEntry(3, Collections.emptySet()),
+                        mkEntry(4, Collections.emptySet())
+                    )

Review Comment:
   As the racks are empty, we don't have to define the partitions, right? Could 
we just use `Collections.emptyMap()`? There are other cases in this file.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -203,7 +209,10 @@ public TargetAssignmentBuilder.TargetAssignmentResult 
build() {
 
             // Verify that the assignor was called once with the expected
             // assignment spec.
-            verify(assignor, times(1)).assign(assignmentSpec);
+            verify(assignor, times(1))
+                .assign(
+                    assignmentSpec, assignmentTopicMetadata
+                );

Review Comment:
   nit: This code format is weird. If you really want to break the line, I 
would format it as follows:
   
   ```
   verify(assignor, times(1)).assign(
       assignmentSpec,
       assignmentTopicMetadata
   );
   ```
   
   or as follows (like you did at L189)
   
   ```
   verify(assignor, times(1))
       .assign(assignmentSpec, assignmentTopicMetadata);
   ```



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TargetAssignmentBuilderTest.java:
##########
@@ -624,8 +745,27 @@ public void testDeleteMember() {
             20
         );
 
-        Uuid fooTopicId = context.addTopicMetadata("foo", 6);
-        Uuid barTopicId = context.addTopicMetadata("bar", 6);
+        Uuid fooTopicId = context.addTopicMetadata("foo", 6,
+            mkMap(
+                mkEntry(0, Collections.emptySet()),
+                mkEntry(1, Collections.emptySet()),
+                mkEntry(2, Collections.emptySet()),
+                mkEntry(3, Collections.emptySet()),
+                mkEntry(4, Collections.emptySet()),
+                mkEntry(5, Collections.emptySet())
+            )
+        );
+
+        Uuid barTopicId = context.addTopicMetadata("bar", 6,

Review Comment:
   Should we have at least one test where we set racks for some partitions?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/SubscribedTopicMetadata.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.consumer;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.coordinator.group.assignor.PartitionAssignor;
+import org.apache.kafka.coordinator.group.assignor.SubscribedTopicDescriber;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * The subscribed topic metadata class is used by the {@link 
PartitionAssignor} to obtain
+ * topic and partition metadata for the topics that the consumer group is 
subscribed to.
+ */
+public class SubscribedTopicMetadata implements SubscribedTopicDescriber {

Review Comment:
   Should we add basic unit tests for this class?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/TopicMetadataTest.java:
##########
@@ -28,24 +30,24 @@ public class TopicMetadataTest {
     @Test
     public void testAttributes() {
         Uuid topicId = Uuid.randomUuid();
-        TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15);
+        TopicMetadata topicMetadata = new TopicMetadata(topicId, "foo", 15, 
Collections.emptyMap());

Review Comment:
   Should we set a map and verify that we get the correct one back?
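   
   Something along these lines, sketched in plain Java so it runs without JUnit. 
`RackAwareTopic` is a hypothetical stand-in for `TopicMetadata` reduced to the 
fields the check needs, and the rack names are made up. In the real test the 
boolean checks would be `assertEquals` calls.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for TopicMetadata, reduced to the fields the check needs.
final class RackAwareTopic {
    private final String name;
    private final int numPartitions;
    private final Map<Integer, Set<String>> partitionRacks;

    RackAwareTopic(String name, int numPartitions, Map<Integer, Set<String>> partitionRacks) {
        this.name = name;
        this.numPartitions = numPartitions;
        this.partitionRacks = partitionRacks;
    }

    String name() {
        return name;
    }

    int numPartitions() {
        return numPartitions;
    }

    Map<Integer, Set<String>> partitionRacks() {
        return partitionRacks;
    }
}

final class RackAwareTopicCheck {
    // Builds a non-empty rack map and verifies the accessors return what was set.
    static boolean attributesRoundTrip() {
        Map<Integer, Set<String>> racks = new HashMap<>();
        racks.put(0, new HashSet<>(Arrays.asList("rack-a", "rack-b")));
        racks.put(1, Collections.singleton("rack-c"));

        RackAwareTopic topic = new RackAwareTopic("foo", 15, racks);
        return "foo".equals(topic.name())
            && topic.numPartitions() == 15
            && racks.equals(topic.partitionRacks());
    }
}
```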



##########
reviewers.py:
##########
@@ -28,7 +28,7 @@
 def prompt_for_user():
     while True:
         try:
-            user_input = input("\nName or email (case insensitive): ") 
+            user_input = input("\nName or email (case insensitive): ")

Review Comment:
   This file is still here :)



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/assignor/PartitionAssignor.java:
##########
@@ -26,18 +26,17 @@
  */
 @InterfaceStability.Unstable
 public interface PartitionAssignor {
-
     /**
      * Unique name for this assignor.
      */
     String name();
 
     /**
-     * Perform the group assignment given the current members and
-     * topic metadata.
+     * Assigns partitions to group members based on the given assignment 
specification and topic metadata.
      *
-     * @param assignmentSpec The assignment spec.
+     * @param assignmentSpec           The assignment spec which included 
member metadata.
+     * @param subscribedTopicDescriber The topic and partition metadata 
describer {@link SubscribedTopicDescriber}.

Review Comment:
   nit: I think that we don't have to put `{@link SubscribedTopicDescriber}` 
here as the javadoc will automatically link the type of 
`subscribedTopicDescriber`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

