[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16481515#comment-16481515
 ] 

Fred Teunissen commented on FLINK-8500:
---------------------------------------

I agree that 'separation of concerns' is a good principle, but in this 
situation I have a problem with it. The interface DeserializationSchema<T> is 
responsible for deserialisation of the message, but in our case the messages 
are encrypted and every topic has its own decryption key. We therefore need at 
least the topic name during deserialisation, which is why we currently use the 
interface KeyedDeserializationSchema<T>. I don't think this is Kafka-specific, 
so it should be solved in the "common deserialization schema" interface.
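
To illustrate the per-topic decryption case, here is a minimal sketch. It 
does not use Flink classes: TopicAwareSchema is a hypothetical stand-in that 
only mirrors the shape of the deserialize method of 
KeyedDeserializationSchema<T>. The class name DecryptingStringSchema, the 
AES-GCM layout (12-byte IV followed by the ciphertext) and the encrypt helper 
are all assumptions made for the example, not something taken from this issue.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical stand-in with the same parameters as
// KeyedDeserializationSchema<T>#deserialize; not the real Flink interface.
interface TopicAwareSchema<T> {
    T deserialize(byte[] messageKey, byte[] message, String topic,
                  int partition, long offset) throws IOException;
}

// Picks the decryption key by topic name, then decrypts the payload.
final class DecryptingStringSchema implements TopicAwareSchema<String> {
    private static final int IV_LENGTH = 12;   // assumed record layout: IV + ciphertext
    private static final int TAG_BITS = 128;

    private final Map<String, byte[]> keysByTopic;

    DecryptingStringSchema(Map<String, byte[]> keysByTopic) {
        this.keysByTopic = new HashMap<>(keysByTopic);
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) throws IOException {
        byte[] key = keysByTopic.get(topic);
        if (key == null) {
            throw new IOException("No decryption key configured for topic " + topic);
        }
        if (message == null || message.length < IV_LENGTH) {
            throw new IOException("Record from topic " + topic + " is too short");
        }
        try {
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
                    new GCMParameterSpec(TAG_BITS, message, 0, IV_LENGTH));
            byte[] plain = cipher.doFinal(message, IV_LENGTH, message.length - IV_LENGTH);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IOException("Could not decrypt record from topic " + topic, e);
        }
    }

    // Helper for the example only: produces a record in the assumed layout.
    static byte[] encrypt(byte[] key, byte[] iv, String text) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
                new GCMParameterSpec(TAG_BITS, iv));
        byte[] cipherText = cipher.doFinal(text.getBytes(StandardCharsets.UTF_8));
        byte[] out = Arrays.copyOf(iv, iv.length + cipherText.length);
        System.arraycopy(cipherText, 0, out, iv.length, cipherText.length);
        return out;
    }
}
```

With a schema that only receives the message bytes, the key lookup in the 
first line of deserialize would have nothing to look up by, which is the 
point made above.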

Another concern I have is that a two-step approach for creating the records 
from Kafka messages causes more memory allocations and thus more garbage 
collection cycles.

As [~StephanEwen] said, this issue can be fixed for now by introducing a new 
default method on the interface KeyedDeserializationSchema<T>, but in the long 
run a new `common connector framework` should be designed and implemented. This 
new `common connector framework` should also address other issues such as 
'temporary idleness of partitions' and 'customisable partition assignments'.
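
As a rough sketch of the short-term fix, the default method could look like 
the following. KeyedSchemaSketch is a hypothetical stand-in for 
KeyedDeserializationSchema<T> (the real interface has further methods that 
are left out here), and the extra `timestamp` parameter, the two example 
classes and the -1 fallback are assumptions for illustration, not an agreed 
design.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for KeyedDeserializationSchema<T>, reduced to deserialize.
interface KeyedSchemaSketch<T> {

    // Existing method shape: key, value, topic, partition, offset.
    T deserialize(byte[] messageKey, byte[] message, String topic,
                  int partition, long offset) throws IOException;

    // Assumed new default method: the consumer would call this variant and
    // pass the record timestamp. Existing implementations keep working
    // because the default simply drops the timestamp and delegates.
    default T deserialize(byte[] messageKey, byte[] message, String topic,
                          int partition, long offset, long timestamp) throws IOException {
        return deserialize(messageKey, message, topic, partition, offset);
    }
}

// An existing schema: unchanged, unaware of timestamps.
final class LegacyStringSchema implements KeyedSchemaSketch<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) {
        return new String(message, StandardCharsets.UTF_8);
    }
}

// A new schema that opts in by overriding the default method.
final class TimestampedStringSchema implements KeyedSchemaSketch<String> {
    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset) {
        // No timestamp available on this path; -1 is an arbitrary marker.
        return deserialize(messageKey, message, topic, partition, offset, -1L);
    }

    @Override
    public String deserialize(byte[] messageKey, byte[] message, String topic,
                              int partition, long offset, long timestamp) {
        return timestamp + ":" + new String(message, StandardCharsets.UTF_8);
    }
}
```

The trade-off of this shape is that every further piece of record metadata 
would need yet another overload, which is one reason to treat it as a 
stopgap.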

If you agree, I can open a new PR that adds the default method to the 
interface KeyedDeserializationSchema<T>.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a 
> 'Kafka message timestamp' parameter (from ConsumerRecord). In some business 
> scenarios, this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
