jolshan commented on code in PR #15968:
URL: https://github.com/apache/kafka/pull/15968#discussion_r1607363954


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                 // This will be set by completeBatch.
                 Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
                 produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
-                    TopicPartition tp = new TopicPartition(r.name(), p.index());
+                    // Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
+                    String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name();

Review Comment:
   What do we do if the metadata has refreshed and the topic ID is no longer in it? For fetch it is a bit different, since we have the session logic and can handle missing topics.
   
   I would recommend writing out a few cases where the server and client have or don't have the topic ID, to reason about the upgrade case, the downgrade case, deletions, and reassignments.
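
To make the concern concrete, here is a minimal, self-contained sketch of the lookup with the "ID not in metadata" case made explicit. It is not the PR's code: `TopicNameResolver` and `resolveTopicName` are hypothetical names, `java.util.UUID` stands in for Kafka's topic ID type, and a plain `Map` stands in for `metadata.topicNames()`. What the producer should actually do when the result is empty is the open question above; the sketch only surfaces that case instead of passing a null topic name along.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;

// Hypothetical helper, for illustration only (not part of Sender.java).
class TopicNameResolver {

    // Prefer the name carried in the response; otherwise fall back to the
    // ID-to-name map. Returns empty when the ID is unknown, for example after
    // a metadata refresh that dropped a deleted topic.
    static Optional<String> resolveTopicName(String nameInResponse,
                                             UUID topicId,
                                             Map<UUID, String> topicNamesById) {
        if (nameInResponse != null && !nameInResponse.isEmpty()) {
            return Optional.of(nameInResponse);
        }
        return Optional.ofNullable(topicNamesById.get(topicId));
    }

    public static void main(String[] args) {
        UUID knownId = UUID.randomUUID();
        UUID staleId = UUID.randomUUID();
        Map<UUID, String> topicNamesById = new HashMap<>();
        topicNamesById.put(knownId, "orders");

        // Response carries a name: the name is used as-is.
        System.out.println(resolveTopicName("payments", knownId, topicNamesById));
        // Response carries only an ID that the metadata still knows.
        System.out.println(resolveTopicName("", knownId, topicNamesById));
        // Response carries only an ID that the metadata no longer knows.
        System.out.println(resolveTopicName(null, staleId, topicNamesById));
    }
}
```

The third call is the case that needs a decision: with a bare `Map.get`, the topic name would be null and the resulting `TopicPartition` would not match any in-flight batch.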



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
