Thanks Max,

Your explanation of the difference between streaming and batch boiling down to
the execution mode on the Flink runner is a good one, and has helped me to 
understand the ramifications better. Thanks for helping me to clear that up in 
my mind. 

I also tend to agree with you and others on this thread that the execution mode
is something that would ideally be deduced by the runner implementation, as a
function of features of the DAG. On the other hand, setting this manually is 
certainly not a big deal for me. 
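
A rough sketch of what "deduced from the DAG" could look like, in plain Python
rather than Beam's API. The names Boundedness and infer_execution_mode are
hypothetical and only illustrate the rule "any unbounded source implies
streaming"; this is not how the Flink runner is implemented.

```python
from enum import Enum
from typing import Iterable


class Boundedness(Enum):
    """Hypothetical marker for whether a source is finite or not."""
    BOUNDED = "bounded"
    UNBOUNDED = "unbounded"


def infer_execution_mode(sources: Iterable[Boundedness]) -> str:
    """Pick 'streaming' if any source is unbounded, otherwise 'batch'."""
    if any(s is Boundedness.UNBOUNDED for s in sources):
        return "streaming"
    return "batch"


# An HDFS-style bounded source flattened with a Kafka-style unbounded source.
mode = infer_execution_mode([Boundedness.BOUNDED, Boundedness.UNBOUNDED])
```

Under this rule, the HDFS-plus-Kafka pipeline discussed in this thread would
come out as a streaming job without any flag being set by hand.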

See my other email to the group, in response to Lukasz's question, for more 
details on my motivation for doing what I've described. 

Regarding your final question, I'm unclear on whether I need to have
1. a custom Kafka source which approximates event time watermarking or 
2. allowed lateness in my windowing.
I would have thought that I could achieve what I want with the latter, but your 
point has raised doubts about that. 
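
To make the two options concrete, here is a minimal simulation in plain Python
(not Beam or Flink code). It assumes fixed windows and the usual rule that an
element is dropped once the watermark has passed the end of its window plus the
allowed lateness. The function names and the numbers are made up for
illustration.

```python
def window_end(event_time: int, size: int) -> int:
    """End (exclusive) of the fixed window that contains event_time."""
    return (event_time // size + 1) * size


def is_dropped(event_time: int, watermark: int, size: int,
               allowed_lateness: int) -> bool:
    """True if the watermark is already past window end + allowed lateness."""
    return watermark > window_end(event_time, size) + allowed_lateness


WINDOW = 60                 # one-minute windows, in seconds
HISTORICAL_EVENT = 1_000    # event time of an old reading
NOW = 1_000_000             # current system time when it is read

# Option 2 with an ingestion-time watermark (roughly the system time):
# a modest allowed lateness is not enough for old data.
dropped_small_lateness = is_dropped(HISTORICAL_EVENT, NOW, WINDOW, 3_600)

# Option 2 only works if the allowed lateness covers the age of the data.
dropped_huge_lateness = is_dropped(HISTORICAL_EVENT, NOW, WINDOW, NOW)

# Option 1: a source whose watermark tracks event time needs no lateness.
dropped_event_time_wm = is_dropped(HISTORICAL_EVENT, HISTORICAL_EVENT, WINDOW, 0)
```

In this toy model, allowed lateness alone only rescues the historical data if
it is at least as large as the age of the oldest record, whereas an event-time
watermark at the source avoids the problem altogether.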

Thanks as always for your helpful response,

Bill

> On Mar 30, 2016, at 12:21 PM, Maximilian Michels <[email protected]> wrote:
> 
> Hi Bill,
> 
> By "streaming mode" I meant Beam's built-in flag in PipelineOptions to
> indicate batch or stream execution mode. Depending on this flag, the
> Flink Runner translates to either Flink's DataSet (batch) or
> DataStream (stream) API. The DataStream API can also read from bounded
> collections like file sources.
> 
> Your mental model is by no means way off. Reading from bounded sources
> in a streaming program can make sense. However, the typical use case
> we have seen for streaming in the past is to read from an unbounded
> source (e.g. Kafka) and process all incoming records using event time
> / windows / triggering as they arrive. This is a great advantage over
> the so-called Lambda architecture where you typically have an
> approximate real-time streaming layer and a batch layer which does the
> heavy lifting.
> 
> The typical use case I see for reading from a bounded collection in
> streaming is when you have static data (like an ML model or a stop
> word list). These would typically be kept as state. On the other hand,
> your use case would also work if you assigned timestamps and
> watermarks correctly for the HDFS source. You can then union the
> streams and process the resulting stream correctly according to time.
> That is the beauty of event time.
> 
> By default, the Flink Runner assigns ingestion time for Kafka. That
> means that timestamps/watermarks are assigned according to the current
> system time. This is probably not what you want when you want to
> combine these two streams. You can change timestamps between operators
> but you can only assign watermarks at the source. Thus, to assign
> watermarks from Kafka or a file source, you would have to implement a
> custom Beam source.
> 
> Cheers,
> Max
> 
> 
> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy
> <[email protected]> wrote:
>> Thanks Max,
>> 
>> I'm unsure what you mean by the "batch part" and "streaming execution"...
>> Are you saying that I have to run my entire pipeline in either batch or
>> streaming modes?
>> 
>> Perhaps a brief description of what I'm trying to do would help:
>> 
>> 1. I've got some historical intervalized timeseries data
>> 2. I've also got some live intervalized timeseries data
>> 3. I want to process both those flows of data in a uniform fashion:
>> calculating windowed statistics
>> 4. My thought was that I'd have the historical data stored in some easy data
>> store for Beam (e.g. HDFS)
>> 5. I'd put live data on Kafka
>> 6. Then load up 2 PCollections, one from HDFS and one from Kafka
>> 7. Then perform a Flatten transform to get them in one PCollection
>> 8. Then run my windowing over the flattened PCollection
>> 9. ...
>> 
>> I like this approach, because my transforms don't have to care about whether
>> data is live or historical and I can have one processing pipeline for both.
>> 
>> Am I barking up the wrong tree? Is my mental model way off, with respect to
>> how to combine historical and live data?
>> 
>> Thanks
>> 
>> Bill
>> 
>> On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]> wrote:
>> 
>> Hi Bill,
>> 
>> The batch part of the Flink Runner supports reading from a finite
>> collection, but I'm assuming you're working with the streaming execution of
>> the Flink runner. We haven't implemented support in the Runner yet but Flink
>> natively supports reading from finite sources. So it looks fairly easy to
>> implement. I've filed a JIRA issue and would like to look into this later
>> today.
>> 
>> I've recently added a test case (UnboundedSourceITCase) which throws a
>> custom exception when the end of the output has been reached. Admittedly,
>> that is not a very nice approach but it works. All the more reason to
>> support finite sources in streaming mode.
>> 
>> Best,
>> Max
>> 
>> On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy
>> <[email protected]> wrote:
>>> 
>>> Hi,
>>> 
>>> I want to get access to a bounded PCollection in Beam on the Flink runner.
>>> Ideally, I’d pull from HDFS. Is that supported? If not, what would be the
>>> best way for me to get something bounded, for testing purposes?
>>> 
>>> Thanks,
>>> 
>>> Bill
>> 
>> 
