KafkaIO.withTimestampFn for event time should do it... I don't have that in my
code, but can try it this evening. Cheers
From: David Desberg <[email protected]>
To: [email protected]
Sent: Thursday, July 7, 2016 4:27 PM
Subject: Re: Event time processing with Flink runner and Kafka source
Dan,
Yeah, it’s setting it to the ingestion time. I will look into KafkaIO, as it
looks to provide exactly the functionality I want. I was wondering how to set
the timestamp correctly, at the source. Thank you for your help!
David
On Jul 7, 2016, at 4:25 PM, Dan Halperin <[email protected]> wrote:
Hi David,
In Beam pipelines, the event time is initially set on the source. Downstream
code can make an event *later* just fine, but, making it *earlier* might move
it before the current watermark. This would effectively turn data that we believe
is on-time into late data, and would in general be very bad! Allowed lateness
is a feature that lets you move data earlier by a fixed amount, so if you have
a tight bound on the time set by the source, this can sometimes help. But it's
generally discouraged in favor of proper timestamps in the first place.
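The rule described above can be sketched in plain Java (a hypothetical helper for illustration, not Beam API): an element whose timestamp falls behind the watermark becomes late data, and is only accepted if it is within the allowed lateness.

```java
// Hypothetical illustration of the late-data rule described above: an element
// whose (possibly shifted-back) timestamp lands behind the watermark is late,
// and is only accepted if it is within the allowed lateness. Not Beam's code.
public class LatenessCheck {

    /** True if an element with this timestamp is behind the watermark, i.e. late. */
    static boolean isLate(long elementTimestampMillis, long watermarkMillis) {
        return elementTimestampMillis < watermarkMillis;
    }

    /** True if a (possibly late) element is still accepted given the allowed lateness. */
    static boolean isAccepted(long elementTimestampMillis, long watermarkMillis,
                              long allowedLatenessMillis) {
        return elementTimestampMillis >= watermarkMillis - allowedLatenessMillis;
    }

    public static void main(String[] args) {
        long watermark = 1_000_000L;
        // At or ahead of the watermark: on time.
        System.out.println(isLate(1_000_500L, watermark));            // false
        // Shifted 2s behind the watermark with 5s lateness: late but accepted.
        System.out.println(isLate(998_000L, watermark));              // true
        System.out.println(isAccepted(998_000L, watermark, 5_000L));  // true
        // Shifted 10s behind with only 5s lateness: dropped.
        System.out.println(isAccepted(990_000L, watermark, 5_000L));  // false
    }
}
```

This also shows why shifting timestamps *earlier* downstream is dangerous: it is the shift itself, relative to the watermark, that manufactures late data.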
My guess is that UnboundedFlinkSource is using the *processing time*, aka
current time when the element is received, rather than any event time provided
by the element. It might be possible to provide the element time using that
source.
Alternately, I think you should be using KafkaIO and setting the event time
there using withTimestampFn:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
This way the elements will come into the system from Kafka with good
timestamps, and you don't need a downstream DoFn to transport them back in time.
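The function handed to withTimestampFn is just a mapping from a Kafka record to its event time. A minimal plain-Java sketch of that extraction logic (assuming a hypothetical "epochMillis|payload" message format, which is purely illustrative and not anything Kafka or Beam mandates):

```java
import java.time.Instant;

// Sketch of the kind of timestamp-extraction logic one would pass to
// KafkaIO's withTimestampFn. The "epochMillis|payload" value format is a
// hypothetical assumption for illustration only.
public class EventTimeExtractor {

    /** Parse the event time out of a message value of the form "epochMillis|payload". */
    static Instant extractEventTime(String messageValue) {
        int sep = messageValue.indexOf('|');
        if (sep < 0) {
            throw new IllegalArgumentException("no timestamp prefix: " + messageValue);
        }
        return Instant.ofEpochMilli(Long.parseLong(messageValue.substring(0, sep)));
    }

    public static void main(String[] args) {
        Instant t = extractEventTime("1467930300000|click:user42");
        System.out.println(t.toEpochMilli()); // 1467930300000
    }
}
```

Because this runs at the source, the elements enter the pipeline already carrying their event time, and the watermark is computed from it.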
Thanks,
Dan
On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <[email protected]> wrote:
Hi David,
I am doing pretty much the same thing using Beam KafkaIO. For the simple thing
I am doing, it's working as expected. Can you provide the code showing how you
are invoking/receiving from Kafka, please?
Cheers
From: David Desberg <[email protected]>
To: [email protected]
Sent: Thursday, July 7, 2016 12:54 PM
Subject: Event time processing with Flink runner and Kafka source
Hi all,
I’m struggling to get a basic Beam application setup, windowed based upon event
time. I’m reading from an UnboundedFlinkSource of a FlinkKafkaConsumer to begin
my pipeline. To set up event time processing, I applied a DoFn transformation
(via ParDo) that calls ProcessContext.outputWithTimestamp using a timestamp
extracted from each Kafka message. However, this results in an exception
telling me to override getAllowedTimestampSkew, since evidently the messages
are already timestamped and I am moving these timestamps back in time, but only
shifting to the future is allowed. getAllowedTimestampSkew, however, is
deprecated, and if I do override it and allow skew, the windowing I am applying
later in the pipeline fails. I decided to backtrack and look at how the
timestamps are even being assigned initially, since the Flink source has no
concept of the structure of my messages and thus shouldn’t know how to assign
any time at all. I found that it turns out that the pipeline runner marks each
incoming message with ingestion time, in a manner that cannot be overridden/is
not configurable (see
https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273).
Why is this the case? Since part of the point of Beam is to allow event-time
processing, I’m sure I’m missing something here. How can I correctly ingest
messages from Kafka and stamp them with event time, rather than ingestion time?
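For reference, the check being hit here can be sketched in plain Java (a hypothetical reconstruction, not Beam's actual source): shifting an element's timestamp forward is always allowed, but shifting it backward is rejected unless the shift is within the allowed timestamp skew.

```java
import java.time.Duration;

// Hypothetical reconstruction of the skew check behind the exception above:
// outputWithTimestamp may move a timestamp later freely, but may only move it
// earlier by at most the allowed skew. Not Beam's actual implementation.
public class SkewCheck {

    /** True if rewriting currentMillis to newMillis is within the allowed skew. */
    static boolean withinSkew(long currentMillis, long newMillis, Duration allowedSkew) {
        if (newMillis >= currentMillis) {
            return true; // moving later is always permitted
        }
        return currentMillis - newMillis <= allowedSkew.toMillis();
    }

    public static void main(String[] args) {
        // Default skew is zero: any backward shift fails, hence the exception.
        System.out.println(withinSkew(10_000L, 9_999L, Duration.ZERO));          // false
        // With one second of skew, a 500 ms backward shift passes.
        System.out.println(withinSkew(10_000L, 9_500L, Duration.ofSeconds(1)));  // true
    }
}
```

This is why fixing the timestamp at the source (as suggested above with KafkaIO) is cleaner than shifting it backward in a downstream DoFn.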