dajac commented on code in PR #15205:
URL: https://github.com/apache/kafka/pull/15205#discussion_r1463681361


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(
 
     private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap(
         Map<Uuid, Set<Integer>> partitions,
-        Map<String, TopicMetadata> subscriptionMetadata
+        TopicsImage topicsImage
     ) {
-        return partitions.entrySet().stream().map(
-            item -> new ConsumerGroupDescribeResponseData.TopicPartitions()
-                .setTopicId(item.getKey())
-                .setTopicName(lookupTopicNameById(item.getKey(), subscriptionMetadata))
-                .setPartitions(new ArrayList<>(item.getValue()))
-        ).collect(Collectors.toList());
+        List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>();
+        for (Map.Entry<Uuid, Set<Integer>> entry : partitions.entrySet()) {
+            Uuid topicId = entry.getKey();
+            Set<Integer> partitionSet = partitions.get(topicId);
+//        partitions.forEach((topicId, partitionSet) -> {
+            String topicName = lookupTopicNameById(topicId, topicsImage);
+            if (topicName != null) {
+                topicPartitions.add(new ConsumerGroupDescribeResponseData.TopicPartitions()
+                    .setTopicId(topicId)
+                    .setTopicName(topicName)
+                    .setPartitions(new ArrayList<>(partitionSet)));
+            } else {
+                // When the topic has been deleted and the group/member hasn't updated,
+                // directly remove the topic from the assignment.
+                partitions.remove(topicId, partitionSet);

Review Comment:
   We should not do this. We should only not add it to the response.
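
   A minimal sketch of that approach, assuming plain `String` topic IDs and a `Map<String, String>` as hypothetical stand-ins for `Uuid` and the `TopicsImage` lookup (the class names below are illustrative, not the actual Kafka code). Topics whose name cannot be resolved are skipped when building the response, and the `partitions` map is never modified:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class TopicPartitionsSketch {
    // Hypothetical stand-in for ConsumerGroupDescribeResponseData.TopicPartitions.
    static final class TopicPartitions {
        final String topicId;
        final String topicName;
        final List<Integer> partitions;

        TopicPartitions(String topicId, String topicName, List<Integer> partitions) {
            this.topicId = topicId;
            this.topicName = topicName;
            this.partitions = partitions;
        }
    }

    // topicNames is a hypothetical stand-in for resolving names via TopicsImage.
    static List<TopicPartitions> topicPartitionsFromMap(
        Map<String, Set<Integer>> partitions,
        Map<String, String> topicNames
    ) {
        List<TopicPartitions> topicPartitions = new ArrayList<>();
        partitions.forEach((topicId, partitionSet) -> {
            String topicName = topicNames.get(topicId);
            if (topicName != null) {
                topicPartitions.add(
                    new TopicPartitions(topicId, topicName, new ArrayList<>(partitionSet)));
            }
            // Unresolved (e.g. deleted) topics are only left out of the result;
            // the input map is read but never mutated.
        });
        return topicPartitions;
    }
}
```

   Keeping the method read-only also means it can be called with an unmodifiable map.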



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(
 
     private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap(
         Map<Uuid, Set<Integer>> partitions,
-        Map<String, TopicMetadata> subscriptionMetadata
+        TopicsImage topicsImage
     ) {
-        return partitions.entrySet().stream().map(
-            item -> new ConsumerGroupDescribeResponseData.TopicPartitions()
-                .setTopicId(item.getKey())
-                .setTopicName(lookupTopicNameById(item.getKey(), subscriptionMetadata))
-                .setPartitions(new ArrayList<>(item.getValue()))
-        ).collect(Collectors.toList());
+        List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>();
+        for (Map.Entry<Uuid, Set<Integer>> entry : partitions.entrySet()) {

Review Comment:
   nit: How about using `partitions.forEach`?



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMemberTest.java:
##########
@@ -17,10 +17,11 @@
 package org.apache.kafka.coordinator.group.consumer;

Review Comment:
   Should we add a unit test here to verify that non-existent partitions are 
not provided in the response?
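
   A rough sketch of what such a check could look like. It uses plain Java checks rather than JUnit so it runs without dependencies, and `describe` is a hypothetical stand-in for the method under test (`String` IDs replace `Uuid`, a `Map` replaces `TopicsImage`); the real test would go through `ConsumerGroupMember` instead:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class MissingTopicCheck {
    // Hypothetical stand-in: maps assigned topic IDs to names, skipping unknown IDs.
    static Map<String, List<Integer>> describe(
        Map<String, Set<Integer>> assigned,
        Map<String, String> topicNames
    ) {
        Map<String, List<Integer>> response = new LinkedHashMap<>();
        assigned.forEach((topicId, parts) -> {
            String name = topicNames.get(topicId);
            if (name != null) {
                response.put(name, new ArrayList<>(parts));
            }
        });
        return response;
    }

    public static void main(String[] args) {
        // One topic still exists, the other has been deleted from the metadata.
        Map<String, Set<Integer>> assigned = new HashMap<>();
        assigned.put("id-foo", new HashSet<>(Arrays.asList(0, 1)));
        assigned.put("id-deleted", new HashSet<>(Arrays.asList(0)));
        Map<String, String> topicNames = Collections.singletonMap("id-foo", "foo");

        Map<String, List<Integer>> response = describe(assigned, topicNames);

        // The deleted topic must be absent from the response...
        if (!response.keySet().equals(Collections.singleton("foo"))) {
            throw new AssertionError("response should only contain the existing topic");
        }
        // ...while the member's assignment itself stays untouched.
        if (!assigned.containsKey("id-deleted")) {
            throw new AssertionError("assignment must not be modified");
        }
    }
}
```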



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/consumer/ConsumerGroupMember.java:
##########
@@ -578,26 +578,38 @@ public ConsumerGroupDescribeResponseData.Member asConsumerGroupDescribeMember(
 
     private static List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitionsFromMap(
         Map<Uuid, Set<Integer>> partitions,
-        Map<String, TopicMetadata> subscriptionMetadata
+        TopicsImage topicsImage
     ) {
-        return partitions.entrySet().stream().map(
-            item -> new ConsumerGroupDescribeResponseData.TopicPartitions()
-                .setTopicId(item.getKey())
-                .setTopicName(lookupTopicNameById(item.getKey(), subscriptionMetadata))
-                .setPartitions(new ArrayList<>(item.getValue()))
-        ).collect(Collectors.toList());
+        List<ConsumerGroupDescribeResponseData.TopicPartitions> topicPartitions = new ArrayList<>();
+        for (Map.Entry<Uuid, Set<Integer>> entry : partitions.entrySet()) {
+            Uuid topicId = entry.getKey();
+            Set<Integer> partitionSet = partitions.get(topicId);
+//        partitions.forEach((topicId, partitionSet) -> {

Review Comment:
   nit: Could we remove this commented-out line?


