junrao commented on code in PR #19964:
URL: https://github.com/apache/kafka/pull/19964#discussion_r2146568617


##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -609,7 +607,22 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                                 .collect(Collectors.toList()),
                             p.errorMessage(),
                             p.currentLeader());
-                    ProducerBatch batch = batches.get(tp);
+                    ProducerBatch batch = null;
+                    // Version 13 drop topic name and add support to topic id.
+                    // We need to find batch based on topic id and partition index only as
+                    // topic name in the response might be empty.
+                    List<ProducerBatch> matchedBatchesForTopicId = batches.entrySet().stream()
+                            .filter(entry -> entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name())))
+                            .map(Map.Entry::getValue)
+                            .collect(Collectors.toList());
+
+                    if (matchedBatchesForTopicId.size() > 1) {
+                        matchedBatchesForTopicId.forEach(matchedBatch ->
+                                failBatch(matchedBatch, new RuntimeException("More than one batch with same topic id and partition."), false));

Review Comment:
   If this is unexpected, we should throw an IllegalStateException, as is done in other places in this file.
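
A minimal sketch of that suggestion, assuming a duplicate match really is an invariant violation. `singleMatch` and the generic key type are hypothetical stand-ins for illustration, not the actual `Sender` code or Kafka types:

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class DuplicateMatchSketch {
    // Hypothetical helper: returns the single value whose key matches,
    // null if nothing matches, and throws if the match is ambiguous.
    static <K, V> V singleMatch(Map<K, V> batches, Predicate<K> matches) {
        List<V> matched = batches.entrySet().stream()
                .filter(entry -> matches.test(entry.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
        if (matched.size() > 1) {
            // Treated as a programming error rather than a per-batch failure.
            throw new IllegalStateException(
                    "More than one batch with same topic id and partition.");
        }
        return matched.isEmpty() ? null : matched.get(0);
    }
}
```

Throwing here surfaces the broken invariant immediately instead of failing each matched batch with a generic `RuntimeException`.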



##########
clients/src/main/java/org/apache/kafka/common/TopicIdPartition.java:
##########
@@ -78,6 +78,18 @@ public TopicPartition topicPartition() {
         return topicPartition;
     }
 
+    /**
+     * @return true if topic has same topicId and partition index as topic names some time might be empty.
+    */
+    public boolean same(TopicIdPartition tpId) {
+        if (tpId.topic().isEmpty()) {
+            return topicId.equals(tpId.topicId) &&
+                    topicPartition.partition() == tpId.partition();
+        } else {
+            return this.equals(tpId);

Review Comment:
   If the topic name is not empty, it means that the topicId is zero. In this case, we just want to match based on the topic name, right?
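
A rough sketch of the matching rule described in this comment, assuming the two cases are mutually exclusive (empty name implies a real topic id; non-empty name implies a zero topic id). `Key`, `ZERO_ID`, and the use of `java.util.UUID` are hypothetical stand-ins, not Kafka's `TopicIdPartition` or `Uuid`:

```java
import java.util.UUID;

class TopicMatchSketch {
    // Stand-in for an all-zero topic id (assumption for this sketch).
    static final UUID ZERO_ID = new UUID(0L, 0L);

    // Hypothetical simplified key: topic id, topic name, partition index.
    static final class Key {
        final UUID topicId;
        final String topic;
        final int partition;

        Key(UUID topicId, String topic, int partition) {
            this.topicId = topicId;
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Compares a batch key against the key built from the response.
    static boolean same(Key batchKey, Key responseKey) {
        if (batchKey.partition != responseKey.partition) {
            return false;
        }
        if (responseKey.topic.isEmpty()) {
            // No name in the response: match on topic id only.
            return batchKey.topicId.equals(responseKey.topicId);
        }
        // Name present (topic id assumed zero): match on topic name only.
        return batchKey.topic.equals(responseKey.topic);
    }
}
```

With this shape, a response key carrying a name and a zero id still matches a batch key that holds the real id, which a full `equals` comparison would reject.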



##########
clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java:
##########
@@ -609,7 +607,22 @@ private void handleProduceResponse(ClientResponse response, Map<TopicPartition,
                                 .collect(Collectors.toList()),
                             p.errorMessage(),
                             p.currentLeader());
-                    ProducerBatch batch = batches.get(tp);
+                    ProducerBatch batch = null;
+                    // Version 13 drop topic name and add support to topic id.
+                    // We need to find batch based on topic id and partition index only as
+                    // topic name in the response might be empty.
+                    List<ProducerBatch> matchedBatchesForTopicId = batches.entrySet().stream()
+                            .filter(entry -> entry.getKey().same(new TopicIdPartition(r.topicId(), p.index(), r.name())))

Review Comment:
   Could we just find the first match?
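
One possible shape for this, as a sketch only: `Stream.findFirst()` stops at the first matching entry and avoids collecting a list. `firstMatch` and the generic key type are hypothetical stand-ins, not the actual `Sender` code:

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Predicate;

class FirstMatchSketch {
    // Hypothetical helper: returns the first value whose key matches, if any.
    // Assumes the map holds no null values, since findFirst() rejects null.
    static <K, V> Optional<V> firstMatch(Map<K, V> batches, Predicate<K> matches) {
        return batches.entrySet().stream()
                .filter(entry -> matches.test(entry.getKey()))
                .map(Map.Entry::getValue)
                .findFirst();
    }
}
```

The trade-off is that any additional matching entries are silently ignored, so this only fits if at most one match is expected.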


