I'll take a look into the ProcessFunction, thanks for the suggestion.

-Jia

On Wed, May 17, 2017 at 12:33 AM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
wrote:

> Hi Jia,
>
> Actually just realized you can access the timestamp of records via the
> more powerful `ProcessFunction` [1].
> That’ll be a bit easier to use than implementing your own custom operator,
> which is quite low-level.
>
> Cheers,
> Gordon
>
> [1] https://ci.apache.org/projects/flink/flink-docs-
> release-1.2/dev/stream/process_function.html
>
> On 17 May 2017 at 1:45:38 PM, Jia Teoh (jiat...@gmail.com) wrote:
>
> Hi Gordon,
>
> The timestamps are required for application logic. Thank you for
> clarifying the custom operators - seems I mistakenly thought of the
> functions that are passed to the operators rather than the operators
> themselves. AbstractStreamOperator and the other classes you mentioned seem
> like exactly what I'm looking for, so I will take a look into implementing
> a custom one for my use case.
>
> Thank you,
> Jia
>
> On Tue, May 16, 2017 at 9:52 PM, Tzu-Li (Gordon) Tai <tzuli...@apache.org>
> wrote:
>
>> Hi Jia,
>>
>> How exactly do you want to use the Kafka timestamps? Do you want to
>> access them and alter them with new values as the record timestamp? Or do
>> you want to use them for some application logic in your functions?
>>
>> If its the former, you should be able to do that by using timestamp /
>> watermark extractors. They come with an interface that exposes the current
>> timestamp of the record. For Kafka 0.10, that timestamp would be the Kafka
>> record’s timestamp if it hasn’t been explicitly assigned any other
>> timestamp yet.
>>
>> If its the latter, then I think currently you have to use custom
>> operators as Robert mentioned.
>> Custom operators are classes that extend the `AbstractStreamOperator`
>> base class as one of `OneInputStreamOperator` or `TwoInputStreamOperator`
>> interfaces.
>> You can take a look at the basic `StreamMap` or `StreamFlatMap` classes
>> for an example, which are the underlying operators for the map and flatMap
>> functions.
>>
>> At the operator level, you’ll have access to a `StreamRecord` in the
>> `processElement` function which wraps the record value (which you get when
>> implementing functions) as well as the internal timestamp that comes with
>> the record.
>>
>> Cheers,
>> Gordon
>>
>>
>> On 17 May 2017 at 4:27:36 AM, Jia Teoh (jiat...@gmail.com) wrote:
>>
>> Hi Robert,
>>
>> Thanks for the reply. I ended up implementing an extension of the Kafka
>> fetcher and consumer so that the deserialization API can include the
>> timestamp field, which is sufficient for my specific use case. I can share
>> the code if desired but it seems like it's an intentional design decision
>> not to expose the timestamp in the deserialization API.
>>
>> I noticed you mentioned that I could use a custom operator to access the
>> record event time. Could you elaborate on what you mean by "operator"? I
>> initially thought that referred to DataStream.map/reduce/etc, but none of
>> those functions provide arguments that can be used to extract the embedded
>> timestamp.
>>
>> Thanks,
>> Jia Teoh
>>
>> On Fri, May 12, 2017 at 9:25 AM, Robert Metzger <rmetz...@apache.org>
>> wrote:
>>
>>> Hi Jia,
>>>
>>> The Kafka 0.10 connector internally relies on the Kafka09 fetcher, but
>>> it is extensible / pluggable so that also the Kafka 0.9 Fetcher can read
>>> the event timestamps from Kafka 10.
>>> We don't expose the timestamp through the deserilaization API, because
>>> we set it internally in Flink. (there is a "hidden" field with each record
>>> containing the event time of the event)
>>>
>>> With a custom operator you can access the event time of a record.
>>>
>>> On Fri, May 12, 2017 at 3:26 AM, Jia Teoh <jiat...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Is there a way to retrieve the timestamps that Kafka associates with
>>>> each key-value pair within Flink? I would like to be able to use these as
>>>> values within my application flow, and defining them before or after Kafka
>>>> is not acceptable for the use case due to the latency involved in sending
>>>> or receiving from Kafka.
>>>>
>>>> It seems that Flink supports Kafka event time (link
>>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010>)
>>>> but after a brief trace it seems that KafkaConsumer010 still relies on the
>>>> Kafka09Fetcher
>>>> <https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09Fetcher.java#L137>
>>>>  for
>>>> iterating through each Kafka record and deserializing it. The
>>>> KeyedDeserializationSchema api does not seem to have support for including
>>>> timestamp as additional metadata (just offset, topic, and partition) so
>>>> something such as JSONKeyValueDeserializationSchema will not return
>>>> the Kafka-specified timestamp.
>>>>
>>>> For reference, I am using Kafka 0.10.2 and the Flink-Streaming API +
>>>> Kafka Connector (1.2.1).
>>>>
>>>> Thanks,
>>>> Jia Teoh
>>>>
>>>
>>>
>>
>

Reply via email to