[ https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16583407#comment-16583407 ]

ASF GitHub Bot commented on FLINK-8500:
---------------------------------------

tzulitai commented on a change in pull request #6105: [FLINK-8500] Get the timestamp of the Kafka message from kafka consumer
URL: https://github.com/apache/flink/pull/6105#discussion_r210810967
 
 

 ##########
 File path: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchema.java
 ##########
 @@ -45,6 +45,22 @@
         */
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset) throws IOException;
 
+       /**
+        * Deserializes the byte message.
+        *
+        * @param messageKey The key as a byte array (null if no key has been set).
+        * @param message The message, as a byte array (null if the message was empty or deleted).
+        * @param topic The topic the message has originated from.
+        * @param partition The partition the message has originated from.
+        * @param offset The offset of the message in the original source (for example the Kafka offset).
+        * @param timestamp The timestamp of the consumer record.
+        * @param timestampType The timestamp type: NO_TIMESTAMP_TYPE, CREATE_TIME or LOG_APPEND_TIME.
+        *
+        * @return The deserialized message as an object (null if the message cannot be deserialized).
+        */
+       default T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset, long timestamp, TimestampType timestampType) throws IOException {
 
 Review comment:
  This would mean that, if users want to access timestamps, they would have to implement both deserialize methods, correct?
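To make the concern concrete, here is a minimal, self-contained sketch. `KeyedSchema` is a hypothetical, simplified stand-in for `KeyedDeserializationSchema` (no `Serializable`, `ResultTypeQueryable` or `isEndOfStream`), the local `TimestampType` enum only mirrors Kafka's `org.apache.kafka.common.record.TimestampType`, and the default method's delegating body is an assumption, since the diff hunk above stops at the opening brace:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

public class BothMethodsExample {

    // Local stand-in mirroring Kafka's org.apache.kafka.common.record.TimestampType.
    enum TimestampType { NO_TIMESTAMP_TYPE, CREATE_TIME, LOG_APPEND_TIME }

    // Hypothetical, simplified stand-in for KeyedDeserializationSchema as changed by the PR.
    interface KeyedSchema<T> {
        // Existing abstract method: every implementation must provide it.
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset)
                throws IOException;

        // New method from the PR; delegating to the old variant is an assumed body.
        default T deserialize(byte[] messageKey, byte[] message, String topic, int partition,
                long offset, long timestamp, TimestampType timestampType) throws IOException {
            return deserialize(messageKey, message, topic, partition, offset);
        }
    }

    // A schema that only makes sense when a record timestamp is available.
    static class TimestampedStringSchema implements KeyedSchema<String> {
        // Still mandatory, even though this schema has nothing useful to return here.
        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic,
                int partition, long offset) {
            throw new UnsupportedOperationException("this schema needs the record timestamp");
        }

        // The method the user actually cares about: append the timestamp to the payload.
        @Override
        public String deserialize(byte[] messageKey, byte[] message, String topic,
                int partition, long offset, long timestamp, TimestampType timestampType) {
            return new String(message, StandardCharsets.UTF_8) + "@" + timestamp;
        }
    }

    public static void main(String[] args) throws IOException {
        KeyedSchema<String> schema = new TimestampedStringSchema();
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        // Prints hello@1533000000000
        System.out.println(schema.deserialize(null, value, "events", 0, 42L,
                1_533_000_000_000L, TimestampType.CREATE_TIME));
    }
}
```

The forced override of the five-argument method is the awkwardness in question: a timestamp-only schema cannot be written by implementing just the timestamped variant.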
   
  This feels a bit awkward. I can think of 2 possible approaches to address this:
  1. Let the original non-timestamped deserialize method also be default, but with an empty body. It's still not nice, though, since now both deserialize methods are default; moreover, the second deserialize method relies on the first one.
   
  2. Instead of having a new deserialize method with the timestamp, we have a `setTimestamp` method that is only ever called in Kafka 0.10+, whose signature is `void setTimestamp(T deserializedRecord, long timestamp)`. This might also better match what we've discussed in the JIRA ticket about having separate interfaces for "deserialization" and "enrichment".
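Option 2 might look roughly like the following sketch. Everything here is illustrative: `KeyedSchema`, `Event`, `EventSchema` and `emit` are hypothetical names, the no-op default for `setTimestamp` is an assumption (so existing schemas keep compiling), the checked `IOException` is omitted for brevity, and `emit` only imitates a fetcher step, passing a timestamp when one exists (as a Kafka 0.10+ record would have) and `null` otherwise:

```java
import java.nio.charset.StandardCharsets;

public class SetTimestampSketch {

    // Hypothetical, simplified stand-in for KeyedDeserializationSchema under option 2
    // (checked IOException omitted for brevity).
    interface KeyedSchema<T> {
        T deserialize(byte[] messageKey, byte[] message, String topic, int partition, long offset);

        // Enrichment hook; the no-op default is an assumption so existing schemas still compile.
        default void setTimestamp(T deserializedRecord, long timestamp) {
        }
    }

    // Hypothetical record type; setTimestamp returns void, so the record must be mutable.
    static class Event {
        final String payload;
        long timestamp = Long.MIN_VALUE; // sentinel meaning "no timestamp set"

        Event(String payload) {
            this.payload = payload;
        }
    }

    static class EventSchema implements KeyedSchema<Event> {
        @Override
        public Event deserialize(byte[] messageKey, byte[] message, String topic,
                int partition, long offset) {
            return new Event(new String(message, StandardCharsets.UTF_8));
        }

        @Override
        public void setTimestamp(Event event, long timestamp) {
            event.timestamp = timestamp;
        }
    }

    // Hypothetical fetcher step: deserialize first, then enrich only if a timestamp exists
    // (null models a pre-0.10 record, which carries no timestamp).
    static <T> T emit(KeyedSchema<T> schema, byte[] value, long offset, Long kafkaTimestamp) {
        T result = schema.deserialize(null, value, "events", 0, offset);
        if (result != null && kafkaTimestamp != null) {
            schema.setTimestamp(result, kafkaTimestamp);
        }
        return result;
    }

    public static void main(String[] args) {
        byte[] value = "hello".getBytes(StandardCharsets.UTF_8);
        Event withTs = emit(new EventSchema(), value, 42L, 1_533_000_000_000L);
        Event withoutTs = emit(new EventSchema(), value, 43L, null);
        System.out.println(withTs.payload + " " + withTs.timestamp + " " + withoutTs.timestamp);
    }
}
```

The trade-off this sketch makes visible: users implement a single `deserialize` and opt in to timestamps by overriding one small hook, at the cost of requiring a mutable deserialized type.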

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Get the timestamp of the Kafka message from kafka consumer (Kafka010Fetcher)
> ---------------------------------------------------------------------------
>
>                 Key: FLINK-8500
>                 URL: https://issues.apache.org/jira/browse/FLINK-8500
>             Project: Flink
>          Issue Type: Improvement
>          Components: Kafka Connector
>    Affects Versions: 1.4.0
>            Reporter: yanxiaobin
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.7.0
>
>         Attachments: image-2018-01-30-14-58-58-167.png, image-2018-01-31-10-48-59-633.png
>
>
> The deserialize method of KeyedDeserializationSchema needs a parameter for the
> Kafka message timestamp (from ConsumerRecord). This is useful in some business
> scenarios.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
