lianetm commented on code in PR #19885:
URL: https://github.com/apache/kafka/pull/19885#discussion_r2491866942


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -983,38 +984,52 @@ public boolean sameRequest(final OffsetFetchRequestState 
request) {
         }
 
         public NetworkClientDelegate.UnsentRequest toUnsentRequest() {
-            List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = 
requestedPartitions.stream()
-                .collect(Collectors.groupingBy(TopicPartition::topic))
-                .entrySet()
-                .stream()
-                .map(entry -> new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
+            Map<String, Uuid> topicIds = metadata.topicIds();
+            boolean canUseTopicIds = true;
+            List<OffsetFetchRequestData.OffsetFetchRequestTopics> topics = new 
ArrayList<>();
+            Map<String, List<TopicPartition>> tps = 
requestedPartitions.stream().collect(Collectors.groupingBy(TopicPartition::topic));
+            for (Map.Entry<String, List<TopicPartition>> entry : 
tps.entrySet()) {
+                String topic = entry.getKey();
+                Uuid topicId = topicIds.getOrDefault(topic, Uuid.ZERO_UUID);
+                if (Uuid.ZERO_UUID.equals(topicId)) {
+                    canUseTopicIds = false;
+                }
+                topics.add(new 
OffsetFetchRequestData.OffsetFetchRequestTopics()
                     .setName(entry.getKey())
+                    .setTopicId(topicId)
                     .setPartitionIndexes(entry.getValue().stream()
                         .map(TopicPartition::partition)
-                        .collect(Collectors.toList())))
-                .collect(Collectors.toList());
+                        .collect(Collectors.toList())));
+            }
 
+            boolean finalCanUseTopicIds = canUseTopicIds;
             OffsetFetchRequest.Builder builder = memberInfo.memberEpoch
-                .map(epoch -> OffsetFetchRequest.Builder.forTopicNames(
-                    new OffsetFetchRequestData()
+                .map(epoch -> {
+                    OffsetFetchRequestData data = new OffsetFetchRequestData()
                         .setRequireStable(true)
                         .setGroups(List.of(
                             new 
OffsetFetchRequestData.OffsetFetchRequestGroup()
                                 .setGroupId(groupId)
                                 .setMemberId(memberInfo.memberId)
                                 .setMemberEpoch(epoch)
-                                .setTopics(topics))),
-                            throwOnFetchStableOffsetUnsupported))
+                                .setTopics(topics)));
+                    return finalCanUseTopicIds
+                        ? OffsetFetchRequest.Builder.forTopicIdsOrNames(data, 
throwOnFetchStableOffsetUnsupported, true)
+                        : OffsetFetchRequest.Builder.forTopicNames(data, 
throwOnFetchStableOffsetUnsupported);
+                })
                 // Building request without passing member ID/epoch to leave 
the logic to choose
                 // default values when not present on the request builder.
-                .orElseGet(() -> OffsetFetchRequest.Builder.forTopicNames(
-                    new OffsetFetchRequestData()
+                .orElseGet(() -> {
+                    OffsetFetchRequestData data = new OffsetFetchRequestData()
                         .setRequireStable(true)
                         .setGroups(List.of(
                             new 
OffsetFetchRequestData.OffsetFetchRequestGroup()
                                 .setGroupId(groupId)
-                                .setTopics(topics))),
-                    throwOnFetchStableOffsetUnsupported));
+                                .setTopics(topics)));
+                    return finalCanUseTopicIds
+                        ? OffsetFetchRequest.Builder.forTopicIdsOrNames(data, 
throwOnFetchStableOffsetUnsupported, true)
+                        : OffsetFetchRequest.Builder.forTopicNames(data, 
throwOnFetchStableOffsetUnsupported);
+                });

Review Comment:
   this whole section is getting too complicated, let's try to simplify.
   
   We just want to create the builder, the only diff seems to be around the 
group data, that should include epoch or not. So what about extracting that bit 
and leaving the rest as a common code, no dups? Something along these lines:
   
   ```
               OffsetFetchRequestData.OffsetFetchRequestGroup groupData = new 
OffsetFetchRequestData.OffsetFetchRequestGroup()
                       .setGroupId(groupId)
                       .setTopics(topics);
               if (memberInfo.memberEpoch.isPresent()) {
                   groupData = groupData.setMemberId(memberInfo.memberId)
                           .setMemberEpoch(memberInfo.memberEpoch.get());
               }
   
               OffsetFetchRequestData data = new OffsetFetchRequestData()
                       .setRequireStable(true)
                       .setGroups(List.of(groupData));
               OffsetFetchRequest.Builder builder = finalCanUseTopicIds
                       ? OffsetFetchRequest.Builder.forTopicIdsOrNames(data, 
throwOnFetchStableOffsetUnsupported, true)
                       : OffsetFetchRequest.Builder.forTopicNames(data, 
throwOnFetchStableOffsetUnsupported);
   ```
    
   Would that work? Not sure if I'm missing any other diff



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to