This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 9d367ed  KAFKA-12841: Fix producer callback handling when partition is 
missing (#11689)
9d367ed is described below

commit 9d367ed00ebf1d572a95c24855217c479563d06a
Author: Philip Nee <[email protected]>
AuthorDate: Wed Feb 2 16:03:32 2022 -0800

    KAFKA-12841: Fix producer callback handling when partition is missing 
(#11689)
    
    Sometimes, the Kafka producer encounters an error prior to selecting a 
topic partition. In this case, we
    would like to acknowledge the failure in the producer interceptors, if any 
are configured. We should also
    pass a non-null Metadata object to the producer callback, if there is one. 
This PR implements that
    behavior. It also updates the JavaDoc to clarify that if a partition cannot 
be selected, we will pass
    back a partition id of -1 in the metadata. This is in keeping with 
KAFKA-3303.
    
    Co-authors: Kirk True <[email protected]>
    Reviewers: Colin P. McCabe <[email protected]>
---
 .../apache/kafka/clients/producer/Callback.java    |  5 ++-
 .../kafka/clients/producer/KafkaProducer.java      | 13 ++++++-
 .../producer/internals/ProducerInterceptors.java   |  7 +++-
 .../kafka/clients/producer/KafkaProducerTest.java  | 43 ++++++++++++++++++++++
 4 files changed, 62 insertions(+), 6 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index ee0610e..236d04b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -25,10 +25,11 @@ public interface Callback {
     /**
      * A callback method the user can implement to provide asynchronous 
handling of request completion. This method will
      * be called when the record sent to the server has been acknowledged. 
When exception is not null in the callback,
-     * metadata will contain the special -1 value for all fields except for 
topicPartition, which will be valid.
+     * metadata will contain the special -1 value for all fields. If 
topicPartition cannot be
+     * choosen, a -1 value will be assigned.
      *
      * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset). An empty metadata
-     *                 with -1 value for all fields except for topicPartition 
will be returned if an error occurred.
+     *                 with -1 value for all fields will be returned if an 
error occurred.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      *                  Possible thrown exceptions include:
      *
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index ef8a9cc..1d18524 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -982,8 +982,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
             // for other exceptions throw directly
         } catch (ApiException e) {
             log.debug("Exception occurred during message send:", e);
-            if (callback != null)
-                callback.onCompletion(null, e);
+            // producer callback will make sure to call both 'callback' and 
interceptor callback
+            if (tp == null) {
+                // set topicPartition to -1 when null
+                tp = ProducerInterceptors.extractTopicPartition(record);
+            }
+
+            Callback interceptCallback = new InterceptorCallback<>(callback, 
this.interceptors, tp);
+
+            // The onCompletion callback does expect a non-null metadata, but 
one will be created inside
+            // the interceptor's onCompletion implementation before the user's 
callback is invoked.
+            interceptCallback.onCompletion(null, e);
             this.errors.record();
             this.interceptors.onSendError(record, tp, e);
             return new FutureFailure(e);
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index ceec552..dd72409 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -110,8 +110,7 @@ public class ProducerInterceptors<K, V> implements 
Closeable {
                     interceptor.onAcknowledgement(null, exception);
                 } else {
                     if (interceptTopicPartition == null) {
-                        interceptTopicPartition = new 
TopicPartition(record.topic(),
-                                record.partition() == null ? 
RecordMetadata.UNKNOWN_PARTITION : record.partition());
+                        interceptTopicPartition = 
extractTopicPartition(record);
                     }
                     interceptor.onAcknowledgement(new 
RecordMetadata(interceptTopicPartition, -1, -1,
                                     RecordBatch.NO_TIMESTAMP, -1, -1), 
exception);
@@ -123,6 +122,10 @@ public class ProducerInterceptors<K, V> implements 
Closeable {
         }
     }
 
+    public static <K, V> TopicPartition 
extractTopicPartition(ProducerRecord<K, V> record) {
+        return new TopicPartition(record.topic(), record.partition() == null ? 
RecordMetadata.UNKNOWN_PARTITION : record.partition());
+    }
+
     /**
      * Closes every interceptor in a container.
      */
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 2784f19..1067db9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -47,6 +47,7 @@ import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.network.Selectable;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.requests.AddOffsetsToTxnResponse;
 import org.apache.kafka.common.requests.EndTxnResponse;
 import org.apache.kafka.common.requests.FindCoordinatorRequest;
@@ -54,6 +55,7 @@ import 
org.apache.kafka.common.requests.FindCoordinatorResponse;
 import org.apache.kafka.common.requests.InitProducerIdResponse;
 import org.apache.kafka.common.requests.JoinGroupRequest;
 import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.TxnOffsetCommitRequest;
 import org.apache.kafka.common.requests.TxnOffsetCommitResponse;
@@ -1376,6 +1378,47 @@ public class KafkaProducerTest {
             "key".getBytes(StandardCharsets.UTF_8), 
"value".getBytes(StandardCharsets.UTF_8)));
     }
 
+    @Test
+    public void testCallbackHandlesError() throws Exception {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9000");
+        configs.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, "1000");
+
+        Time time = new MockTime();
+        ProducerMetadata producerMetadata = newMetadata(0, Long.MAX_VALUE);
+        MockClient client = new MockClient(time, producerMetadata);
+
+        String invalidTopicName = "topic abc"; // Invalid topic name due to 
space
+
+        try (Producer<String, String> producer = kafkaProducer(configs, new 
StringSerializer(), new StringSerializer(),
+                producerMetadata, client, null, time)) {
+            ProducerRecord<String, String> record = new 
ProducerRecord<>(invalidTopicName, "HelloKafka");
+
+            // Here's the important piece of the test. Let's make sure that 
the RecordMetadata we get
+            // is non-null and adheres to the onCompletion contract.
+            Callback callBack = (recordMetadata, exception) -> {
+                assertNotNull(exception);
+                assertNotNull(recordMetadata);
+
+                assertNotNull(recordMetadata.topic(), "Topic name should be 
valid even on send failure");
+                assertEquals(invalidTopicName, recordMetadata.topic());
+                assertNotNull(recordMetadata.partition(), "Partition should be 
valid even on send failure");
+
+                assertFalse(recordMetadata.hasOffset());
+                assertEquals(ProduceResponse.INVALID_OFFSET, 
recordMetadata.offset());
+
+                assertFalse(recordMetadata.hasTimestamp());
+                assertEquals(RecordBatch.NO_TIMESTAMP, 
recordMetadata.timestamp());
+
+                assertEquals(-1, recordMetadata.serializedKeySize());
+                assertEquals(-1, recordMetadata.serializedValueSize());
+                assertEquals(-1, recordMetadata.partition());
+            };
+
+            producer.send(record, callBack);
+        }
+    }
+
     private static final List<String> CLIENT_IDS = new ArrayList<>();
 
     public static class SerializerForClientId implements Serializer<byte[]> {

Reply via email to