Hi,

I'm afraid there is no option for Kafka 0.8 right now. The API changed quite a bit between 0.8 and 0.9, and the old API is somewhat cumbersome to program against. If there is a strong need for it, someone could maybe whip up something based on the 0.9 KafkaIO.

Regarding UnboundedFlinkSource: I would strongly suggest not using it, since it is not well integrated with Beam and you cannot do proper event-time windowing with it. Each runner has a set of custom sources that only work with that specific runner, because the selection of Beam-native sources was a bit sparse in the beginning. Now might be a good time to get rid of these special sources (for all runners). They make it impossible to run a Pipeline on any runner, which is one of the main ideas behind Beam, IMHO.

Cheers,
Aljoscha

On Fri, 8 Jul 2016 at 01:56 David Desberg wrote:
> I see. Are there any options for Kafka 0.8? Thanks for the heads up.
>
> On Jul 7, 2016, at 4:54 PM, Raghu Angadi wrote:
>
> David,
>
> note that KafkaIO in Beam requires a Kafka server version of 0.9 or later.
>
> On Thu, Jul 7, 2016 at 4:27 PM, David Desberg wrote:
>
>> Dan,
>>
>> Yeah, it’s setting it to the ingestion time. I will look into KafkaIO, as it looks to provide exactly the functionality I want. I was wondering how to set the timestamp correctly, at the source. Thank you for your help!
>>
>> David
>>
>> On Jul 7, 2016, at 4:25 PM, Dan Halperin wrote:
>>
>> Hi David,
>>
>> In Beam pipelines, the event time is initially set on the source. Downstream code can make an event *later* just fine, but making it *earlier* might move it before the current watermark. This would effectively turn data that we believe is on-time into late data, and would in general be very bad! Allowed lateness is a feature that lets you move data earlier by a fixed amount, so if you have a tight bound on the time set by the source, this can sometimes help. But it's generally discouraged in favor of proper timestamps in the first place.
>>
>> My guess is that UnboundedFlinkSource is using the *processing time*, aka the current time when the element is received, rather than any event time provided by the element. It might be possible to provide the element time using that source.
>>
>> Alternately, I think you should be using KafkaIO and setting the event time there using withTimestampFn:
>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
>>
>> This way the elements will come into the system from Kafka with good timestamps, and you don't need a downstream DoFn to transport them back in time.
>>
>> Thanks,
>> Dan
>>
>> On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari wrote:
>>
>>> Hi David,
>>> I am doing pretty much the same thing using Beam KafkaIO.
>>> For the simple thing I am doing, it's working as expected.
>>> Can you please provide the code showing how you are invoking/receiving from Kafka?
>>> Cheers
>>>
>>> ------------------------------
>>> *From:* David Desberg
>>> *Sent:* Thursday, July 7, 2016 12:54 PM
>>> *Subject:* Event time processing with Flink runner and Kafka source
>>>
>>> Hi all,
>>>
>>> I’m struggling to get a basic Beam application set up, windowed by event time. I’m reading from an UnboundedFlinkSource of a FlinkKafkaConsumer to begin my pipeline. To set up event time processing, I applied a DoFn transformation (via ParDo) that calls ProcessContext.outputWithTimestamp using a timestamp extracted from each Kafka message. However, this results in an exception telling me to override getAllowedTimestampSkew, since evidently the messages are already timestamped and I am moving these timestamps back in time, but only shifting to the future is allowed. getAllowedTimestampSkew, however, is deprecated, and if I do override it and allow skew, the windowing I am applying later in the pipeline fails. I decided to backtrack and look at how the timestamps are even being assigned initially, since the Flink source has no concept of the structure of my messages and thus shouldn’t know how to assign any time at all. I found that the pipeline runner marks each incoming message with ingestion time, in a manner that cannot be overridden/is not configurable (see https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273 ).
>>>
>>> Why is this the case? Since part of the point of Beam is to allow event-time processing, I’m sure I’m missing something here. How can I correctly ingest messages from Kafka and stamp them with event time, rather than ingestion time?
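
A toy model may help make the timestamp rules in this thread concrete. The sketch below is plain Java with no Beam dependency and is not Beam's implementation. The class `EventTimeModel`, its methods, the message format `<eventTimeMillis>,<payload>`, and all the numbers are hypothetical. Treating "timestamp behind the watermark" as "late" is also a simplification of how Beam actually decides lateness.

```java
/**
 * Toy model (plain Java, not Beam code) of the timestamp rules discussed in
 * this thread. All times are epoch milliseconds.
 */
public class EventTimeModel {

    /** Hypothetical message format: "<eventTimeMillis>,<payload>". */
    public static long extractEventTime(String message) {
        int comma = message.indexOf(',');
        if (comma < 0) {
            throw new IllegalArgumentException("missing timestamp field: " + message);
        }
        return Long.parseLong(message.substring(0, comma).trim());
    }

    /**
     * The rule David ran into: a DoFn may move an element's timestamp later
     * freely, but earlier only by up to the allowed skew.
     */
    public static boolean isOutputTimestampAllowed(long inputTs, long outputTs, long allowedSkewMillis) {
        return outputTs >= inputTs - allowedSkewMillis;
    }

    /** Simplification: an element whose timestamp is behind the watermark counts as late. */
    public static boolean isLate(long elementTs, long watermarkMillis) {
        return elementTs < watermarkMillis;
    }

    public static void main(String[] args) {
        String message = "940000,click"; // hypothetical event at t = 940 s
        long eventTime = extractEventTime(message);

        // Setup from the original question: the runner stamps ingestion time
        // (t = 1000 s) and the watermark advances with it (assumed 1 s behind).
        long ingestionTime = 1_000_000L;
        long ingestionWatermark = 999_000L;

        // Re-stamping downstream tries to move the element 60 s earlier.
        System.out.println("re-stamp allowed with zero skew: "
                + isOutputTimestampAllowed(ingestionTime, eventTime, 0L)); // false
        System.out.println("late after re-stamping: "
                + isLate(eventTime, ingestionWatermark)); // true

        // Setup Dan suggests: the source assigns event time itself, so no
        // downstream shift is needed and the watermark tracks event times
        // (assumed here to trail them by 10 s).
        long eventTimeWatermark = 930_000L;
        System.out.println("late when stamped at the source: "
                + isLate(eventTime, eventTimeWatermark)); // false
    }
}
```

In this model, raising the allowed skew to 60,000 ms would let the re-stamp through, but the element would still land behind an ingestion-time watermark. That is consistent with Dan's point that shifting timestamps earlier turns on-time data into late data, and assigning the timestamp at the source avoids both problems.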
