Hi David,

I am doing pretty much the same thing using Beam KafkaIO. For the simple
things I am doing, it's working as expected. Can you share the code you are
using to read from Kafka, please?
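For reference, a minimal sketch of reading from Kafka with Beam's KafkaIO, written against a recent Beam SDK (the broker address and topic name are placeholders, and the Beam and Kafka client jars must be on the classpath):

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaReadSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")          // placeholder broker
        .withTopic("events")                             // placeholder topic
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .withoutMetadata());                             // drop Kafka metadata, keep KV<K, V>

    p.run().waitUntilFinish();
  }
}
```

KafkaIO also exposes hooks for assigning element timestamps from the records themselves, which is relevant to the event-time question below.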
Cheers
From: David Desberg <[email protected]>
To: [email protected]
Sent: Thursday, July 7, 2016 12:54 PM
Subject: Event time processing with Flink runner and Kafka source
Hi all,
I’m struggling to set up a basic Beam application windowed by event time. My
pipeline begins by reading from an UnboundedFlinkSource wrapping a
FlinkKafkaConsumer. To set up event-time processing, I applied a DoFn transformation
(via ParDo) that calls ProcessContext.outputWithTimestamp using a timestamp
extracted from each Kafka message. However, this results in an exception
telling me to override getAllowedTimestampSkew, since evidently the messages
are already timestamped and I am moving these timestamps back in time, but only
shifting to the future is allowed. getAllowedTimestampSkew, however, is
deprecated, and if I do override it and allow skew, the windowing I am applying
later in the pipeline fails. I decided to backtrack and look at how the
timestamps are even being assigned initially, since the Flink source has no
concept of the structure of my messages and thus shouldn’t know how to assign
any timestamp at all. It turns out that the pipeline runner marks each
incoming message with ingestion time, in a way that cannot be overridden or
configured (see
https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273)
Why is this the case? Since part of the point of Beam is to allow event-time
processing, I’m sure I’m missing something here. How can I correctly ingest
messages from Kafka and stamp them with event time rather than ingestion time?
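A sketch of the DoFn approach described above, written against a recent Beam SDK: it re-stamps each element with a timestamp parsed from the message and overrides the (deprecated, as noted) getAllowedTimestampSkew so that moving timestamps backwards is permitted. The timestamp-extraction helper is hypothetical; real code would parse the actual message payload.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

class AssignEventTime extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Hypothetical: derive the event time from the message itself.
    Instant eventTime = extractTimestamp(c.element());
    c.outputWithTimestamp(c.element(), eventTime);
  }

  @Override
  public Duration getAllowedTimestampSkew() {
    // Deprecated API: permit timestamps arbitrarily far in the past.
    return Duration.millis(Long.MAX_VALUE);
  }

  private Instant extractTimestamp(String message) {
    // Placeholder parser; assumes an ISO-8601 prefix on each message.
    return Instant.parse(message.substring(0, 24));
  }
}
```

Whether this interacts correctly with downstream windowing depends on how the runner assigned the original timestamps, which is exactly the issue raised above.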