David,

Note that KafkaIO in Beam requires Kafka server version >= 0.9.
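
For illustration, here is a rough sketch of the kind of timestamp extractor the thread's withTimestampFn suggestion would use. The record format (an epoch-millis prefix on each value) and the class/method names are assumptions for this example; Beam's actual withTimestampFn takes a SerializableFunction over the record's KV and returns a joda-time Instant, whereas this self-contained sketch uses java.time.

```java
import java.time.Instant;
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;

public class EventTimeExtractor {
    // Hypothetical payload format: each Kafka record value starts with the
    // event's epoch-millis timestamp, e.g. "1467934500000,rest-of-payload".
    // In a real pipeline this logic would live inside the function passed to
    // KafkaIO's withTimestampFn, returning a joda-time Instant instead.
    static Instant extractEventTime(Map.Entry<String, String> record) {
        long epochMillis = Long.parseLong(record.getValue().split(",", 2)[0]);
        return Instant.ofEpochMilli(epochMillis);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> rec =
                new SimpleEntry<>("key", "1467934500000,hello");
        System.out.println(extractEventTime(rec));  // prints 2016-07-07T23:35:00Z
    }
}
```

With timestamps assigned at the source like this, no downstream ParDo needs to call outputWithTimestamp, so the allowed-skew problem discussed below never arises.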

On Thu, Jul 7, 2016 at 4:27 PM, David Desberg <[email protected]>
wrote:

> Dan,
>
> Yeah, it’s setting it to the ingestion time. I will look into KafkaIO, as
> it looks to provide exactly the functionality I want. I was wondering how
> to set the timestamp correctly, at the source. Thank you for your help!
>
> David
>
> On Jul 7, 2016, at 4:25 PM, Dan Halperin <[email protected]> wrote:
>
> Hi David,
>
> In Beam pipelines, the event time is initially set on the source.
> Downstream code can make an event *later* just fine, but making it
> *earlier* might move it before the current watermark. This would effectively
> turn data that we believe is on-time into late data, which would in general be
> very bad! Allowed timestamp skew is a feature that lets you move data
> earlier by a fixed amount, so if you have a tight bound on the time set by
> the source, this can sometimes help. But it's generally discouraged in favor
> of setting proper timestamps in the first place.
>
> My guess is that UnboundedFlinkSource is using the *processing time*, aka
> current time when the element is received, rather than any event time
> provided by the element. It might be possible to provide the element's event
> time using that source.
>
> Alternatively, I think you should be using KafkaIO and setting the event
> time there using withTimestampFn:
> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
>
> This way the elements will come into the system from Kafka with good
> timestamps, and you don't need a downstream DoFn to transport them back in
> time.
>
> Thanks,
> Dan
>
> On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <[email protected]>
> wrote:
>
>> Hi David,
>> I am doing pretty much the same thing using Beam KafkaIO.
>> For the simple thing I am doing, it's working as expected.
>> Can you share the code for how you are invoking/receiving from Kafka, please?
>> Cheers
>>
>>
>> ------------------------------
>> *From:* David Desberg <[email protected]>
>> *To:* [email protected]
>> *Sent:* Thursday, July 7, 2016 12:54 PM
>> *Subject:* Event time processing with Flink runner and Kafka source
>>
>> Hi all,
>>
>> I’m struggling to get a basic Beam application set up, windowed based on
>> event time. I’m reading from an UnboundedFlinkSource wrapping a
>> FlinkKafkaConsumer to begin my pipeline. To set up event time processing, I
>> applied a DoFn transformation (via ParDo) that calls
>> ProcessContext.outputWithTimestamp using a timestamp extracted from each
>> Kafka message. However, this results in an exception telling me to
>> override getAllowedTimestampSkew, since evidently the messages are already
>> timestamped and I am moving these timestamps back in time, but only
>> shifting to the future is allowed. getAllowedTimestampSkew, however, is
>> deprecated, and if I do override it and allow skew, the windowing I am
>> applying later in the pipeline fails. I decided to backtrack and look at
>> how the timestamps are even being assigned initially, since the Flink
>> source has no concept of the structure of my messages and thus shouldn’t
>> know how to assign any time at all. It turns out that the pipeline runner
>> marks each incoming message with its ingestion time, in a manner that
>> cannot be overridden and is not configurable (see
>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273
>> )
>>
>> Why is this the case? Since part of the point of Beam is to allow
>> event-time processing, I’m sure I’m missing something here. How can I
>> correctly ingest messages from Kafka and stamp them with event time, rather
>> than ingestion time?
>>
>>
>>
>
>
