junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r867072573
########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1403,25 +1452,54 @@ public boolean isDone() { } /** - * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and - * notifies producer interceptors about the request completion. + * Callbacks that are called by the RecordAccumulator append functions: + * - user callback + * - interceptor callbacks + * - partition callback */ - private static class InterceptorCallback<K, V> implements Callback { + private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors<K, V> interceptors; - private final TopicPartition tp; + private final ProducerRecord<K, V> record; + protected int partition = RecordMetadata.UNKNOWN_PARTITION; - private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) { + private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) { this.userCallback = userCallback; this.interceptors = interceptors; - this.tp = tp; + this.record = record; } + @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + if (metadata == null) { + metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + } this.interceptors.onAcknowledgement(metadata, exception); if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); } + + @Override + public void setPartition(int partition) { + assert partition != RecordMetadata.UNKNOWN_PARTITION; + this.partition = partition; + + if (log.isTraceEnabled()) { + // Log the message here, because we don't know the partition before that. + log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); + } + } + + public int getPartition() { + return partition; + } + + public TopicPartition topicPartition() { + if (record == null) Review Comment: Thanks for the explanation. Since this is the existing behavior, we could keep the code as it is in the PR and improve it in the future. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org