Dan,

Yeah, it’s setting it to the ingestion time. I will look into KafkaIO, as it looks to provide exactly the functionality I want; I was wondering how to set the timestamp correctly at the source. Thank you for your help!

David
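A minimal sketch of what Dan's withTimestampFn suggestion might look like (not code from this thread). The broker address, topic name, deserializers, and the assumed "epochMillis,payload" message format are placeholders, and the builder calls follow a later KafkaIO API surface (deserializers rather than the coders used in the 2016 snapshot linked below):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Instant;

public class KafkaEventTimePipeline {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    PCollection<KV<String, String>> messages = p.apply(
        KafkaIO.<String, String>read()
            .withBootstrapServers("localhost:9092")   // assumed broker address
            .withTopic("events")                      // assumed topic name
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            // Assign the event time here, at the source, so the watermark is
            // derived from the message contents rather than ingestion time.
            .withTimestampFn((KV<String, String> kv) ->
                new Instant(Long.parseLong(kv.getValue().split(",", 2)[0]))) // assumed payload format
            .withoutMetadata());

    // ... event-time windowing and downstream transforms go here ...
    p.run();
  }
}

With timestamps assigned at the source, downstream windowing can operate on event time directly and no re-stamping DoFn is needed.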
> On Jul 7, 2016, at 4:25 PM, Dan Halperin <[email protected]> wrote:
>
> Hi David,
>
> In Beam pipelines, the event time is initially set on the source. Downstream
> code can make an event *later* just fine, but making it *earlier* might move
> it before the current watermark. This would effectively turn data that we
> believe is on-time into late data, which would in general be very bad! Allowed
> timestamp skew is a feature that lets you move data earlier by a fixed
> amount, so if you have a tight bound on the time set by the source, this can
> sometimes help. But it's generally discouraged in favor of proper timestamps
> in the first place.
>
> My guess is that UnboundedFlinkSource is using the *processing time*, aka the
> current time when the element is received, rather than any event time
> provided by the element. It might be possible to provide the element time
> using that source.
>
> Alternately, I think you should be using KafkaIO and setting the event time
> there using withTimestampFn:
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
>
> This way the elements will come into the system from Kafka with good
> timestamps, and you don't need a downstream DoFn to transport them back in
> time.
>
> Thanks,
> Dan
>
> On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <[email protected]> wrote:
> Hi David,
> I am doing pretty much the same thing using Beam KafkaIO.
> For the simple thing I am doing, it's working as expected.
> Can you provide the code showing how you are invoking/receiving from Kafka, please?
> Cheers
>
>
> From: David Desberg <[email protected]>
> To: [email protected]
> Sent: Thursday, July 7, 2016 12:54 PM
> Subject: Event time processing with Flink runner and Kafka source
>
> Hi all,
>
> I’m struggling to get a basic Beam application set up, windowed based upon
> event time. I’m reading from an UnboundedFlinkSource of a FlinkKafkaConsumer
> to begin my pipeline. To set up event-time processing, I applied a DoFn
> transformation (via ParDo) that calls ProcessContext.outputWithTimestamp
> using a timestamp extracted from each Kafka message. However, this results in
> an exception telling me to override getAllowedTimestampSkew, since evidently
> the messages are already timestamped and I am moving those timestamps back in
> time, but only shifting to the future is allowed. getAllowedTimestampSkew,
> however, is deprecated, and if I do override it and allow skew, the windowing
> I am applying later in the pipeline fails. I decided to backtrack and look at
> how the timestamps are even being assigned initially, since the Flink source
> has no concept of the structure of my messages and thus shouldn’t know how to
> assign any time at all. I found that the pipeline runner marks each incoming
> message with the ingestion time, in a manner that cannot be overridden and is
> not configurable (see
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273).
>
> Why is this the case? Since part of the point of Beam is to allow event-time
> processing, I’m sure I’m missing something here. How can I correctly ingest
> messages from Kafka and stamp them with event time, rather than ingestion
> time?
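For reference, a minimal sketch of the DoFn re-stamping approach David describes above (not his actual code, and written in the annotation-based DoFn style). Because the Flink source has already stamped each element with its ingestion time, moving timestamps earlier requires overriding the deprecated getAllowedTimestampSkew, and elements shifted behind the watermark may then be treated as late data; the payload format is an assumption:

import org.apache.beam.sdk.transforms.DoFn;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class StampWithEventTimeFn extends DoFn<String, String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    // Assumed payload format: "<epochMillis>,<rest of message>".
    long eventMillis = Long.parseLong(c.element().split(",", 2)[0]);
    // Throws unless the new timestamp is no earlier than the element's
    // current timestamp minus getAllowedTimestampSkew().
    c.outputWithTimestamp(c.element(), new Instant(eventMillis));
  }

  @Override
  public Duration getAllowedTimestampSkew() {
    // Deprecated escape hatch: allows timestamps to move backwards by up to
    // this much, at the cost of potentially producing late data downstream.
    return Duration.standardHours(1);
  }
}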
