Hi all, I’m struggling to set up a basic Beam application windowed on event time. I’m reading from an UnboundedFlinkSource wrapping a FlinkKafkaConsumer at the start of my pipeline. To get event-time processing, I applied a DoFn (via ParDo) that calls ProcessContext.outputWithTimestamp with a timestamp extracted from each Kafka message. However, this throws an exception telling me to override getAllowedTimestampSkew: evidently the messages already carry timestamps, and I am moving those timestamps back in time, while only shifting into the future is allowed. getAllowedTimestampSkew is deprecated, though, and if I do override it to allow skew, the windowing I apply later in the pipeline fails.

I decided to backtrack and look at how the timestamps are being assigned in the first place, since the Flink source knows nothing about the structure of my messages and so shouldn’t be able to assign any event time at all. It turns out the pipeline runner stamps each incoming message with ingestion time, in a way that cannot be overridden or configured (see https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273).
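For reference, here is roughly what my timestamp-assignment step looks like. This is a minimal sketch assuming the DoFn API in my Beam version; KafkaMessage and the extractEventTime helper are stand-ins for my actual message type and parsing logic:

    import org.apache.beam.sdk.transforms.DoFn;
    import org.joda.time.Duration;
    import org.joda.time.Instant;

    public class AssignEventTime extends DoFn<KafkaMessage, KafkaMessage> {

      @Override
      public void processElement(ProcessContext c) {
        // Pull the event time out of the message payload.
        Instant eventTime = extractEventTime(c.element());
        // Re-stamp the element. This is what throws, because the element
        // already carries a (later) ingestion-time timestamp and moving
        // timestamps backwards requires allowing skew.
        c.outputWithTimestamp(c.element(), eventTime);
      }

      @Override
      public Duration getAllowedTimestampSkew() {
        // Deprecated, but without this outputWithTimestamp fails outright.
        return Duration.standardHours(1); // arbitrary bound for illustration
      }

      private static Instant extractEventTime(KafkaMessage m) {
        // Hypothetical helper: parse the timestamp field embedded in the message.
        return new Instant(m.getEventTimeMillis());
      }
    }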
Why is this the case? Since part of the point of Beam is to enable event-time processing, I’m sure I’m missing something. How can I correctly ingest messages from Kafka and stamp them with event time rather than ingestion time?
