Hi David,

In Beam pipelines, the event time is initially set on the source.
Downstream code can make an event *later* just fine, but making it
*earlier* might move it behind the current watermark. This would
effectively turn data that we believe is on-time into late data, which
would in general be very bad! Allowed timestamp skew is a feature that
lets you move timestamps earlier by a fixed amount, so if you have a
tight bound on the time set by the source, it can sometimes help. But
it's generally discouraged in favor of setting proper timestamps in the
first place.
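For reference, declaring that skew bound on a DoFn looks roughly like
this. This is only a sketch: `MyRecord` and its `getEventTime()`
accessor are placeholders for however your elements carry their event
time, and the five-minute bound is an arbitrary example.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;

// Sketch only: overriding the (deprecated) allowed timestamp skew so that
// outputWithTimestamp may move timestamps backwards by a bounded amount.
// MyRecord and getEventTime() are hypothetical stand-ins.
class BackdateFn extends DoFn<MyRecord, MyRecord> {
  @Override
  public Duration getAllowedTimestampSkew() {
    // The furthest back in time an output timestamp may be moved.
    return Duration.standardMinutes(5);
  }

  @Override
  public void processElement(ProcessContext c) {
    MyRecord r = c.element();
    // Re-stamp the element with its own event time.
    c.outputWithTimestamp(r, r.getEventTime());
  }
}
```

Note that elements moved back this way can still land behind the
watermark, which is exactly the late-data hazard described above.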

My guess is that UnboundedFlinkSource is using the *processing time*,
i.e. the wall-clock time when the element is received, rather than any
event time carried by the element. It might be possible to configure
that source to provide the element's own event time.

Alternatively, I think you should be using KafkaIO and setting the
event time there using withTimestampFn:
https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136

This way the elements will enter the system from Kafka with good
timestamps, and you don't need a downstream DoFn to shift them back in
time.
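Roughly, that looks like the following. Again a sketch, not a drop-in:
the broker address, topic name, the `decodeEventTimeMillis` helper, and
the assumption that the event time can be decoded from the record value
are all placeholders for your actual setup.

```java
import com.google.common.collect.ImmutableList;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.joda.time.Instant;

// Sketch only: assigning event timestamps at the Kafka source so elements
// enter the pipeline already correctly stamped. decodeEventTimeMillis is
// a hypothetical helper that pulls epoch millis out of the record value.
pipeline.apply(KafkaIO.read()
    .withBootstrapServers("broker-1:9092")
    .withTopics(ImmutableList.of("my-topic"))
    // Extract the event time from each record; no downstream
    // backdating (and no timestamp skew) is needed.
    .withTimestampFn(kv ->
        new Instant(decodeEventTimeMillis(kv.getValue())))
    .withoutMetadata());
```

Because the timestamps are set at the source, the watermark is derived
from them directly, and windowing later in the pipeline behaves as
expected.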

Thanks,
Dan

On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <[email protected]> wrote:

> Hi David,
> I am doing pretty much the same thing using Beam KafkaIO.
> For the simple thing I am doing, it's working as expected.
> Can you share the code showing how you are invoking/receiving from
> Kafka, please?
> Cheers
>
>
> ------------------------------
> *From:* David Desberg <[email protected]>
> *To:* [email protected]
> *Sent:* Thursday, July 7, 2016 12:54 PM
> *Subject:* Event time processing with Flink runner and Kafka source
>
> Hi all,
>
> I’m struggling to get a basic Beam application setup, windowed based upon
> event time. I’m reading from an UnboundedFlinkSource of a
> FlinkKafkaConsumer to begin my pipeline. To set up event time processing, I
> applied a DoFn transformation (via ParDo) that calls
> ProcessContext.outputWithTimestamp using a timestamp extracted from each
> Kafka message. However, this results in an exception telling me to
> override getAllowedTimestampSkew, since evidently the messages are already
> timestamped and I am moving these timestamps back in time, but only
> shifting to the future is allowed. getAllowedTimestampSkew, however, is
> deprecated, and if I do override it and allow skew, the windowing I am
> applying later in the pipeline fails. I decided to backtrack and look at
> how the timestamps are even being assigned initially, since the Flink
> source has no concept of the structure of my messages and thus shouldn’t
> know how to assign any time at all. I found that it turns out that the
> pipeline runner marks each incoming message with ingestion time, in a
> manner that cannot be overridden/is not configurable (see
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273
> )
>
> Why is this the case? Since part of the point of Beam is to allow
> event-time processing, I’m sure I’m missing something here. How can I
> correctly ingest messages from Kafka and stamp them with event time, rather
> than ingestion time?
>