I think Kafka 0.8 usage will continue for some time. The way I am thinking
of adding support for 0.8 in Beam is to write a wrapper around Flink's 0.8
consumer [1] that implements a subset of the 0.9 Consumer interface [2].
Beam KafkaIO allows users to plug in their own Consumer implementation.
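
As a rough illustration of that plug-in point, a minimal sketch (this
assumes the withConsumerFactoryFn hook in KafkaIO; the Kafka08Consumer
class is hypothetical, and MockConsumer from kafka-clients stands in for
it only so the snippet compiles):

  import java.util.Map;

  import com.google.common.collect.ImmutableList;
  import org.apache.beam.sdk.io.kafka.KafkaIO;
  import org.apache.beam.sdk.transforms.SerializableFunction;
  import org.apache.kafka.clients.consumer.Consumer;
  import org.apache.kafka.clients.consumer.MockConsumer;
  import org.apache.kafka.clients.consumer.OffsetResetStrategy;

  // inside your pipeline-construction code; p is your Pipeline.
  // A real Kafka08Consumer would implement Consumer<byte[], byte[]> by
  // delegating to the 0.8 fetch path (e.g. Flink's 0.8 consumer in [1]).
  p.apply(KafkaIO.read()
      .withBootstrapServers("broker:9092")
      .withTopics(ImmutableList.of("my_topic"))
      .withConsumerFactoryFn(
          new SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>() {
            @Override
            public Consumer<byte[], byte[]> apply(Map<String, Object> config) {
              // swap in: return new Kafka08Consumer(config);
              return new MockConsumer<byte[], byte[]>(OffsetResetStrategy.EARLIEST);
            }
          }));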

If anyone wants to implement this, I am more than happy to guide and review
the code. I am not sure when I can get to it myself.

[1]
https://github.com/apache/flink/blob/master/flink-streaming-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java
[2]
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java#L31

On Fri, Jul 8, 2016 at 12:10 AM, Aljoscha Krettek <[email protected]>
wrote:

> Hi,
> I'm afraid there is no option for Kafka 0.8 right now. The API changed
> quite a bit between 0.8 and 0.9, and the old API is somewhat cumbersome
> to program against. If there is a strong need for it, someone could maybe
> whip up something based on the 0.9 KafkaIO.
>
> Regarding UnboundedFlinkSource: I would strongly suggest not using it,
> since it is not well integrated with Beam and you cannot do proper
> event-time windowing. Each runner has a set of custom sources that only
> work with that specific runner, because the selection of Beam-native
> sources was a bit sparse in the beginning. Now might be a good time to
> get rid of the special sources (for all runners). They make it impossible
> to run a Pipeline on any runner, and that portability is one of the main
> ideas behind Beam, IMHO.
>
> Cheers,
> Aljoscha
>
> On Fri, 8 Jul 2016 at 01:56 David Desberg <[email protected]> wrote:
>
>> I see. Are there any options for Kafka 0.8? Thanks for the heads up.
>>
>> On Jul 7, 2016, at 4:54 PM, Raghu Angadi <[email protected]> wrote:
>>
>> David,
>>
>> note that KafkaIO in Beam requires the Kafka server version to be >= 0.9
>>
>> On Thu, Jul 7, 2016 at 4:27 PM, David Desberg <[email protected]>
>> wrote:
>>
>>> Dan,
>>>
>>> Yeah, it’s setting it to the ingestion time. I will look into KafkaIO,
>>> as it looks to provide exactly the functionality I want; I was wondering
>>> how to set the timestamp correctly at the source. Thank you for your
>>> help!
>>>
>>> David
>>>
>>> On Jul 7, 2016, at 4:25 PM, Dan Halperin <[email protected]> wrote:
>>>
>>> Hi David,
>>>
>>> In Beam pipelines, the event time is initially set at the source.
>>> Downstream code can make an event *later* just fine, but making it
>>> *earlier* might move it before the current watermark. This would
>>> effectively turn data that we believe is on-time into late data, which
>>> would in general be very bad! Allowed timestamp skew is a feature that
>>> lets you move data earlier by a fixed amount, so if you have a tight
>>> bound on the time set by the source, this can sometimes help. But it's
>>> generally discouraged in favor of proper timestamps in the first place.
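>>>
>>> (For concreteness, a minimal sketch of that escape hatch, assuming the
>>> DoFn override and Joda-Time's Duration:
>>>
>>>   @Override
>>>   public Duration getAllowedTimestampSkew() {
>>>     // tolerate timestamps up to five minutes earlier than the
>>>     // element's current timestamp before throwing
>>>     return Duration.standardMinutes(5);
>>>   }
>>>
>>> Again, treat this as a last resort, not a recommendation.)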
>>>
>>> My guess is that UnboundedFlinkSource is using the *processing time*,
>>> i.e. the current time when the element is received, rather than any
>>> event time carried by the element. It might be possible to provide the
>>> element time using that source.
>>>
>>> Alternatively, I think you should be using KafkaIO and setting the
>>> event time there using withTimestampFn:
>>> https://github.com/apache/incubator-beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L136
>>>
>>> This way the elements will come into the system from Kafka with good
>>> timestamps, and you don't need a downstream DoFn to transport them back in
>>> time.
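>>>
>>> For illustration, a minimal sketch of that approach (this keeps the
>>> default byte[] key/value types from KafkaIO.read(); extractEventTime
>>> is a hypothetical helper that parses the event time out of your
>>> message payload):
>>>
>>>   import org.apache.beam.sdk.io.kafka.KafkaIO;
>>>   import org.apache.beam.sdk.transforms.SerializableFunction;
>>>   import org.apache.beam.sdk.values.KV;
>>>   import org.joda.time.Instant;
>>>
>>>   // inside your pipeline-construction code; p is your Pipeline
>>>   p.apply(KafkaIO.read()
>>>       .withBootstrapServers("broker:9092")
>>>       .withTopics(ImmutableList.of("events"))
>>>       .withTimestampFn(
>>>           new SerializableFunction<KV<byte[], byte[]>, Instant>() {
>>>             @Override
>>>             public Instant apply(KV<byte[], byte[]> record) {
>>>               // pull the event time embedded in the message value
>>>               return extractEventTime(record.getValue());
>>>             }
>>>           }));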
>>>
>>> Thanks,
>>> Dan
>>>
>>> On Thu, Jul 7, 2016 at 4:15 PM, amir bahmanyari <[email protected]>
>>> wrote:
>>>
>>>> Hi David,
>>>> I am doing pretty much the same thing using Beam KafkaIO.
>>>> For the simple thing I am doing, it's working as expected.
>>>> Can you share the code showing how you are invoking/receiving from
>>>> Kafka, please?
>>>> Cheers
>>>>
>>>>
>>>> ------------------------------
>>>> From: David Desberg <[email protected]>
>>>> To: [email protected]
>>>> Sent: Thursday, July 7, 2016 12:54 PM
>>>> Subject: Event time processing with Flink runner and Kafka source
>>>>
>>>> Hi all,
>>>>
>>>> I’m struggling to set up a basic Beam application windowed on event
>>>> time. My pipeline begins by reading from an UnboundedFlinkSource
>>>> wrapping a FlinkKafkaConsumer. To set up event-time processing, I
>>>> applied a DoFn transformation (via ParDo) that calls
>>>> ProcessContext.outputWithTimestamp using a timestamp extracted from
>>>> each Kafka message. However, this results in an exception telling me to
>>>> override getAllowedTimestampSkew: the messages are evidently already
>>>> timestamped, I am moving those timestamps back in time, and only
>>>> shifting into the future is allowed. getAllowedTimestampSkew, however,
>>>> is deprecated, and if I do override it and allow skew, the windowing I
>>>> apply later in the pipeline fails. So I backtracked to look at how the
>>>> timestamps are assigned in the first place, since the Flink source
>>>> knows nothing about the structure of my messages and thus shouldn’t be
>>>> able to assign any event time at all. It turns out the pipeline runner
>>>> stamps each incoming message with the ingestion time, in a manner that
>>>> cannot be overridden and is not configurable (see
>>>> https://github.com/apache/incubator-beam/blob/master/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java#L273
>>>> )
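>>>>
>>>> For reference, the re-stamping transform I tried looks roughly like
>>>> this (extractEventTime is a stand-in for my own payload parsing, and
>>>> the String element type is just for the sketch):
>>>>
>>>>   input.apply(ParDo.of(new DoFn<String, String>() {
>>>>     @Override
>>>>     public void processElement(ProcessContext c) {
>>>>       // moves the timestamp *backwards* from the ingestion time the
>>>>       // source assigned, which is what triggers the skew exception
>>>>       c.outputWithTimestamp(c.element(), extractEventTime(c.element()));
>>>>     }
>>>>   }));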
>>>>
>>>> Why is this the case? Since event-time processing is part of the point
>>>> of Beam, I’m sure I’m missing something here. How can I correctly
>>>> ingest messages from Kafka and stamp them with their event time rather
>>>> than the ingestion time?
>>>>
>>>>
>>>>
>>>
>>>
>>
>>
