This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.1 by this push:
     new e6b78ae9e5b KAFKA-19397: Ensure consistent metadata usage in produce request and response (#19964)
e6b78ae9e5b is described below

commit e6b78ae9e5b31e9ffb8ceadab413f68137f9287c
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Fri Jul 4 17:44:09 2025 +0100

    KAFKA-19397: Ensure consistent metadata usage in produce request and response (#19964)
    
    - Metadata doesn't have the full mapping of topic names to ids during
    client rebootstrap or when a topic has been deleted and recreated. The
    solution is to pass the topic id down and stop trying to resolve it
    later in the logic.
    
    ---------
    
    Co-authored-by: Kirk True <[email protected]>
---
 .../producer/ProducerSendWhileDeletionTest.java    | 40 ++++++++++++++++++++++
 .../kafka/clients/producer/internals/Sender.java   | 24 ++++++++++---
 2 files changed, 59 insertions(+), 5 deletions(-)
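
To illustrate the race this commit addresses, here is a toy, self-contained
Java sketch (the names are illustrative only; this is not Kafka's API): the
live id-to-name mapping can mutate between send and response, so the response
handler must use the snapshot captured at send time rather than re-reading
live metadata.

    import java.util.Map;
    import java.util.UUID;
    import java.util.concurrent.ConcurrentHashMap;

    public class SnapshotVsLiveLookup {
        public static void main(String[] args) {
            // Stand-in for the client's live metadata view (id -> topic name).
            Map<UUID, String> liveMetadata = new ConcurrentHashMap<>();
            UUID oldId = UUID.randomUUID();
            liveMetadata.put(oldId, "orders");

            // Send path: snapshot the mapping to capture it in the callback.
            Map<UUID, String> snapshot = Map.copyOf(liveMetadata);

            // The topic is deleted and recreated with a new id before the
            // produce response arrives.
            liveMetadata.remove(oldId);
            liveMetadata.put(UUID.randomUUID(), "orders");

            // Response path: the live view no longer knows oldId; the
            // snapshot still resolves it.
            System.out.println("live lookup:     " + liveMetadata.get(oldId)); // null
            System.out.println("snapshot lookup: " + snapshot.get(oldId));     // orders
        }
    }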

diff --git a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
index 7ee809ca46b..e775247d694 100644
--- a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
+++ b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.test.ClusterInstance;
@@ -42,9 +43,12 @@ import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
 import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -157,6 +161,42 @@ public class ProducerSendWhileDeletionTest {
         }
     }
 
+    @ClusterTest
+    public void testSendWhileTopicGetRecreated() {
+        int maxNumTopicRecreationAttempts = 5;
+        var recreateTopicFuture = CompletableFuture.supplyAsync(() -> {
+            var topicIds = new HashSet<Uuid>();
+            while (topicIds.size() < maxNumTopicRecreationAttempts) {
+                try (var admin = cluster.admin()) {
+                    if (admin.listTopics().names().get().contains(topic)) {
+                        admin.deleteTopics(List.of(topic)).all().get();
+                    }
+                    topicIds.add(admin.createTopics(List.of(new NewTopic(topic, 2, (short) 1))).topicId(topic).get());
+                } catch (Exception e) {
+                    // ignore
+                }
+            }
+            return topicIds;
+        });
+
+        AtomicInteger numAcks = new AtomicInteger(0);
+        var producerFuture = CompletableFuture.runAsync(() -> {
+            try (var producer = createProducer()) {
+                for (int i = 1; i <= numRecords; i++) {
+                    producer.send(new ProducerRecord<>(topic, null, ("value" + i).getBytes()),
+                            (metadata, exception) -> {
+                                numAcks.incrementAndGet();
+                            });
+                }
+                producer.flush();
+            }
+        });
+        var topicIds = recreateTopicFuture.join();
+        producerFuture.join();
+        assertEquals(maxNumTopicRecreationAttempts, topicIds.size());
+        assertEquals(numRecords, numAcks.intValue());
+    }
+
     @ClusterTest
     public void testSendWithTopicReassignmentIsMidWay() throws Exception {
         var partition0 = new TopicPartition(topic, 0);
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 876555d232a..64e8646d6f1 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -565,7 +565,7 @@ public class Sender implements Runnable {
     /**
      * Handle a produce response
      */
-    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, long now) {
+    private void handleProduceResponse(ClientResponse response, Map<TopicPartition, ProducerBatch> batches, Map<Uuid, String> topicNames, long now) {
         RequestHeader requestHeader = response.requestHeader();
         int correlationId = requestHeader.correlationId();
         if (response.wasTimedOut()) {
@@ -595,9 +595,6 @@ public class Sender implements Runnable {
                 // This will be set by completeBatch.
                 Map<TopicPartition, Metadata.LeaderIdAndEpoch> partitionsWithUpdatedLeaderInfo = new HashMap<>();
                 produceResponse.data().responses().forEach(r -> r.partitionResponses().forEach(p -> {
-                    // Version 13 drop topic name and add support to topic id. However, metadata can be used to map topic id to topic name.
-                    String topicName = metadata.topicNames().getOrDefault(r.topicId(), r.name());
-                    TopicPartition tp = new TopicPartition(topicName, p.index());
                     ProduceResponse.PartitionResponse partResp = new ProduceResponse.PartitionResponse(
                             Errors.forCode(p.errorCode()),
                             p.baseOffset(),
@@ -609,7 +606,20 @@ public class Sender implements Runnable {
                                 .collect(Collectors.toList()),
                             p.errorMessage(),
                             p.currentLeader());
+
+                    // Version 13 drops the topic name and adds support for topic ids.
+                    // We need to find the batch based on topic id and partition index only,
+                    // as the topic name in the response will be empty.
+                    // For older versions, the topic id is zero, and we find the batch based on the topic name.
+                    TopicPartition tp = (!r.topicId().equals(Uuid.ZERO_UUID) && topicNames.containsKey(r.topicId())) ?
+                            new TopicPartition(topicNames.get(r.topicId()), p.index()) :
+                            new TopicPartition(r.name(), p.index());
+
                     ProducerBatch batch = batches.get(tp);
+                    if (batch == null) {
+                        throw new IllegalStateException("Can't find batch created for topic id " + r.topicId() +
+                                " topic name " + r.name() + " partition " + p.index() + " using " + topicNames);
+                    }
                     completeBatch(batch, partResp, correlationId, now, partitionsWithUpdatedLeaderInfo);
                 }));
 
@@ -892,7 +902,11 @@ public class Sender implements Runnable {
                         .setTopicData(tpd),
                 useTransactionV1Version
         );
-        RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, time.milliseconds());
+        // Fetch topic names from metadata outside the callback, as topic ids may change
+        // during the callback, for example if the topic was recreated.
+        Map<Uuid, String> topicNames = metadata.topicNames();
+
+        RequestCompletionHandler callback = response -> handleProduceResponse(response, recordsByPartition, topicNames, time.milliseconds());
 
         String nodeId = Integer.toString(destination);
         ClientRequest clientRequest = client.newClientRequest(nodeId, requestBuilder, now, acks != 0,
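
For readers skimming the diff, the new lookup rule in handleProduceResponse
can be restated as a standalone helper. This is a hypothetical sketch, not
code from the patch (resolvePartition does not exist in Sender.java): produce
response version 13 carries only a topic id (the name field is empty), while
older versions carry only a name (the id is Uuid.ZERO_UUID).

    import java.util.Map;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.Uuid;

    public final class ProduceResponsePartitionResolver {
        static TopicPartition resolvePartition(Uuid topicId,
                                               String topicName,
                                               int partitionIndex,
                                               Map<Uuid, String> topicNames) {
            if (!topicId.equals(Uuid.ZERO_UUID) && topicNames.containsKey(topicId)) {
                // v13+: map the id back to a name using the snapshot taken at send time.
                return new TopicPartition(topicNames.get(topicId), partitionIndex);
            }
            // Pre-v13, or the id is missing from the snapshot: fall back to
            // the name carried in the response.
            return new TopicPartition(topicName, partitionIndex);
        }
    }

If neither source resolves the partition, the batch lookup in the patch fails
and the new IllegalStateException surfaces the inconsistency explicitly
instead of a NullPointerException further down in completeBatch.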
