Repository: kafka
Updated Branches:
  refs/heads/trunk 858047a12 -> 9a836d015


KAFKA-3303; Pass partial record metadata to 
ProducerInterceptor.onAcknowledgement on error

This is a KIP-42 followup.

Currently, if sending the record fails before it gets to the server, 
ProducerInterceptor.onAcknowledgement() is called with metadata == null and a 
non-null exception. However, it is useful to pass the topic and partition, if 
known, to ProducerInterceptor.onAcknowledgement() as well. This patch ensures 
that ProducerInterceptor.onAcknowledgement() gets record metadata with the topic 
and, if known, the partition. If the partition is not set in 'record' and 
KafkaProducer.send() fails before a partition gets assigned, then 
ProducerInterceptor.onAcknowledgement() gets RecordMetadata with partition == 
-1. The only time ProducerInterceptor.onAcknowledgement() gets null record 
metadata is when the client passes a null record to KafkaProducer.send().

Author: Anna Povzner <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Ashish Singh <[email protected]>, 
Jun Rao <[email protected]>

Closes #1015 from apovzner/kip42-3


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9a836d01
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9a836d01
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9a836d01

Branch: refs/heads/trunk
Commit: 9a836d0154efe6ea1effc688567186cb56265bf4
Parents: 858047a
Author: Anna Povzner <[email protected]>
Authored: Wed Mar 16 17:29:29 2016 -0700
Committer: Jun Rao <[email protected]>
Committed: Wed Mar 16 17:29:29 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 36 ++++++++----
 .../clients/producer/ProducerInterceptor.java   |  7 ++-
 .../kafka/clients/producer/RecordMetadata.java  |  5 ++
 .../internals/ProducerInterceptors.java         | 35 +++++++++++-
 .../internals/ProducerInterceptorsTest.java     | 58 ++++++++++++++++++++
 .../kafka/test/MockProducerInterceptor.java     |  9 ++-
 .../kafka/api/PlaintextConsumerTest.scala       |  5 +-
 7 files changed, 138 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index c87973a..6acc059 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -427,9 +427,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
     public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback 
callback) {
         // intercept the record, which can be potentially modified; this 
method does not throw exceptions
         ProducerRecord<K, V> interceptedRecord = this.interceptors == null ? 
record : this.interceptors.onSend(record);
-        // producer callback will make sure to call both 'callback' and 
interceptor callback
-        Callback interceptCallback = this.interceptors == null ? callback : 
new InterceptorCallback<>(callback, this.interceptors);
-        return doSend(interceptedRecord, interceptCallback);
+        return doSend(interceptedRecord, callback);
     }
 
     /**
@@ -437,6 +435,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * See {@link #send(ProducerRecord, Callback)} for details.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, 
Callback callback) {
+        TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
             long waitedOnMetadataMs = waitOnMetadata(record.topic(), 
this.maxBlockTimeMs);
@@ -460,10 +459,12 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             int partition = partition(record, serializedKey, serializedValue, 
metadata.fetch());
             int serializedSize = Records.LOG_OVERHEAD + 
Record.recordSize(serializedKey, serializedValue);
             ensureValidRecordSize(serializedSize);
-            TopicPartition tp = new TopicPartition(record.topic(), partition);
+            tp = new TopicPartition(record.topic(), partition);
             long timestamp = record.timestamp() == null ? time.milliseconds() 
: record.timestamp();
             log.trace("Sending record {} with callback {} to topic {} 
partition {}", record, callback, record.topic(), partition);
-            RecordAccumulator.RecordAppendResult result = 
accumulator.append(tp, timestamp, serializedKey, serializedValue, callback, 
remainingWaitMs);
+            // producer callback will make sure to call both 'callback' and 
interceptor callback
+            Callback interceptCallback = this.interceptors == null ? callback 
: new InterceptorCallback<>(callback, this.interceptors, tp);
+            RecordAccumulator.RecordAppendResult result = 
accumulator.append(tp, timestamp, serializedKey, serializedValue, 
interceptCallback, remainingWaitMs);
             if (result.batchIsFull || result.newBatchCreated) {
                 log.trace("Waking up the sender since topic {} partition {} is 
either full or getting a new batch", record.topic(), partition);
                 this.sender.wakeup();
@@ -477,27 +478,29 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
             if (callback != null)
                 callback.onCompletion(null, e);
             this.errors.record();
+            if (this.interceptors != null)
+                this.interceptors.onSendError(record, tp, e);
             return new FutureFailure(e);
         } catch (InterruptedException e) {
             this.errors.record();
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw new InterruptException(e);
         } catch (BufferExhaustedException e) {
             this.errors.record();
             this.metrics.sensor("buffer-exhausted-records").record();
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (KafkaException e) {
             this.errors.record();
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw e;
         } catch (Exception e) {
             // we notify interceptor about all exceptions, since onSend is 
called before anything else in this method
             if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(null, e);
+                this.interceptors.onSendError(record, tp, e);
             throw e;
         }
     }
@@ -763,15 +766,24 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
     private static class InterceptorCallback<K, V> implements Callback {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
+        private final TopicPartition tp;
 
-        public InterceptorCallback(Callback userCallback, 
ProducerInterceptors<K, V> interceptors) {
+        public InterceptorCallback(Callback userCallback, 
ProducerInterceptors<K, V> interceptors,
+                                   TopicPartition tp) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
+            this.tp = tp;
         }
 
         public void onCompletion(RecordMetadata metadata, Exception exception) 
{
-            if (this.interceptors != null)
-                this.interceptors.onAcknowledgement(metadata, exception);
+            if (this.interceptors != null) {
+                if (metadata == null) {
+                    this.interceptors.onAcknowledgement(new RecordMetadata(tp, 
-1, -1, Record.NO_TIMESTAMP, -1, -1, -1),
+                                                        exception);
+                } else {
+                    this.interceptors.onAcknowledgement(metadata, exception);
+                }
+            }
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
index aa18fdc..e835a69 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerInterceptor.java
@@ -76,7 +76,12 @@ public interface ProducerInterceptor<K, V> extends 
Configurable {
      * This method will generally execute in the background I/O thread, so the 
implementation should be reasonably fast.
      * Otherwise, sending of messages from other threads could be delayed.
      *
-     * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset). Null if an error occurred.
+     * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
+     *                 If an error occurred, metadata will contain only valid 
topic and maybe
+     *                 partition. If partition is not given in ProducerRecord 
and an error occurs
+     *                 before partition gets assigned, then partition will be 
set to RecordMetadata.UNKNOWN_PARTITION.
+     *                 The metadata may be null if the client passed null 
record to
+     *                 {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord)}.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
     public void onAcknowledgement(RecordMetadata metadata, Exception 
exception);

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
index c60a53d..988da16 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/RecordMetadata.java
@@ -23,6 +23,11 @@ import org.apache.kafka.common.TopicPartition;
  */
 public final class RecordMetadata {
 
+    /**
+     * Partition value for record without partition assigned
+     */
+    public static final int UNKNOWN_PARTITION = -1;
+
     private final long offset;
     // The timestamp of the message.
     // If LogAppendTime is used for the topic, the timestamp will be the 
timestamp returned by the broker.

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
index 9343a2e..8466d3a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerInterceptors.java
@@ -20,6 +20,8 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.clients.producer.ProducerInterceptor;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.record.Record;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -76,7 +78,8 @@ public class ProducerInterceptors<K, V> implements Closeable {
      *
      * This method does not throw exceptions. Exceptions thrown by any of 
interceptor methods are caught and ignored.
      *
-     * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset). Null if an error occurred.
+     * @param metadata The metadata for the record that was sent (i.e. the 
partition and offset).
+     *                 If an error occurred, metadata will only contain valid 
topic and maybe partition.
      * @param exception The exception thrown during processing of this record. 
Null if no error occurred.
      */
     public void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {
@@ -91,6 +94,36 @@ public class ProducerInterceptors<K, V> implements Closeable 
{
     }
 
     /**
+     * This method is called when sending the record fails in {@link 
org.apache.kafka.clients.producer.KafkaProducer#send(ProducerRecord, Callback)}, i.e. after
+     * {@link ProducerInterceptor#onSend(ProducerRecord)} has already been called. This method calls {@link 
ProducerInterceptor#onAcknowledgement(RecordMetadata, Exception)}
+     * method for each interceptor
+     *
+     * @param record The record from client
+     * @param interceptTopicPartition  The topic/partition for the record if 
an error occurred
+     *        after partition gets assigned; the topic part of 
interceptTopicPartition is the same as in record.
+     * @param exception The exception thrown during processing of this record.
+     */
+    public void onSendError(ProducerRecord<K, V> record, TopicPartition 
interceptTopicPartition, Exception exception) {
+        for (ProducerInterceptor<K, V> interceptor : this.interceptors) {
+            try {
+                if (record == null && interceptTopicPartition == null) {
+                    interceptor.onAcknowledgement(null, exception);
+                } else {
+                    if (interceptTopicPartition == null) {
+                        interceptTopicPartition = new 
TopicPartition(record.topic(),
+                                                                     
record.partition() == null ? RecordMetadata.UNKNOWN_PARTITION : 
record.partition());
+                    }
+                    interceptor.onAcknowledgement(new 
RecordMetadata(interceptTopicPartition, -1, -1, Record.NO_TIMESTAMP, -1, -1, 
-1),
+                                                  exception);
+                }
+            } catch (Exception e) {
+                // do not propagate interceptor exceptions, just log
+                log.warn("Error executing interceptor onAcknowledgement 
callback", e);
+            }
+        }
+    }
+
+    /**
      * Closes every interceptor in a container.
      */
     @Override

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
index 5a32dda..2135eb2 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerInterceptorsTest.java
@@ -30,6 +30,9 @@ public class ProducerInterceptorsTest {
     private final TopicPartition tp = new TopicPartition("test", 0);
     private final ProducerRecord<Integer, String> producerRecord = new 
ProducerRecord<>("test", 0, 1, "value");
     private int onAckCount = 0;
+    private int onErrorAckCount = 0;
+    private int onErrorAckWithTopicSetCount = 0;
+    private int onErrorAckWithTopicPartitionSetCount = 0;
     private int onSendCount = 0;
 
     private class AppendProducerInterceptor implements 
ProducerInterceptor<Integer, String> {
@@ -59,6 +62,16 @@ public class ProducerInterceptorsTest {
         @Override
         public void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {
             onAckCount++;
+            if (exception != null) {
+                onErrorAckCount++;
+                // the length check is just to call topic() method and let it 
throw an exception
+                // if RecordMetadata.TopicPartition is null
+                if (metadata != null && metadata.topic().length() >= 0) {
+                    onErrorAckWithTopicSetCount++;
+                    if (metadata.partition() >= 0)
+                        onErrorAckWithTopicPartitionSetCount++;
+                }
+            }
             if (throwExceptionOnAck)
                 throw new KafkaException("Injected exception in 
AppendProducerInterceptor.onAcknowledgement");
         }
@@ -143,5 +156,50 @@ public class ProducerInterceptorsTest {
 
         interceptors.close();
     }
+
+    @Test
+    public void testOnAcknowledgementWithErrorChain() {
+        List<ProducerInterceptor<Integer, String>> interceptorList = new 
ArrayList<>();
+        AppendProducerInterceptor interceptor1 = new 
AppendProducerInterceptor("One");
+        interceptorList.add(interceptor1);
+        ProducerInterceptors<Integer, String> interceptors = new 
ProducerInterceptors<>(interceptorList);
+
+        // verify that metadata contains both topic and partition
+        interceptors.onSendError(producerRecord,
+                                 new TopicPartition(producerRecord.topic(), 
producerRecord.partition()),
+                                 new KafkaException("Test"));
+        assertEquals(1, onErrorAckCount);
+        assertEquals(1, onErrorAckWithTopicPartitionSetCount);
+
+        // verify that metadata contains both topic and partition (because 
record already contains partition)
+        interceptors.onSendError(producerRecord, null, new 
KafkaException("Test"));
+        assertEquals(2, onErrorAckCount);
+        assertEquals(2, onErrorAckWithTopicPartitionSetCount);
+
+        // if producer record does not contain partition, interceptor should 
get partition == -1
+        ProducerRecord<Integer, String> record2 = new 
ProducerRecord<>("test2", null, 1, "value");
+        interceptors.onSendError(record2, null, new KafkaException("Test"));
+        assertEquals(3, onErrorAckCount);
+        assertEquals(3, onErrorAckWithTopicSetCount);
+        assertEquals(2, onErrorAckWithTopicPartitionSetCount);
+
+        // if producer record does not contain partition, but topic/partition 
is passed to
+        // onSendError, then interceptor should get valid partition
+        int reassignedPartition = producerRecord.partition() + 1;
+        interceptors.onSendError(record2,
+                                 new TopicPartition(record2.topic(), 
reassignedPartition),
+                                 new KafkaException("Test"));
+        assertEquals(4, onErrorAckCount);
+        assertEquals(4, onErrorAckWithTopicSetCount);
+        assertEquals(3, onErrorAckWithTopicPartitionSetCount);
+
+        // if both record and topic/partition are null, interceptor should not 
receive metadata
+        interceptors.onSendError(null, null, new KafkaException("Test"));
+        assertEquals(5, onErrorAckCount);
+        assertEquals(4, onErrorAckWithTopicSetCount);
+        assertEquals(3, onErrorAckWithTopicPartitionSetCount);
+
+        interceptors.close();
+    }
 }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java 
b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
index cee1247..9e4d0de 100644
--- a/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
+++ b/clients/src/test/java/org/apache/kafka/test/MockProducerInterceptor.java
@@ -32,6 +32,7 @@ public class MockProducerInterceptor implements 
ProducerInterceptor<String, Stri
     public static final AtomicInteger ONSEND_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_SUCCESS_COUNT = new AtomicInteger(0);
     public static final AtomicInteger ON_ERROR_COUNT = new AtomicInteger(0);
+    public static final AtomicInteger ON_ERROR_WITH_METADATA_COUNT = new 
AtomicInteger(0);
     public static final String APPEND_STRING_PROP = "mock.interceptor.append";
     private String appendStr;
 
@@ -64,9 +65,12 @@ public class MockProducerInterceptor implements 
ProducerInterceptor<String, Stri
 
     @Override
     public void onAcknowledgement(RecordMetadata metadata, Exception 
exception) {
-        if (exception != null)
+        if (exception != null) {
             ON_ERROR_COUNT.incrementAndGet();
-        else if (metadata != null)
+            if (metadata != null) {
+                ON_ERROR_WITH_METADATA_COUNT.incrementAndGet();
+            }
+        } else if (metadata != null)
             ON_SUCCESS_COUNT.incrementAndGet();
     }
 
@@ -81,5 +85,6 @@ public class MockProducerInterceptor implements 
ProducerInterceptor<String, Stri
         ONSEND_COUNT.set(0);
         ON_SUCCESS_COUNT.set(0);
         ON_ERROR_COUNT.set(0);
+        ON_ERROR_WITH_METADATA_COUNT.set(0);
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/9a836d01/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git 
a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 9bdbf6d..8014479 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -568,7 +568,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
       testProducer.send(null, null)
       fail("Should not allow sending a null record")
     } catch {
-      case e: Throwable => assertEquals("Interceptor should be notified about 
exception", 1, MockProducerInterceptor.ON_ERROR_COUNT.intValue()) // this is ok
+      case e: Throwable => {
+        assertEquals("Interceptor should be notified about exception", 1, 
MockProducerInterceptor.ON_ERROR_COUNT.intValue())
+        assertEquals("Interceptor should not receive metadata with an 
exception when record is null", 0, 
MockProducerInterceptor.ON_ERROR_WITH_METADATA_COUNT.intValue())
+      }
     }
 
     // create consumer with interceptor

Reply via email to