Hello Bill,

Just to add to what Max said previously about emitting watermarks based on 
ingestion time at the Kafka source: support for event-time watermarks in 
Flink's Kafka source is coming soon, and we believe we can port this to Beam 
as well. This could help find a solution for your use case.

For a more detailed description of what we are planning to integrate in the 
Flink Kafka source, you can have a look
here: https://issues.apache.org/jira/browse/FLINK-3375?filter=-1
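
To give a concrete idea, here is a rough sketch of how this could look, 
reusing Flink's existing AssignerWithPeriodicWatermarks interface. The 
method on the consumer is the planned API from FLINK-3375 (still under 
discussion), and MyEvent, MyEventSchema and kafkaProps are placeholders:

    // sketch only: assign event-time timestamps/watermarks inside the source
    FlinkKafkaConsumer09<MyEvent> consumer =
        new FlinkKafkaConsumer09<>("events", new MyEventSchema(), kafkaProps);

    consumer.assignTimestampsAndWatermarks(
        new AssignerWithPeriodicWatermarks<MyEvent>() {
          private long maxTimestamp = Long.MIN_VALUE;

          @Override
          public long extractTimestamp(MyEvent event, long previousTimestamp) {
            maxTimestamp = Math.max(maxTimestamp, event.getEventTimeMillis());
            return event.getEventTimeMillis();  // event time from the record
          }

          @Override
          public Watermark getCurrentWatermark() {
            // allow a fixed amount of out-of-orderness before advancing
            return new Watermark(maxTimestamp - 5000);
          }
        });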

Cheers,
Kostas

> On Mar 30, 2016, at 8:13 PM, Bill McCarthy <[email protected]> wrote:
> 
> Thanks Max,
> 
> Your explanation of the difference between streaming and batch boiling down 
> to the execution mode on the Flink runner is a good one, and has helped me 
> to understand the ramifications better. Thanks for helping me clear that up 
> in my mind. 
> 
> I also tend to agree with you and others on this thread that this is 
> something that would ideally be deduced by the runner implementation, as a 
> function of features of the DAG. On the other hand, setting this manually is 
> certainly not a big deal for me. 
> 
> See my other email to the group, in response to Lukasz's question, for more 
> details on my motivation for doing what I've described. 
> 
> Regarding your final question, I'm unclear on whether I need to have
> 1. a custom Kafka source which approximates event-time watermarking, or 
> 2. allowed lateness in my windowing.
> I would have thought that I could achieve what I want with the latter, but 
> your point has raised doubts about that. 
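> 
> For concreteness, the sort of windowing I have in mind for option 2 looks 
> roughly like this (the window size, the lateness, and the Event type are 
> just placeholders):
> 
>     // option 2: keep ingestion-time watermarks, absorb disorder via lateness
>     input.apply(Window.<Event>into(FixedWindows.of(Duration.standardMinutes(1)))
>         .triggering(AfterWatermark.pastEndOfWindow())
>         .withAllowedLateness(Duration.standardMinutes(10))
>         .accumulatingFiredPanes());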
> 
> Thanks as always for your helpful response,
> 
> Bill
> 
>> On Mar 30, 2016, at 12:21 PM, Maximilian Michels <[email protected]> wrote:
>> 
>> Hi Bill,
>> 
>> By "streaming mode" I meant Beam's built-in flag in PipelineOptions to
>> indicate batch or stream execution mode. Depending on this flag, the
>> Flink Runner translates to either Flink's DataSet (batch) or
>> DataStream (stream) API. The DataStream API can also read from bounded
>> collections like file sources.
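>> 
>> For example, a minimal sketch, assuming you use the Flink runner's
>> FlinkPipelineOptions (the exact runner class name may differ in your
>> version):
>> 
>>     FlinkPipelineOptions options =
>>         PipelineOptionsFactory.as(FlinkPipelineOptions.class);
>>     options.setRunner(FlinkPipelineRunner.class);
>>     options.setStreaming(true);  // translate to Flink's DataStream API
>>     Pipeline p = Pipeline.create(options);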
>> 
>> Your mental model is by no means way off. Reading from bounded sources
>> in a streaming program can make sense. However, the typical use case we
>> have seen for streaming in the past is to read from an unbounded source
>> (e.g. Kafka) and process all incoming records using event time / windows
>> / triggering as they arrive. This is a great advantage over the
>> so-called Lambda architecture, where you typically have an approximate
>> real-time streaming layer and a batch layer that does the heavy lifting.
>> 
>> The typical use case I see for reading from a bounded collection in
>> streaming is when you have static data (like an ML model or a stop word
>> list). These would typically be kept as state. On the other hand, your
>> use case would also work if you assigned timestamps and watermarks
>> correctly for the HDFS source. You can then union the streams and
>> process the resulting stream correctly according to time. That is the
>> beauty of event time.
>> 
>> By default, the Flink Runner assigns ingestion time for Kafka. That
>> means that timestamps/watermarks are assigned according to the current
>> system time. This is probably not what you want when combining these two
>> streams. You can change timestamps between operators, but you can only
>> assign watermarks at the source. Thus, to assign watermarks from Kafka
>> or a file source, you would have to implement a custom Beam source.
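>> 
>> As a rough sketch, such a custom source's reader would override the
>> timestamp and watermark methods along these lines (the fields and the
>> delay bound are illustrative, not a definitive implementation):
>> 
>>     // inside a custom UnboundedSource.UnboundedReader implementation
>>     @Override
>>     public Instant getCurrentTimestamp() {
>>       // event time taken from the record itself, not the system clock
>>       return new Instant(current.getEventTimeMillis());
>>     }
>> 
>>     @Override
>>     public Instant getWatermark() {
>>       // trail the max event time seen so far by a bound on out-of-orderness
>>       return new Instant(maxSeenEventTime - MAX_DELAY_MILLIS);
>>     }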
>> 
>> Cheers,
>> Max
>> 
>> 
>> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy
>> <[email protected]> wrote:
>>> Thanks Max,
>>> 
>>> I'm unsure what you mean by the "batch part" and "streaming execution"...
>>> Are you saying that I have to run my entire pipeline in either batch or
>>> streaming mode?
>>> 
>>> Perhaps a brief description of what I'm trying to do would help:
>>> 
>>> 1. I've got some historical intervalized timeseries data
>>> 2. I've also got some live intervalized timeseries data
>>> 3. I want to process both those flows of data in a uniform fashion:
>>> calculating windowed statistics
>>> 4. My thought was that I'd have the historical data stored in some easy data
>>> store for Beam (e.g. HDFS)
>>> 5. I'd put live data on Kafka
>>> 6. Then load up two PCollections, one from HDFS and one from Kafka
>>> 7. Then perform a Flatten transform to get them into one PCollection
>>> 8. Then run my windowing over the flattened PCollection (rough sketch below)
>>> 9. ...
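>>> 
>>> In code, steps 6-8 would look roughly like this (the reads in step 6 are
>>> elided, and Event and the window size are placeholders):
>>> 
>>>     PCollection<Event> historical = readFromHdfs(p);  // step 6 (placeholder)
>>>     PCollection<Event> live = readFromKafka(p);       // step 6 (placeholder)
>>> 
>>>     // step 7: flatten both collections into one
>>>     PCollection<Event> all = PCollectionList.of(historical).and(live)
>>>         .apply(Flatten.<Event>pCollections());
>>> 
>>>     // step 8: window the unified stream
>>>     PCollection<Event> windowed = all.apply(
>>>         Window.<Event>into(FixedWindows.of(Duration.standardMinutes(5))));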
>>> 
>>> I like this approach, because my transforms don't have to care about whether
>>> data is live or historical and I can have one processing pipeline for both.
>>> 
>>> Am I barking up the wrong tree? Is my mental model way off, with respect to
>>> how to combine historical and live data?
>>> 
>>> Thanks
>>> 
>>> Bill
>>> 
>>> On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]> wrote:
>>> 
>>> Hi Bill,
>>> 
>>> The batch part of the Flink Runner supports reading from a finite
>>> collection, but I'm assuming you're working with the streaming execution of
>>> the Flink runner. We haven't implemented support in the runner yet, but Flink
>>> natively supports reading from finite sources, so it looks fairly easy to
>>> implement. I've filed a JIRA issue and would like to look into this later
>>> today.
>>> 
>>> I've recently added a test case (UnboundedSourceITCase) which throws a
>>> custom exception when the end of the output has been reached. Admittedly,
>>> that is not a very nice approach, but it works. All the more reason we
>>> should support finite sources in streaming mode.
>>> 
>>> Best,
>>> Max
>>> 
>>> On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy
>>> <[email protected]> wrote:
>>>> 
>>>> Hi,
>>>> 
>>>> I want to get access to a bounded PCollection in Beam on the Flink runner.
>>>> Ideally, I’d pull from HDFS. Is that supported? If not, what would be the
>>>> best way for me to get something bounded, for testing purposes?
>>>> 
>>>> Thanks,
>>>> 
>>>> Bill
>>> 
>>> 
