[ 
https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838829#comment-15838829
 ] 

Anna Povzner commented on KAFKA-4691:
-------------------------------------

I agree with [~mjsax] about not changing the KafkaProducer API. Instead, if we 
make that change, Streams should leave the producer without any configured 
interceptors and do the intercepting itself.

If we completely disable the producer interceptor and implement this 
functionality in Streams, RecordCollectorImpl.send() should also call the 
interceptor's onAcknowledgement() in the same situations as KafkaProducer 
does. E.g., if send() fails, onAcknowledgement() should be called with a mostly 
empty RecordMetadata that has only the topic and partition set. 
onAcknowledgement() should also be called from the onCompletion callback in 
RecordCollectorImpl.send(). It looks like all of that could be implemented in 
RecordCollectorImpl.send().
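
To make the two cases above concrete, here is a rough sketch of the control 
flow. It is not the actual Streams or KafkaProducer code: every Sketch* type 
and InterceptingCollectorSketch are hypothetical stand-ins so the example 
compiles without Kafka on the classpath, and using -1 for the unknown offset 
is an assumption for illustration.

```java
// Hypothetical stand-in for RecordMetadata: topic and partition are always set.
final class SketchMetadata {
    final String topic;
    final int partition;
    final long offset; // -1 when unknown, e.g. after a failed send (assumption)

    SketchMetadata(String topic, int partition, long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }
}

// Hypothetical stand-in for the acknowledgement half of ProducerInterceptor.
interface SketchInterceptor {
    void onAcknowledgement(SketchMetadata metadata, Exception exception);
}

// Hypothetical stand-in for the producer's Callback.
interface SketchCallback {
    void onCompletion(SketchMetadata metadata, Exception exception);
}

// Hypothetical stand-in for a producer that accepts already-serialized bytes.
interface SketchProducer {
    void send(String topic, int partition, byte[] key, byte[] value, SketchCallback callback);
}

// Hypothetical stand-in for the relevant part of RecordCollectorImpl.
final class InterceptingCollectorSketch {
    private final SketchProducer producer;
    private final SketchInterceptor interceptor;

    InterceptingCollectorSketch(SketchProducer producer, SketchInterceptor interceptor) {
        this.producer = producer;
        this.interceptor = interceptor;
    }

    void send(String topic, int partition, byte[] key, byte[] value) {
        // Fallback metadata: "mostly empty", but with topic and partition set.
        SketchMetadata empty = new SketchMetadata(topic, partition, -1L);
        try {
            producer.send(topic, partition, key, value, (metadata, exception) ->
                // Case 2: forward the result from onCompletion to the interceptor.
                interceptor.onAcknowledgement(metadata != null ? metadata : empty, exception));
        } catch (RuntimeException e) {
            // Case 1: send() itself failed; this sketch assumes the callback
            // does not fire in that case, so the interceptor is notified here.
            interceptor.onAcknowledgement(empty, e);
            throw e;
        }
    }
}
```

The failure path rethrows after notifying the interceptor, so callers of 
send() still see the original exception.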

> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
>                 Key: KAFKA-4691
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4691
>             Project: Kafka
>          Issue Type: Bug
>          Components: clients, streams
>    Affects Versions: 0.10.1.1
>            Reporter: Francesco Lemma
>              Labels: easyfix
>         Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE - 
> org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc 
> (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html)
>  " This is called from KafkaProducer.send(ProducerRecord) and 
> KafkaProducer.send(ProducerRecord, Callback) methods, before key and value 
> get serialized and partition is assigned (if partition is not specified in 
> ProducerRecord)".
> However, when using this with Kafka Streams 
> (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)), the 
> key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside 
> RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K> 
> keySerializer, Serializer<V> valueSerializer, StreamPartitioner<K, V> 
> partitioner), that is, before calling the producer's send method, which 
> triggers the interceptor.
> This makes it impossible for the interceptor to perform any operation 
> involving the key or value of the message without first performing an 
> additional deserialization.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
