[
https://issues.apache.org/jira/browse/KAFKA-4691?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15838619#comment-15838619
]
Francesco Lemma commented on KAFKA-4691:
----------------------------------------
Thanks Matthias. I understand what you are saying and I agree 100%.
Maybe I'm not expressing myself clearly. What I'm proposing is to
move the call to the interceptors out of the {{KafkaProducer}} and into
the {{RecordCollectorImpl}}. To avoid affecting non-Streams callers,
there could be an overloaded method in the {{KafkaProducer}} similar to this:
{code:java}
// Overloaded method. RecordCollectorImpl would call this with
// triggerInterceptors = false.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback,
                                   boolean triggerInterceptors) {
    // Intercept the record, which can potentially be modified; this method
    // does not throw exceptions.
    ProducerRecord<K, V> interceptedRecord = this.interceptors == null || !triggerInterceptors
            ? record : this.interceptors.onSend(record);
    return doSend(interceptedRecord, callback);
}

// This method keeps the original signature. All existing callers of
// KafkaProducer.send would keep calling it.
public Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
    return send(record, callback, true);
}
{code}
Then the {{RecordCollectorImpl.send(...)}} method could potentially be modified
as follows:
{code:java}
@Override
public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer,
                        Serializer<V> valueSerializer, StreamPartitioner<K, V> partitioner) {
    // The visibility of the producer's interceptors would need to change, or
    // some other way of exposing them would have to be implemented.
    // This line is moved here from KafkaProducer. The collector always runs the
    // interceptors, so no triggerInterceptors flag is needed in this method.
    ProducerRecord<K, V> interceptedRecord = this.producer.interceptors == null
            ? record : this.producer.interceptors.onSend(record);
    byte[] keyBytes = keySerializer.serialize(interceptedRecord.topic(),
            interceptedRecord.key());
    byte[] valBytes = valueSerializer.serialize(interceptedRecord.topic(),
            interceptedRecord.value());
    Integer partition = interceptedRecord.partition();
    if (partition == null && partitioner != null) {
        List<PartitionInfo> partitions =
                this.producer.partitionsFor(interceptedRecord.topic());
        if (partitions != null && partitions.size() > 0)
            partition = partitioner.partition(interceptedRecord.key(),
                    interceptedRecord.value(), partitions.size());
    }
    ProducerRecord<byte[], byte[]> serializedRecord =
            new ProducerRecord<>(interceptedRecord.topic(), partition,
                    interceptedRecord.timestamp(), keyBytes, valBytes);
    final String topic = serializedRecord.topic();
    .......
    .......
    .......
}
{code}
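To make the ordering concrete, here is a minimal standalone sketch in plain Java with no Kafka dependency. Every name in it ({{Record}}, {{Interceptor}}, {{serializeThenIntercept}}, {{interceptThenSerialize}}, {{toBytes}}) is made up for illustration and is not part of the Kafka API, and the string-based serialization is only a stand-in for real serializers. It just contrasts the two call orders: the one described in this issue, where the interceptor receives bytes, and the proposed one, where it receives the typed record.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical model of the two call orders; none of these types exist in Kafka.
final class InterceptorOrderingSketch {

    /** Stand-in for a producer record: just a key and a value. */
    static final class Record<K, V> {
        final K key;
        final V value;

        Record(K key, V value) {
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in for the onSend hook of an interceptor. */
    interface Interceptor<K, V> {
        Record<K, V> onSend(Record<K, V> record);
    }

    /** Toy serializer: the UTF-8 bytes of the object's string form. */
    static byte[] toBytes(Object o) {
        return String.valueOf(o).getBytes(StandardCharsets.UTF_8);
    }

    static Record<byte[], byte[]> serialize(Record<?, ?> r) {
        return new Record<>(toBytes(r.key), toBytes(r.value));
    }

    /** Order described in the issue: serialize first, so the interceptor sees bytes. */
    static Record<byte[], byte[]> serializeThenIntercept(
            Record<String, Integer> record, Interceptor<byte[], byte[]> interceptor) {
        return interceptor.onSend(serialize(record));
    }

    /** Proposed order: the interceptor sees the typed record, then it is serialized. */
    static Record<byte[], byte[]> interceptThenSerialize(
            Record<String, Integer> record, Interceptor<String, Integer> interceptor) {
        return serialize(interceptor.onSend(record));
    }

    public static void main(String[] args) {
        Record<String, Integer> record = new Record<>("order-1", 41);

        // Typed interceptor: it can work on the Integer value directly.
        Record<byte[], byte[]> typed =
                interceptThenSerialize(record, r -> new Record<>(r.key, r.value + 1));
        System.out.println(new String(typed.value, StandardCharsets.UTF_8)); // prints 42

        // Byte-level interceptor: it has to deserialize before it can do the same thing.
        Record<byte[], byte[]> raw = serializeThenIntercept(record, r -> {
            int v = Integer.parseInt(new String(r.value, StandardCharsets.UTF_8));
            return new Record<>(r.key, toBytes(v + 1));
        });
        System.out.println(new String(raw.value, StandardCharsets.UTF_8)); // prints 42
    }
}
```

Both orders produce the same bytes here, but only the second interceptor has to know the wire format of the value, which is the extra deserialization step the issue description complains about.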
I would be more than willing to make the changes and send a pull request if
this makes sense to you. I believe that the ProducerInterceptor is a
very valuable feature, and this issue makes it much less useful in practice
within Kafka Streams.
> ProducerInterceptor.onSend() is called after key and value are serialized
> -------------------------------------------------------------------------
>
> Key: KAFKA-4691
> URL: https://issues.apache.org/jira/browse/KAFKA-4691
> Project: Kafka
> Issue Type: Bug
> Components: clients, streams
> Affects Versions: 0.10.1.1
> Reporter: Francesco Lemma
> Labels: easyfix
> Attachments: 2017-01-24 00_50_55-SDG_CR33_DevStudio - Java EE -
> org.apache.kafka.streams.processor.internals.Reco.png
>
>
> According to the JavaDoc
> (https://kafka.apache.org/0101/javadoc/org/apache/kafka/clients/producer/ProducerInterceptor.html)
> " This is called from KafkaProducer.send(ProducerRecord) and
> KafkaProducer.send(ProducerRecord, Callback) methods, before key and value
> get serialized and partition is assigned (if partition is not specified in
> ProducerRecord)".
> However, when using this with Kafka Streams
> (StreamsConfig.producerPrefix(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG)) the
> key and value contained in the record object are already serialized.
> As you can see from the screenshot, the serialization is performed inside
> RecordCollectorImpl.send(ProducerRecord<K, V> record, Serializer<K>
> keySerializer, Serializer<V> valueSerializer,
> StreamPartitioner<K, V> partitioner), that is,
> before the call to the producer's send method, which triggers the
> interceptor.
> This makes it impossible for the interceptor to perform any operation on the
> key or value of the message without first performing an additional
> deserialization.