You must use a ProcessFunction for this; the timestamps are not exposed in any way to map/flatMap functions.
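
For Scala, a minimal sketch of this approach might look like the following. The class name AttachTimestamp, the helper withTimestamps, and the `records` stream are hypothetical names chosen for illustration; they are not from this thread. It assumes the job runs with event time configured so that the Kafka consumer attaches timestamps. Because ctx.timestamp() can return null for records without a timestamp, the sketch wraps it in an Option.

```scala
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// Hypothetical helper: pairs each record with the timestamp Flink attached to it.
class AttachTimestamp[T] extends ProcessFunction[T, (T, Option[Long])] {
  override def processElement(
      value: T,
      ctx: ProcessFunction[T, (T, Option[Long])]#Context,
      out: Collector[(T, Option[Long])]): Unit = {
    // ctx.timestamp() returns a java.lang.Long, which is null when the
    // record carries no timestamp; Option(...) turns null into None.
    val ts: Option[Long] = Option(ctx.timestamp()).map(_.longValue())
    out.collect((value, ts))
  }
}

object TimestampExample {
  // `records` stands in for the stream read from Kafka (an assumption).
  def withTimestamps(records: DataStream[String]): DataStream[String] =
    records
      .process(new AttachTimestamp[String])
      // Downstream map/flatMap now see the timestamp as an ordinary field.
      .map(pair => s"${pair._1} @ ${pair._2.getOrElse(-1L)}")
}
```

Once the timestamp has been copied into the record like this, any later map or flatMap can read it as regular data.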

On 10.04.2018 12:29, Ben Yan wrote:
Hi Fabian.

If I use a ProcessFunction, I can get it! But I would like to know how to get the Kafka timestamp in methods like flatMap and map of a DataStream, using the Scala programming language.
Thanks!

Best
Ben

On Apr 4, 2018, at 7:00 PM, Fabian Hueske <fhue...@gmail.com> wrote:

Hi Navneeth,

Flink's KafkaConsumer automatically attaches Kafka's ingestion timestamp if you configure EventTime for an application [1]. Since Flink treats record timestamps as metadata, they are not directly accessible by most functions. You can implement a ProcessFunction [2] to access the timestamp of a record via the ProcessFunction's Context object.

Best, Fabian

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/process_function.html#the-processfunction

2018-03-30 7:45 GMT+02:00 Ben Yan <yan.xiao.bin.m...@gmail.com>:

    hi,
    Is that what you mean?
    See:
    https://issues.apache.org/jira/browse/FLINK-8500?page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel&focusedCommentId=16377145#comment-16377145
    


    Best
    Ben

    On 30 Mar 2018, at 12:23 PM, Navneeth Krishnan
    <reachnavnee...@gmail.com> wrote:

    Hi,

    Is there a way to get the Kafka timestamp in the deserialization
    schema? All records are written to Kafka with a timestamp, and I
    would like to set that timestamp on every record that is
    ingested. Thanks.



