Hi Amir,
Are you overriding getAllowedTimestampSkew? The setup code below causes an
exception because a timestamp (ingestion time, which is not what I want) is
assigned as soon as each message is read from Kafka, via the code in the Flink
runner that I linked to. If I do override getAllowedTimestampSkew, my windowing
seems to fail completely and does not gather events into event-time-based
windows.

    HeatpipeDeserializationSchema<POJOTYPE> deserSchema =
        new CustomDeserializationSchema<>(POJOTYPE.class, canon,
            deserPropsForOptions(options), topic);
    FlinkKafkaConsumer08<POJOTYPE> kafkaConsumer =
        new FlinkKafkaConsumer08<>(topic, deserSchema, getKafkaProps(options));

    p.apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
     .apply(ParDo.of(new DoFn<MyType, KV<String, POJOTYPE>>() {
         public void processElement(ProcessContext c) {
             if (c.element() == null) return;
             c.outputWithTimestamp(
                 KV.of(c.element().getKeyVal(), c.element()),
                 new Instant(c.element().getTimeOfEvent()));
         }
     }))
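
For reference, my understanding of the check that raises the exception is
roughly the following. This is a plain-Java sketch, not Beam code: the class
and method names are made up for illustration, and I am assuming the rule is
that an output timestamp may not be earlier than the input element's timestamp
minus the allowed skew.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical stand-in for the timestamp check; not part of the Beam API.
final class TimestampSkewSketch {

    // Throws if outputTs is earlier than (inputTs - allowedSkew).
    static void checkTimestamp(Instant inputTs, Instant outputTs, Duration allowedSkew) {
        Instant earliestAllowed = inputTs.minus(allowedSkew);
        if (outputTs.isBefore(earliestAllowed)) {
            throw new IllegalArgumentException(
                "Cannot output with timestamp " + outputTs
                + "; earliest allowed is " + earliestAllowed);
        }
    }

    // Convenience wrapper: true if the output timestamp would be accepted.
    static boolean isAllowed(Instant inputTs, Instant outputTs, Duration allowedSkew) {
        try {
            checkTimestamp(inputTs, outputTs, allowedSkew);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed values: the element was stamped at ingestion, 30 s after the event.
        Instant ingestionTime = Instant.parse("2016-07-07T19:54:00Z");
        Instant eventTime = ingestionTime.minusSeconds(30);

        // Moving the timestamp back with zero skew is rejected.
        System.out.println(isAllowed(ingestionTime, eventTime, Duration.ZERO));
        // Allowing one minute of skew accepts the same event time.
        System.out.println(isAllowed(ingestionTime, eventTime, Duration.ofMinutes(1)));
        // Moving the timestamp forward is accepted without any skew.
        System.out.println(isAllowed(ingestionTime, ingestionTime.plusSeconds(30), Duration.ZERO));
    }
}
```

With the ingestion timestamp already attached, any event time earlier than it
fails this check unless the skew is widened, which matches what I am seeing.
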
Thanks much for the reply,
David
> On Jul 7, 2016, at 4:15 PM, amir bahmanyari <[email protected]> wrote:
>
> Hi David,
> I am doing pretty much the same thing using Beam KafkaIO.
> For the simple thing I am doing, it's working as expected.
> Can you share the code showing how you are reading from Kafka, please?
> Cheers
>
>
> From: David Desberg <[email protected]>
> To: [email protected]
> Sent: Thursday, July 7, 2016 12:54 PM
> Subject: Event time processing with Flink runner and Kafka source
>
> Hi all,
>
> I’m struggling to set up a basic Beam application that is windowed on
> event time. I’m reading from an UnboundedFlinkSource of a FlinkKafkaConsumer
> to begin my pipeline. To set up event time processing, I applied a DoFn
> transformation (via ParDo) that calls ProcessContext.outputWithTimestamp
> using a timestamp extracted from each Kafka message. However, this results in
> an exception telling me to override getAllowedTimestampSkew, since evidently
> the messages are already timestamped and I am moving these timestamps back in
> time, but only shifting to the future is allowed. getAllowedTimestampSkew,
> however, is deprecated, and if I do override it and allow skew, the windowing
> I am applying later in the pipeline fails. I decided to backtrack and look at
> how the timestamps are even being assigned initially, since the Flink source
> has no concept of the structure of my messages and thus shouldn’t know how to
> assign any time at all. I found that the pipeline runner
> marks each incoming message with ingestion time, in a manner that cannot be
> overridden and is not configurable (see
> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273)
>
> Why is this the case? Since part of the point of Beam is to allow event-time
> processing, I’m sure I’m missing something here. How can I correctly ingest
> messages from Kafka and stamp them with event time, rather than ingestion
> time?
>
>