junrao commented on code in PR #12049:
URL: https://github.com/apache/kafka/pull/12049#discussion_r866187547


##########
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java:
##########
@@ -1403,25 +1452,54 @@ public boolean isDone() {
     }
 
     /**
-     * A callback called when producer request is complete. It in turn calls 
user-supplied callback (if given) and
-     * notifies producer interceptors about the request completion.
+     * Callbacks that are called by the RecordAccumulator append functions:
+     *  - user callback
+     *  - interceptor callbacks
+     *  - partition callback
      */
-    private static class InterceptorCallback<K, V> implements Callback {
+    private class AppendCallbacks<K, V> implements 
RecordAccumulator.AppendCallbacks {
         private final Callback userCallback;
         private final ProducerInterceptors<K, V> interceptors;
-        private final TopicPartition tp;
+        private final ProducerRecord<K, V> record;
+        protected int partition = RecordMetadata.UNKNOWN_PARTITION;
 
-        private InterceptorCallback(Callback userCallback, 
ProducerInterceptors<K, V> interceptors, TopicPartition tp) {
+        private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, 
V> interceptors, ProducerRecord<K, V> record) {
             this.userCallback = userCallback;
             this.interceptors = interceptors;
-            this.tp = tp;
+            this.record = record;
         }
 
+        @Override
         public void onCompletion(RecordMetadata metadata, Exception exception) 
{
-            metadata = metadata != null ? metadata : new RecordMetadata(tp, 
-1, -1, RecordBatch.NO_TIMESTAMP, -1, -1);
+            if (metadata == null) {
+                metadata = new RecordMetadata(topicPartition(), -1, -1, 
RecordBatch.NO_TIMESTAMP, -1, -1);
+            }
             this.interceptors.onAcknowledgement(metadata, exception);
             if (this.userCallback != null)
                 this.userCallback.onCompletion(metadata, exception);
         }
+
+        @Override
+        public void setPartition(int partition) {
+            assert partition != RecordMetadata.UNKNOWN_PARTITION;
+            this.partition = partition;
+
+            if (log.isTraceEnabled()) {
+                // Log the message here, because we don't know the partition 
before that.
+                log.trace("Attempting to append record {} with callback {} to 
topic {} partition {}", record, userCallback, record.topic(), partition);
+            }
+        }
+
+        public int getPartition() {
+            return partition;
+        }
+
+        public TopicPartition topicPartition() {
+            if (record == null)

Review Comment:
   We have a bunch of code in send() that depends on record being not null. 
Perhaps it's better to assert non-null record early in send()?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to