This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 9d367ed KAFKA-12841: Fix producer callback handling when partition is
missing (#11689)
9d367ed is described below
commit 9d367ed00ebf1d572a95c24855217c479563d06a
Author: Philip Nee <[email protected]>
AuthorDate: Wed Feb 2 16:03:32 2022 -0800
KAFKA-12841: Fix producer callback handling when partition is missing
(#11689)
Sometimes, the Kafka producer encounters an error prior to selecting a
topic partition. In this case, we
would like to acknowledge the failure in the producer interceptors, if any
are configured. We should also
pass a non-null Metadata object to the producer callback, if there is one.
This PR implements that
behavior. It also updates the JavaDoc to clarify that if a partition cannot
be selected, we will pass
back a partition id of -1 in the metadata. This is in keeping with
KAFKA-3303.
Co-authors: Kirk True <[email protected]>
Reviewers: Colin P. McCabe <[email protected]>
---
.../apache/kafka/clients/producer/Callback.java | 5 ++-
.../kafka/clients/producer/KafkaProducer.java | 13 ++++++-
.../producer/internals/ProducerInterceptors.java | 7 +++-
.../kafka/clients/producer/KafkaProducerTest.java | 43 ++++++++++++++++++++++
4 files changed, 62 insertions(+), 6 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index ee0610e..236d04b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -25,10 +25,11 @@ public interface Callback {
/**
* A callback method the user can implement to provide asynchronous
handling of request completion. This method will
* be called when the record sent to the server has been acknowledged.
When exception is not null in the callback,
- * metadata will contain the special -1 value for all fields except for
topicPartition, which will be valid.
+ * metadata will contain the special -1 value for all fields. If
topicPartition cannot be
+ * chosen, a -1 value will be assigned.
*
* @param metadata The metadata for the record that was sent (i.e. the
partition and offset). An empty metadata
- * with -1 value for all fields except for topicPartition
will be returned if an error occurred.
+ * with -1 value for all fields will be returned if an
error occurred.
* @param exception The exception thrown during processing of this record.
Null if no error occurred.
* Possible thrown exceptions include:
*
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ef8a9cc..1d18524 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -982,8 +982,17 @@ public class KafkaProducer<K, V> implements Producer<K, V>
{
// for other exceptions throw directly
} catch (ApiException e) {
log.debug("Exception occurred during message send:", e);
- if (callback != null)
- callback.onCompletion(null, e);
+ // producer callback will make sure to call both 'callback' and
interceptor callback
+ if (tp == null) {
+ // set topicPartition to -1 when null
+ tp = ProducerInterceptors.extractTopicPartition(record);
+ }
+
+ Callback interceptCallback = new InterceptorCallback<>(callback,
this.interceptors, tp);
+
+ // The onCompletion callback does expect a non-null metadata, but
one will be created inside
+ // the interceptor's onCompletion implementation before the user's
callback is invoked.
+ interceptCallback.onCompletion(null, e);
this.errors.record();
this.interceptors.onSendError(record, tp, e);
return new FutureFailure(e);
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index ceec552..dd72409 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -110,8 +110,7 @@ public class ProducerInterceptors<K, V> implements
Closeable {
interceptor.onAcknowledgement(null, exception);
} else {
if (interceptTopicPartition == null) {
- interceptTopicPartition = new
TopicPartition(record.topic(),
- record.partition() == null ?
RecordMetadata.UNKNOWN_PARTITION : record.partition());
+ interceptTopicPartition =
extractTopicPartition(record);
}
interceptor.onAcknowledgement(new
RecordMetadata(interceptTopicPartition, -1, -1,
RecordBatch.NO_TIMESTAMP, -1, -1),
exception);
@@ -123,6 +122,10 @@ public class ProducerInterceptors<K, V> implements
Closeable {
}
}
+ public static <K, V> TopicPartition
extractTopicPartition(ProducerRecord<K, V> record) {
+ return new TopicPartition(record.topic(), record.partition() == null ?
RecordMetadata.UNKNOWN_PARTITION : record.partition());
+ }
+
/**
* Closes every interceptor in a container.
*/
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2784f19..1067db9 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.network.Selectable;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
import org.apache.kafka.common.requests.EndTxnResponse;
import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -54,6 +55,7 @@ import
org.apache.kafka.common.requests.FindCoordinatorResponse;
import org.apache.kafka.common.requests.InitProducerIdResponse;
import org.apache.kafka.common.requests.JoinGroupRequest;
import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
import org.apache.kafka.common.requests.RequestTestUtils;
import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
@@ -1376,6 +1378,47 @@ public class KafkaProducerTest {
"key".getBytes(StandardCharsets.UTF_8),
"value".getBytes(StandardCharsets.UTF_8)));
}
+ @Test
+ public void testCallbackHandlesError() throws Exception {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+ configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+ Time time = new MockTime();
+ ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+ MockClient client = new MockClient(time, producerMetadata);
+
+ String invalidTopicName = "topic abc"; // Invalid topic name due to
space
+
+ try (Producer<String, String> producer = kafkaProducer(configs, new
StringSerializer(), new StringSerializer(),
+ producerMetadata, client, null, time)) {
+ ProducerRecord<String, String> record = new
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+ // Here's the important piece of the test. Let's make sure that
the RecordMetadata we get
+ // is non-null and adheres to the onCompletion contract.
+ Callback callBack = (recordMetadata, exception) -> {
+ assertNotNull(exception);
+ assertNotNull(recordMetadata);
+
+ assertNotNull(recordMetadata.topic(), "Topic name should be
valid even on send failure");
+ assertEquals(invalidTopicName, recordMetadata.topic());
+ assertNotNull(recordMetadata.partition(), "Partition should be
valid even on send failure");
+
+ assertFalse(recordMetadata.hasOffset());
+ assertEquals(ProduceResponse.INVALID_OFFSET,
recordMetadata.offset());
+
+ assertFalse(recordMetadata.hasTimestamp());
+ assertEquals(RecordBatch.NO_TIMESTAMP,
recordMetadata.timestamp());
+
+ assertEquals(-1, recordMetadata.serializedKeySize());
+ assertEquals(-1, recordMetadata.serializedValueSize());
+ assertEquals(-1, recordMetadata.partition());
+ };
+
+ producer.send(record, callBack);
+ }
+ }
+
private static final List<String> CLIENT_IDS = new ArrayList<>();
public static class SerializerForClientId implements Serializer<byte[]> {