junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866393904


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1403,25 +1452,54 @@ public boolean isDone() {
     }
 
     /**
-     * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and
-     * notifies producer interceptors about the request completion.
+     * Callbacks that are called by the RecordAccumulator append functions:
+     *  - user callback
+     *  - interceptor callbacks
+     *  - partition callback
      */
-    private static class InterceptorCallback<K, V> implements Callback {
+    private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final TopicPartition tp;
+        private final ProducerRecord<K, V> record;
+        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
-        private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+        private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.tp = tp;
+            this.record = record;
         }
 
+        @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) {
-            metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+            if (metadata == null) {
+                metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+            }
             this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }
+
+        @Override
+        public void setPartition(int partition) {
+            assert partition != RecordMetadata.UNKNOWN_PARTITION;
+            this.partition = partition;
+
+            if (log.isTraceEnabled()) {
+                // Log the message here, because we don't know the partition before that.
+                log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition);
+            }
+        }
+
+        public int getPartition() {
+            return partition;
+        }
+
+        public TopicPartition topicPartition() {
+            if (record == null)

Review Comment:
   Hmm, by returning null here, we are breaking the contract that
RecordMetadata.topic() and RecordMetadata.partition() are never null in the
producer callback and interceptors. Is that a new or an existing test?
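
   To illustrate the concern, here is a minimal, self-contained sketch of one way the contract could be kept. It is not the code in this PR: `TopicPartition`, `Record` and `Callbacks` below are simplified, hypothetical stand-ins for the Kafka classes, and `UNKNOWN_PARTITION = -1` is an assumed stand-in for `RecordMetadata.UNKNOWN_PARTITION`. The idea is to reject a null record up front so that `topicPartition()` can never return null.

```java
import java.util.Objects;

public class NonNullTopicPartitionSketch {
    // Assumed stand-in for RecordMetadata.UNKNOWN_PARTITION.
    static final int UNKNOWN_PARTITION = -1;

    // Simplified stand-in for org.apache.kafka.common.TopicPartition.
    static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = Objects.requireNonNull(topic, "topic");
            this.partition = partition;
        }
    }

    // Simplified stand-in for ProducerRecord: the partition is optional.
    static final class Record {
        final String topic;
        final Integer partition;

        Record(String topic, Integer partition) {
            this.topic = topic;
            this.partition = partition;
        }
    }

    // Hypothetical counterpart of the AppendCallbacks class in the diff.
    static final class Callbacks {
        private final Record record;
        private int partition = UNKNOWN_PARTITION;

        Callbacks(Record record) {
            // Fail fast, so topicPartition() never has to return null.
            this.record = Objects.requireNonNull(record, "record");
        }

        void setPartition(int partition) {
            this.partition = partition;
        }

        // Always returns a non-null TopicPartition with a non-null topic.
        TopicPartition topicPartition() {
            if (partition != UNKNOWN_PARTITION) {
                // Partition chosen during append.
                return new TopicPartition(record.topic, partition);
            }
            if (record.partition != null) {
                // Partition set explicitly on the record.
                return new TopicPartition(record.topic, record.partition);
            }
            // Partition not known yet; the topic is still reported.
            return new TopicPartition(record.topic, UNKNOWN_PARTITION);
        }
    }

    public static void main(String[] args) {
        Callbacks cb = new Callbacks(new Record("orders", null));
        System.out.println(cb.topicPartition().topic + "-" + cb.topicPartition().partition);
        cb.setPartition(3);
        System.out.println(cb.topicPartition().topic + "-" + cb.topicPartition().partition);
    }
}
```

   With something along these lines, a callback that fires before a partition is assigned would still see the topic, with the partition reported as unknown rather than a null `TopicPartition`.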



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
