[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16480505#comment-16480505 ]
ASF GitHub Bot commented on FLINK-8500:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/5958

@FredTing we had some offline discussion on how to proceed with this.
@aljoscha, @twalthr, or @StephanEwen can probably comment more here if I missed anything.

The conflict that Stephan mentioned between a "common deserialization schema" interface and exposing connector-specific information is rooted in the fact that both concerns (deserialization and providing connector-specific record meta information) are currently coupled in a single interface.

Take for example the Kafka connector's `KeyedDeserializationSchema` - there we try to deserialize the Kafka bytes, as well as provide information such as topic / partition / timestamp etc. to allow the user to enrich their user records for downstream business logic.
The first part (deserialization of bytes) should be something common for all connector sources, while the second part is Kafka-specific.

Therefore, we should perhaps break this up into two separate interfaces, as follows:

```
// common interface for all sources (we already have this)
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// ... and a Kafka-specific interface that is only used to provide record meta information
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}
```

The second interface is something that each connector should have independently, and does not handle deserialization of the record bytes.
The name, of course, is still open to discussion.

What do you think?
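To make the proposed split concrete, here is a minimal sketch of how the two interfaces might be combined inside a consumer. Only `DeserializationSchema`, `ConsumerRecordMetaInfoProvider`, and the type name `ConsumerRecordMetaInfo` come from the proposal above; the fields of `ConsumerRecordMetaInfo`, the `Event` record type, `EventSchema`, `TimestampEnricher`, and `RecordPipeline` are hypothetical names introduced for illustration and are not part of Flink's API.

```java
import java.nio.charset.StandardCharsets;

// Common interface for all sources, as written in the proposal.
interface DeserializationSchema<T> {
    T deserialize(byte[] bytes);
}

// Hypothetical holder for Kafka record metadata; the field set is an assumption.
final class ConsumerRecordMetaInfo {
    final String topic;
    final int partition;
    final long offset;
    final long timestamp;

    ConsumerRecordMetaInfo(String topic, int partition, long offset, long timestamp) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.timestamp = timestamp;
    }
}

// Kafka-specific interface from the proposal: enriches an already deserialized record.
interface ConsumerRecordMetaInfoProvider<T> {
    T enrich(T record, ConsumerRecordMetaInfo metaInfo);
}

// Hypothetical user record that can carry the Kafka timestamp (-1 means "not set").
final class Event {
    final String payload;
    final long kafkaTimestamp;

    Event(String payload, long kafkaTimestamp) {
        this.payload = payload;
        this.kafkaTimestamp = kafkaTimestamp;
    }
}

// Connector-agnostic part: turns bytes into an Event and knows nothing about Kafka.
final class EventSchema implements DeserializationSchema<Event> {
    @Override
    public Event deserialize(byte[] bytes) {
        return new Event(new String(bytes, StandardCharsets.UTF_8), -1L);
    }
}

// Kafka-specific part: copies the record timestamp into the user record.
final class TimestampEnricher implements ConsumerRecordMetaInfoProvider<Event> {
    @Override
    public Event enrich(Event record, ConsumerRecordMetaInfo metaInfo) {
        return new Event(record.payload, metaInfo.timestamp);
    }
}

// Hypothetical glue showing how a fetcher could apply both steps in order.
final class RecordPipeline {
    static <T> T process(byte[] bytes,
                         ConsumerRecordMetaInfo metaInfo,
                         DeserializationSchema<T> schema,
                         ConsumerRecordMetaInfoProvider<T> provider) {
        T record = schema.deserialize(bytes);
        // The provider is optional: without it, the record is emitted unchanged.
        return provider == null ? record : provider.enrich(record, metaInfo);
    }
}
```

With this shape, a user who only needs the payload supplies just the schema, while a user who needs the Kafka timestamp (the original request in this issue) adds a provider such as `TimestampEnricher` without touching the deserialization logic.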

> Get the timestamp of the Kafka message from kafka consumer(Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>             Fix For: 1.6.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
>
> The method deserialize of KeyedDeserializationSchema needs a parameter 'kafka message timestamp' (from ConsumerRecord). In some business scenarios, this is useful!
>

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)