jolshan commented on code in PR #15968: URL: https://github.com/apache/kafka/pull/15968#discussion_r1607363954
########## clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java: ########## @@ -610,7 +611,9 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition, // This will be set by completeBatch. Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>(); produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> { - TopicPartition tp = new TopicPartition(r.name(), p.index()); + // Version 12 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name. + String topicName = (r.name() == null || r.name().isEmpty()) ? metadata.topicNames().get(r.topicId()) : r.name(); Review Comment: What do we do if metadata has refreshed and is no longer in the metadata? For fetch it is a bit different since we have the session logic, and can handle missing topics. I would recommend writing through a few cases where the server and client have/don't have the topic ID to reason about the upgrade case/downgrade case/deletions/reassignments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org