[ 
https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16447144#comment-16447144
 ] 

Fred Teunissen commented on FLINK-8500:
---------------------------------------

We want to use the kafka message timestamp in our business logic, so we would 
like to have this timestamp available during the deserialisation of the 
message. But we also need the TimestampType to be able to detect from whom the 
timestamp came (the producer or the kafka broker).

If we don't want to break the interface we can make a new interface 
{{KeyedAndTimestampDeserializationSchema}} extending the interface 
{{KeyedDeserializationSchema}} and add a 
{{KeyedAndTimestampDeserializationSchemaWrapper}} that accepts a 
{{KeyedDeserializationSchema}}. The FlinkKafkaConsumers constructors can be 
extended to accept the new {{KeyedAndTimestampDeserializationSchema}} as 
parameter and wrap the calls with the {{KeyedDeserializationSchema}} interface.

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema  needs a parameter 
> 'kafka message timestamp' (from ConsumerRecord) .In some business scenarios, 
> this is useful!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to