This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new e6b78ae9e5b KAFKA-19397: Ensure consistent metadata usage in produce
request and response (#19964)
e6b78ae9e5b is described below
commit e6b78ae9e5b31e9ffb8ceadab413f68137f9287c
Author: Omnia Ibrahim <[email protected]>
AuthorDate: Fri Jul 4 17:44:09 2025 +0100
KAFKA-19397: Ensure consistent metadata usage in produce request and
response (#19964)
- Metadata doesn't have the full view of topicNames to ids during
rebootstrap of client or when topic has been deleted/recreated. The
solution is to pass down topic id and stop trying to figure it out later
in the logic.
---------
Co-authored-by: Kirk True <[email protected]>
---
.../producer/ProducerSendWhileDeletionTest.java | 40 ++++++++++++++++++++++
.../kafka/clients/producer/internals/Sender.java | 24 ++++++++++---
2 files changed, 59 insertions(+), 5 deletions(-)
diff --git
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
index 7ee809ca46b..e775247d694 100644
---
a/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
+++
b/clients/clients-integration-tests/src/test/java/org/apache/kafka/clients/producer/ProducerSendWhileDeletionTest.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.test.ClusterInstance;
@@ -42,9 +43,12 @@ import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicInteger;
import static
org.apache.kafka.clients.producer.ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG;
import static
org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
@@ -157,6 +161,42 @@ public class ProducerSendWhileDeletionTest {
}
}
+ @ClusterTest
+ public void testSendWhileTopicGetRecreated() {
+ int maxNumTopicRecreationAttempts = 5;
+ var recreateTopicFuture = CompletableFuture.supplyAsync(() -> {
+ var topicIds = new HashSet<Uuid>();
+ while (topicIds.size() < maxNumTopicRecreationAttempts) {
+ try (var admin = cluster.admin()) {
+ if (admin.listTopics().names().get().contains(topic)) {
+ admin.deleteTopics(List.of(topic)).all().get();
+ }
+ topicIds.add(admin.createTopics(List.of(new
NewTopic(topic, 2, (short) 1))).topicId(topic).get());
+ } catch (Exception e) {
+ // ignore
+ }
+ }
+ return topicIds;
+ });
+
+ AtomicInteger numAcks = new AtomicInteger(0);
+ var producerFuture = CompletableFuture.runAsync(() -> {
+ try (var producer = createProducer()) {
+ for (int i = 1; i <= numRecords; i++) {
+ producer.send(new ProducerRecord<>(topic, null, ("value" +
i).getBytes()),
+ (metadata, exception) -> {
+ numAcks.incrementAndGet();
+ });
+ }
+ producer.flush();
+ }
+ });
+ var topicIds = recreateTopicFuture.join();
+ producerFuture.join();
+ assertEquals(maxNumTopicRecreationAttempts, topicIds.size());
+ assertEquals(numRecords, numAcks.intValue());
+ }
+
@ClusterTest
public void testSendWithTopicReassignmentIsMidWay() throws Exception {
var partition0 = new TopicPartition(topic, 0);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 876555d232a..64e8646d6f1 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -565,7 +565,7 @@ public class Sender implements Runnable {
/**
* Handle a produce response
*/
- private void handleProduceResponse(ClientResponse response,
Map<TopicPartition, ProducerBatch> batches, long now) {
+ private void handleProduceResponse(ClientResponse response,
Map<TopicPartition, ProducerBatch> batches, Map<Uuid, String> topicNames, long
now) {
RequestHeader requestHeader = response.requestHeader();
int correlationId = requestHeader.correlationId();
if (response.wasTimedOut()) {
@@ -595,9 +595,6 @@ public class Sender implements Runnable {
// This will be set by completeBatch.
Map<TopicPartition, Metadata.LeaderIdAndEpoch>
partitionsWithUpdatedLeaderInfo = new HashMap<>();
produceResponse.data().responses().forEach(r ->
r.partitionResponses().forEach(p -> {
- // Version 13 drop topic name and add support to topic id.
However, metadata can be used to map topic id to topic name.
- String topicName =
metadata.topicNames().getOrDefault(r.topicId(), r.name());
- TopicPartition tp = new TopicPartition(topicName,
p.index());
ProduceResponse.PartitionResponse partResp = new
ProduceResponse.PartitionResponse(
Errors.forCode(p.errorCode()),
p.baseOffset(),
@@ -609,7 +606,20 @@ public class Sender implements Runnable {
.collect(Collectors.toList()),
p.errorMessage(),
p.currentLeader());
+
+ // Version 13 drop topic name and add support to topic id.
+ // We need to find batch based on topic id and partition
index only as
+ // topic name in the response will be empty.
+ // For older versions, topic id is zero, and we will find
the batch based on the topic name.
+ TopicPartition tp = (!r.topicId().equals(Uuid.ZERO_UUID)
&& topicNames.containsKey(r.topicId())) ?
+ new TopicPartition(topicNames.get(r.topicId()),
p.index()) :
+ new TopicPartition(r.name(), p.index());
+
ProducerBatch batch = batches.get(tp);
+ if (batch == null) {
+ throw new IllegalStateException("Can't find batch
created for topic id " + r.topicId() +
+ " topic name " + r.name() + " partition " +
p.index() + " using " + topicNames);
+ }
completeBatch(batch, partResp, correlationId, now,
partitionsWithUpdatedLeaderInfo);
}));
@@ -892,7 +902,11 @@ public class Sender implements Runnable {
.setTopicData(tpd),
useTransactionV1Version
);
- RequestCompletionHandler callback = response ->
handleProduceResponse(response, recordsByPartition, time.milliseconds());
+ // Fetch topic names from metadata outside callback as topic ids may
change during the callback
+ // for example if topic was recreated.
+ Map<Uuid, String> topicNames = metadata.topicNames();
+
+ RequestCompletionHandler callback = response ->
handleProduceResponse(response, recordsByPartition, topicNames,
time.milliseconds());
String nodeId = Integer.toString(destination);
ClientRequest clientRequest = client.newClientRequest(nodeId,
requestBuilder, now, acks != 0,