junrao commented on code in PR #12049: URL: https://github.com/apache/kafka/pull/12049#discussion_r866187547
########## clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java: ########## @@ -1403,25 +1452,54 @@ public boolean isDone() { } /** - * A callback called when producer request is complete. It in turn calls user-supplied callback (if given) and - * notifies producer interceptors about the request completion. + * Callbacks that are called by the RecordAccumulator append functions: + * - user callback + * - interceptor callbacks + * - partition callback */ - private static class InterceptorCallback<K, V> implements Callback { + private class AppendCallbacks<K, V> implements RecordAccumulator.AppendCallbacks { private final Callback userCallback; private final ProducerInterceptors<K, V> interceptors; - private final TopicPartition tp; + private final ProducerRecord<K, V> record; + protected int partition = RecordMetadata.UNKNOWN_PARTITION; - private InterceptorCallback(Callback userCallback, ProducerInterceptors<K, V> interceptors, TopicPartition tp) { + private AppendCallbacks(Callback userCallback, ProducerInterceptors<K, V> interceptors, ProducerRecord<K, V> record) { this.userCallback = userCallback; this.interceptors = interceptors; - this.tp = tp; + this.record = record; } + @Override public void onCompletion(RecordMetadata metadata, Exception exception) { - metadata = metadata != null ? metadata : new RecordMetadata(tp, -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + if (metadata == null) { + metadata = new RecordMetadata(topicPartition(), -1, -1, RecordBatch.NO_TIMESTAMP, -1, -1); + } this.interceptors.onAcknowledgement(metadata, exception); if (this.userCallback != null) this.userCallback.onCompletion(metadata, exception); } + + @Override + public void setPartition(int partition) { + assert partition != RecordMetadata.UNKNOWN_PARTITION; + this.partition = partition; + + if (log.isTraceEnabled()) { + // Log the message here, because we don't know the partition before that. 
+ log.trace("Attempting to append record {} with callback {} to topic {} partition {}", record, userCallback, record.topic(), partition); + } + } + + public int getPartition() { + return partition; + } + + public TopicPartition topicPartition() { + if (record == null) Review Comment: We have a bunch of code in send() that depends on record being non-null. Perhaps it would be better to assert that record is non-null early in send()? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org