I'll take a look into the ProcessFunction. Thanks for the suggestion.

-Jia
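For reference, the `ProcessFunction` approach Gordon suggests in the quoted reply could look roughly like the sketch below. It assumes a `DataStream<String>` coming from the Kafka 0.10 consumer, with the job's time characteristic set to `TimeCharacteristic.EventTime` (as I read the linked Kafka connector docs, the 0.10 consumer only attaches Kafka timestamps in that mode). The class name `AttachKafkaTimestamp` and the `NO_TIMESTAMP` sentinel are hypothetical. `Context#timestamp()` returns `null` when a record carries no timestamp, and the sentinel keeps a `null` out of the `Tuple2`, since Flink's tuple serializer does not accept null fields.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

/**
 * Hypothetical example: emits each input record together with the timestamp
 * Flink attached to it (typically the Kafka record timestamp when the Kafka
 * 0.10 consumer runs in event-time mode).
 */
public class AttachKafkaTimestamp extends ProcessFunction<String, Tuple2<String, Long>> {

    /** Hypothetical sentinel used when a record carries no timestamp. */
    public static final long NO_TIMESTAMP = Long.MIN_VALUE;

    @Override
    public void processElement(String value, Context ctx, Collector<Tuple2<String, Long>> out) {
        // ctx.timestamp() returns null if no timestamp is attached to this record.
        Long ts = ctx.timestamp();
        // Map null to the sentinel, because Flink's tuple serializer rejects null fields.
        out.collect(new Tuple2<>(value, ts != null ? ts : NO_TIMESTAMP));
    }
}
```

It would then be applied with something like `kafkaStream.process(new AttachKafkaTimestamp())`, where `kafkaStream` is the stream returned by `env.addSource(...)` for the Kafka consumer.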
On Wed, May 17, 2017 at 12:33 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
> Hi Jia,
>
> Actually just realized you can access the timestamp of records via the
> more powerful `ProcessFunction` [1].
> That’ll be a bit easier to use than implementing your own custom operator,
> which is quite low-level.
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/process_function.html
>
> On 17 May 2017 at 1:45:38 PM, Jia Teoh (jiat...@gmail.com) wrote:
>
> Hi Gordon,
>
> The timestamps are required for application logic. Thank you for
> clarifying the custom operators; it seems I mistakenly thought of the
> functions that are passed to the operators rather than the operators
> themselves. AbstractStreamOperator and the other classes you mentioned seem
> like exactly what I'm looking for, so I will take a look into implementing
> a custom one for my use case.
>
> Thank you,
> Jia
>
> On Tue, May 16, 2017 at 9:52 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org> wrote:
>
>> Hi Jia,
>>
>> How exactly do you want to use the Kafka timestamps? Do you want to
>> access them and alter them with new values as the record timestamp? Or do
>> you want to use them for some application logic in your functions?
>>
>> If it's the former, you should be able to do that by using timestamp /
>> watermark extractors. They come with an interface that exposes the current
>> timestamp of the record. For Kafka 0.10, that timestamp would be the Kafka
>> record’s timestamp if it hasn’t been explicitly assigned any other
>> timestamp yet.
>>
>> If it's the latter, then I think currently you have to use custom
>> operators as Robert mentioned.
>> Custom operators are classes that extend the `AbstractStreamOperator`
>> base class and implement one of the `OneInputStreamOperator` or
>> `TwoInputStreamOperator` interfaces.
>> You can take a look at the basic `StreamMap` or `StreamFlatMap` classes
>> for an example; they are the underlying operators for the map and flatMap
>> functions.
>>
>> At the operator level, the `processElement` method receives a
>> `StreamRecord`, which wraps the record value (what you get when
>> implementing functions) as well as the internal timestamp that comes with
>> the record.
>>
>> Cheers,
>> Gordon
>>
>> On 17 May 2017 at 4:27:36 AM, Jia Teoh (jiat...@gmail.com) wrote:
>>
>> Hi Robert,
>>
>> Thanks for the reply. I ended up implementing an extension of the Kafka
>> fetcher and consumer so that the deserialization API can include the
>> timestamp field, which is sufficient for my specific use case. I can share
>> the code if desired, but it seems like it's an intentional design decision
>> not to expose the timestamp in the deserialization API.
>>
>> I noticed you mentioned that I could use a custom operator to access the
>> record event time. Could you elaborate on what you mean by "operator"? I
>> initially thought that referred to DataStream.map/reduce/etc., but none of
>> those functions provide arguments that can be used to extract the embedded
>> timestamp.
>>
>> Thanks,
>> Jia Teoh
>>
>> On Fri, May 12, 2017 at 9:25 AM, Robert Metzger <rmetz...@apache.org> wrote:
>>
>>> Hi Jia,
>>>
>>> The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but
>>> the fetcher is extensible / pluggable, so the Kafka 0.9 fetcher can also
>>> read the event timestamps from Kafka 0.10.
>>> We don't expose the timestamp through the deserialization API, because
>>> we set it internally in Flink. (There is a "hidden" field with each record
>>> containing the event time of the event.)
>>>
>>> With a custom operator you can access the event time of a record.
>>>
>>> On Fri, May 12, 2017 at 3:26 AM, Jia Teoh <jiat...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to retrieve the timestamps that Kafka associates with
>>>> each key-value pair within Flink? I would like to be able to use these as
>>>> values within my application flow, and defining them before or after Kafka
>>>> is not acceptable for the use case due to the latency involved in sending
>>>> to or receiving from Kafka.
>>>>
>>>> It seems that Flink supports Kafka event time (link
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>),
>>>> but after a brief trace it seems that KafkaConsumer010 still relies on the
>>>> Kafka09Fetcher
>>>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L137>
>>>> for iterating through each Kafka record and deserializing it. The
>>>> KeyedDeserializationSchema API does not seem to support including the
>>>> timestamp as additional metadata (just offset, topic, and partition), so
>>>> something such as JSONKeyValueDeserializationSchema will not return
>>>> the Kafka-specified timestamp.
>>>>
>>>> For reference, I am using Kafka 0.10.2 and the Flink Streaming API +
>>>> Kafka Connector (1.2.1).
>>>>
>>>> Thanks,
>>>> Jia Teoh
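Robert and Gordon also describe the lower-level alternative, a custom operator. A minimal sketch modeled on the `StreamMap` pattern Gordon points to might look like the following. The class name `AttachTimestampOperator` and the `NO_TIMESTAMP` sentinel are hypothetical, and because operator classes are internal API that can change between releases, treat this as an outline to check against your Flink version rather than a drop-in implementation. `StreamRecord#hasTimestamp()` and `getTimestamp()` expose the "hidden" timestamp Robert mentions, and `replace(...)` reuses the incoming record, timestamp included, for the output value.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * Hypothetical custom operator, modeled on Flink's StreamMap: pairs each value
 * with the timestamp carried by its StreamRecord.
 */
public class AttachTimestampOperator
        extends AbstractStreamOperator<Tuple2<String, Long>>
        implements OneInputStreamOperator<String, Tuple2<String, Long>> {

    /** Hypothetical sentinel for records without a timestamp. */
    public static final long NO_TIMESTAMP = Long.MIN_VALUE;

    @Override
    public void processElement(StreamRecord<String> element) throws Exception {
        // The StreamRecord wraps both the user value and the internal timestamp.
        long ts = element.hasTimestamp() ? element.getTimestamp() : NO_TIMESTAMP;
        Tuple2<String, Long> result = new Tuple2<>(element.getValue(), ts);
        // replace() swaps in the new value while keeping the record's timestamp.
        output.collect(element.replace(result));
    }
}
```

It could be attached with `DataStream#transform`, e.g. `kafkaStream.transform("attach-timestamp", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}), new AttachTimestampOperator())`. As Gordon notes above, the `ProcessFunction` route gives the same access with less low-level plumbing.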
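For Gordon's first case (reading or overriding the record timestamp itself), a periodic timestamp/watermark assigner receives the record's current timestamp as the `previousElementTimestamp` argument of `extractTimestamp`; per his explanation, with Kafka 0.10 that is the Kafka record timestamp unless something else has already assigned one. The sketch below keeps that timestamp unchanged and emits watermarks that lag the highest timestamp seen by a fixed bound. The class name and the bound are illustrative assumptions. `AssignerWithPeriodicWatermarks` is the 1.2-era interface; newer Flink releases deprecate it in favor of `WatermarkStrategy`.

```java
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

/**
 * Hypothetical assigner: keeps the timestamp the record already carries
 * (the Kafka timestamp for the 0.10 consumer) and emits bounded-lag watermarks.
 */
public class KafkaTimestampPassThroughAssigner<T> implements AssignerWithPeriodicWatermarks<T> {

    private final long maxOutOfOrdernessMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    public KafkaTimestampPassThroughAssigner(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
    }

    @Override
    public long extractTimestamp(T element, long previousElementTimestamp) {
        // previousElementTimestamp is the timestamp already attached to the record.
        // Returning it unchanged keeps it; returning another value would override it.
        maxSeenTimestamp = Math.max(maxSeenTimestamp, previousElementTimestamp);
        return previousElementTimestamp;
    }

    @Override
    public Watermark getCurrentWatermark() {
        // Before any element is seen, avoid underflowing Long.MIN_VALUE.
        long wm = maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrdernessMs;
        return new Watermark(wm);
    }
}
```

Usage would be along the lines of `kafkaStream.assignTimestampsAndWatermarks(new KafkaTimestampPassThroughAssigner<>(1000L))`.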