[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480505#comment-16480505 ]

ASF GitHub Bot commented on FLINK-8500:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5958
  
    @FredTing we had some offline discussion on how to proceed with this.
    @aljoscha, @twalthr, or @StephanEwen can probably comment more here if I
    missed anything.
    
    The conflict that Stephan mentioned between a "common deserialization
    schema" interface and surfacing connector-specific information is rooted
    in the fact that both concerns (deserialization and providing
    connector-specific record meta information) are currently coupled in a
    single interface.
    
    Take, for example, the Kafka connector's `KeyedDeserializationSchema`:
    there we try to deserialize the Kafka bytes and also provide information
    such as topic / partition / offset, so that users can enrich their records
    for downstream business logic. The first part (deserialization of bytes)
    should be common to all connector sources, while the second part is
    Kafka-specific.
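    To make the coupling concrete, here is a minimal, self-contained sketch.
    `CoupledKafkaSchema`, `Event` and `EventSchema` are hypothetical names; the
    method shape is only loosely modelled on `KeyedDeserializationSchema`
    (key bytes, value bytes, topic, partition, offset) and omits its other
    methods and the `IOException`. The point is that the generic UTF-8
    decoding is trapped inside a Kafka-only type.

    ```java
    import java.nio.charset.StandardCharsets;

    // Hypothetical stand-in for the coupled style: one method receives the raw
    // bytes AND the Kafka-specific metadata. Not the real Flink interface.
    interface CoupledKafkaSchema<T> {
        T deserialize(byte[] key, byte[] value, String topic, int partition, long offset);
    }

    // Hypothetical user record enriched with Kafka metadata.
    final class Event {
        final String payload;
        final String topic;
        final int partition;

        Event(String payload, String topic, int partition) {
            this.payload = payload;
            this.topic = topic;
            this.partition = partition;
        }
    }

    // The UTF-8 decoding below is connector-agnostic, but because it lives in a
    // Kafka-specific schema it cannot be reused by another source.
    final class EventSchema implements CoupledKafkaSchema<Event> {
        @Override
        public Event deserialize(byte[] key, byte[] value, String topic, int partition, long offset) {
            String payload = new String(value, StandardCharsets.UTF_8); // generic part
            return new Event(payload, topic, partition);                // Kafka-specific part
        }
    }
    ```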
    
    Therefore, we should perhaps break this up into two separate interfaces,
    as follows:
    ```java
    // common interface for all sources (we already have this; shown simplified)
    interface DeserializationSchema<T> {
        T deserialize(byte[] bytes);
    }

    // ... and a Kafka-specific interface that is only used to provide record
    // meta information (ConsumerRecordMetaInfo is a proposed, not yet existing type)
    interface ConsumerRecordMetaInfoProvider<T> {
        T enrich(T record, ConsumerRecordMetaInfo metaInfo);
    }
    ```
    
    Each connector would define the second interface independently, and it
    does not handle deserialization of the record bytes. The name, of course,
    is still open to discussion.
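    As a rough illustration of how the split could fit together, the sketch
    below decodes bytes with a connector-agnostic schema and then lets an
    optional Kafka-only provider attach metadata such as the record timestamp
    requested in this issue. The fields of `ConsumerRecordMetaInfo`, the
    `Click` record, `Utf8ClickSchema`, `TimestampEnricher` and the
    `FetcherStep.emit` helper are all hypothetical; this is not how the Flink
    Kafka fetcher is actually implemented.

    ```java
    import java.nio.charset.StandardCharsets;

    // Simplified common interface (the real Flink DeserializationSchema also
    // declares IOException and adds isEndOfStream / getProducedType).
    interface DeserializationSchema<T> {
        T deserialize(byte[] bytes);
    }

    // Hypothetical Kafka metadata holder; the chosen fields are assumptions.
    final class ConsumerRecordMetaInfo {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        ConsumerRecordMetaInfo(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Kafka-specific enrichment, independent of how the bytes were decoded.
    interface ConsumerRecordMetaInfoProvider<T> {
        T enrich(T record, ConsumerRecordMetaInfo metaInfo);
    }

    // Hypothetical user record; timestamp stays -1 until enriched.
    final class Click {
        final String payload;
        final long timestamp;

        Click(String payload, long timestamp) {
            this.payload = payload;
            this.timestamp = timestamp;
        }
    }

    // Reusable by any connector: knows nothing about Kafka.
    final class Utf8ClickSchema implements DeserializationSchema<Click> {
        @Override
        public Click deserialize(byte[] bytes) {
            return new Click(new String(bytes, StandardCharsets.UTF_8), -1L);
        }
    }

    // Kafka-only: copies the record timestamp into the user record.
    final class TimestampEnricher implements ConsumerRecordMetaInfoProvider<Click> {
        @Override
        public Click enrich(Click record, ConsumerRecordMetaInfo metaInfo) {
            return new Click(record.payload, metaInfo.timestamp);
        }
    }

    // Hypothetical per-record step inside a Kafka source: decode first, then
    // enrich only if the user supplied a provider.
    final class FetcherStep {
        static <T> T emit(byte[] value, ConsumerRecordMetaInfo metaInfo,
                          DeserializationSchema<T> schema,
                          ConsumerRecordMetaInfoProvider<T> provider) {
            T record = schema.deserialize(value);
            return provider == null ? record : provider.enrich(record, metaInfo);
        }
    }
    ```

    With this split, `Utf8ClickSchema` could be reused unchanged by any other
    source, while only the Kafka connector would need to know about
    `ConsumerRecordMetaInfoProvider`.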
    
    What do you think?


> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, 
> image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for
> the Kafka message timestamp (from ConsumerRecord). This is useful in some
> business scenarios.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
