That is actually what Flink does. It sends out a +inf watermark once
the source completes. This may be intuitive for the developer but
users will need a little more API sugar to conveniently use bounded
sources in streaming programs.
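
To make the idea concrete, here is a minimal sketch in plain Python. It does not use the Beam or Flink APIs; the names read_bounded, combined_watermark and should_shut_down are hypothetical, and records are assumed to arrive with strictly increasing timestamps. It shows a finite source closing with a +inf watermark, a flattened stream taking the minimum of its input watermarks, and the shutdown check Mark describes below.

```python
import math

# Stand-in for the "+inf" watermark discussed in this thread (an assumption,
# not a Beam or Flink constant).
END_OF_STREAM = math.inf


def read_bounded(records):
    """Yield events for a finite list of (timestamp, value) pairs.

    Assumes timestamps are strictly increasing. After the last record the
    source emits one final +inf watermark to signal that it is done.
    """
    for ts, value in records:
        yield ("record", (ts, value))
        yield ("watermark", ts)
    yield ("watermark", END_OF_STREAM)


def combined_watermark(input_watermarks):
    """Watermark of a flattened stream: the minimum over its inputs."""
    return min(input_watermarks)


def should_shut_down(sink_watermarks):
    """A runner could stop once every sink watermark has reached +inf."""
    return all(wm == END_OF_STREAM for wm in sink_watermarks)


historical = list(read_bounded([(1, "a"), (2, "b")]))
final_historical_wm = historical[-1][1]  # +inf once the bounded input is done
live_wm = 5  # hypothetical current watermark of an unbounded (live) input

print(combined_watermark([final_historical_wm, live_wm]))
print(should_shut_down([final_historical_wm, live_wm]))
```

Once the bounded input has finished, it no longer holds back the combined watermark, so progress is governed by the live input alone.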

On Wed, Mar 30, 2016 at 7:15 PM, Mark Shields <[email protected]> wrote:
> Though BoundedSource must indeed be bounded (it supports splitting which
> requires the overall size to be known), there's nothing preventing an
> UnboundedSource from being bounded in the sense it may advance the watermark
> to +inf when it has reached its 'end'. Google's streaming runner doesn't do
> anything with that at the moment, but I could imagine that a 'streaming' or
> 'incremental batch' runner could shut itself down once all sink watermarks
> have gone to +inf.
>
> On Wed, Mar 30, 2016 at 9:55 AM, Maximilian Michels <[email protected]> wrote:
>>
>> I agree that ultimately users shouldn't have to decide which mode
>> they're executing in. Batch or streaming execution should be a
>> decision made behind the scenes. We're not quite there yet. I would
>> argue not even on paper:
>>
>> > And as Luke said, conceptually the model supports graphs that mix bounded
>> > and unbounded PCollections, and I can imagine runners that could implement
>> > that.
>>
>> The Flink Runner can definitely do that. However, the model is not
>> prepared for this. For instance, how do you generate watermarks for
>> Bounded sources? As far as I see you'll have to wrap them into an
>> unbounded source to support watermark generation or let the backend
>> generate watermarks automatically based on the timestamps seen. Please
>> correct me if I'm missing something here.
>>
>> On Wed, Mar 30, 2016 at 6:39 PM, Frances Perry <[email protected]> wrote:
>> >
>> >
>> > On Wed, Mar 30, 2016 at 9:21 AM, Maximilian Michels <[email protected]>
>> > wrote:
>> >>
>> >> Hi Bill,
>> >>
>> >> By "streaming mode" I meant Beam's built-in flag in PipelineOptions to
>> >> indicate batch or stream execution mode.
>> >
>> >
>> > The Beam model concept is bounded/unbounded data. I think of that streaming
>> > flag as actually part of the Cloud Dataflow runner -- currently the
>> > DataflowPipelineRunner is basically two runners (one batch, one streaming)
>> > packaged as one and configured via that flag.
>> >
>> > Ideally, I'd remove that flag entirely and have the DataflowPipelineRunner
>> > (and others) choose how to execute based on properties of the graph.
>> > Currently that mapping is all unbounded collections -> streaming, all
>> > bounded collections -> batch. But that doesn't have to be the case. An
>> > incremental 'batch' runner could totally handle unbounded PCollections. And
>> > as Luke said, conceptually the model supports graphs that mix bounded and
>> > unbounded PCollections, and I can imagine runners that could implement that.
>> >
>> >
>> >>
>> >> Depending on this flag, the
>> >> Flink Runner translates to either Flink's DataSet (batch) or
>> >> DataStream (stream) API. The DataStream API can also read from bounded
>> >> collections like file sources.
>> >>
>> >> Your mental model is by no means way off. Reading from bounded sources
>> >> in a streaming program can make sense. However, the typical use case
>> >> we have seen for streaming in the past is to read from an unbounded
>> >> source (e.g. Kafka) and process all incoming records using event time
>> >> / windows / triggering as they arrive. This is a great advantage over
>> >> the so-called Lambda architecture where you typically have an
>> >> approximate real-time streaming layer and a batch layer which does the
>> >> heavy lifting.
>> >>
>> >> The typical use case I see for reading from a bounded collection in
>> >> streaming is when you have static data (like an ML model or a stop
>> >> word list). These would typically be kept as state. On the other hand,
>> >> your use case would also work if you assigned timestamps and
>> >> watermarks correctly for the HDFS source. You can then union the
>> >> streams and process the resulting stream correctly according to time.
>> >> That is the beauty of event time.
>> >>
>> >> By default, the Flink Runner assigns ingestion time for Kafka. That
>> >> means that timestamps/watermarks are assigned according to the current
>> >> system time. This is probably not what you want when you want to
>> >> combine these two streams. You can change timestamps between operators
>> >> but you can only assign watermarks at the source. Thus, to assign
>> >> watermarks from Kafka or a file source, you would have to implement a
>> >> custom Beam source.
>> >>
>> >> Cheers,
>> >> Max
>> >>
>> >>
>> >> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy
>> >> <[email protected]> wrote:
>> >> > Thanks Max,
>> >> >
>> >> > I'm unsure what you mean by the "batch part" and "streaming execution"...
>> >> > Are you saying that I have to run my entire pipeline in either batch or
>> >> > streaming modes?
>> >> >
>> >> > Perhaps a brief description of what I'm trying to do would help:
>> >> >
>> >> > 1. I've got some historical intervalized timeseries data
>> >> > 2. I've also got some live intervalized timeseries data
>> >> > 3. I want to process both those flows of data in a uniform fashion:
>> >> >    calculating windowed statistics
>> >> > 4. My thought was that I'd have the historical data stored in some easy
>> >> >    data store for Beam (e.g. HDFS)
>> >> > 5. I'd put live data on Kafka
>> >> > 6. Then load up 2 PCollections, one from HDFS and one from Kafka
>> >> > 7. Then perform a Flatten transform to get them in one PCollection
>> >> > 8. Then run my windowing over the flattened PCollection
>> >> > 9. ...
>> >> >
>> >> > I like this approach, because my transforms don't have to care about
>> >> > whether data is live or historical and I can have one processing pipeline
>> >> > for both.
>> >> >
>> >> > Am I barking up the wrong tree? Is my mental model way off, with respect
>> >> > to how to combine historical and live data?
>> >> >
>> >> > Thanks
>> >> >
>> >> > Bill
>> >> >
>> >> > On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]>
>> >> > wrote:
>> >> >
>> >> > Hi Bill,
>> >> >
>> >> > The batch part of the Flink Runner supports reading from a finite
>> >> > collection, but I'm assuming you're working with the streaming execution
>> >> > of the Flink Runner. We haven't implemented support in the Runner yet, but
>> >> > Flink natively supports reading from finite sources. So it looks fairly
>> >> > easy to implement. I've filed a JIRA issue and would like to look into this
>> >> > later today.
>> >> >
>> >> > I've recently added a test case (UnboundedSourceITCase) which throws a
>> >> > custom exception when the end of the output has been reached. Admittedly,
>> >> > that is not a very nice approach but it works. All the more, we should
>> >> > support finite sources in streaming mode.
>> >> >
>> >> > Best,
>> >> > Max
>> >> >
>> >> > On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy
>> >> > <[email protected]> wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I want to get access to a bounded PCollection in Beam on the Flink runner.
>> >> >> Ideally, I’d pull from HDFS. Is that supported? If not, what would be the
>> >> >> best way for me to get something bounded, for testing purposes?
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Bill
>> >> >
>> >> >
>> >
>> >
>
>
