That is actually what Flink does. It sends out a +inf watermark once the source completes. This may be intuitive for the developer, but users will need a little more API sugar to conveniently use bounded sources in streaming programs.
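For reference, this is roughly what a bounded source looks like on the raw Flink side; when run() returns, Flink emits the final +inf watermark (Long.MAX_VALUE) on the source's behalf. Sketch only -- the file reading is stubbed out with made-up data:

    import org.apache.flink.streaming.api.functions.source.SourceFunction;
    import java.util.Arrays;
    import java.util.List;

    public class BoundedLineSource implements SourceFunction<String> {
      private volatile boolean running = true;

      @Override
      public void run(SourceContext<String> ctx) throws Exception {
        for (String line : readAllLines()) {
          if (!running) {
            return;
          }
          ctx.collect(line);
        }
        // Returning normally ends the source; the Flink runtime then emits
        // the final +inf watermark on the source's behalf.
      }

      @Override
      public void cancel() {
        running = false;
      }

      // Stand-in for real file/HDFS reading.
      private List<String> readAllLines() {
        return Arrays.asList("line-1", "line-2", "line-3");
      }
    }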
On Wed, Mar 30, 2016 at 7:15 PM, Mark Shields <[email protected]> wrote:
> Though BoundedSource must indeed be bounded (it supports splitting, which
> requires the overall size to be known), there's nothing preventing an
> UnboundedSource from being bounded in the sense that it may advance the
> watermark to +inf when it has reached its 'end'. Google's streaming runner
> doesn't do anything with that at the moment, but I could imagine that a
> 'streaming' or 'incremental batch' runner could shut itself down once all
> sink watermarks have gone to +inf.
>
> On Wed, Mar 30, 2016 at 9:55 AM, Maximilian Michels <[email protected]> wrote:
>>
>> I agree that ultimately users shouldn't have to decide which mode
>> they're executing in. Batch or streaming execution should be a decision
>> made behind the scenes. We're not quite there yet. I would argue we're
>> not even there on paper:
>>
>> > And as Luke said, conceptually the model supports graphs that mix
>> > bounded and unbounded PCollections, and I can imagine runners that
>> > could implement that.
>>
>> The Flink Runner can definitely do that. However, the model is not
>> prepared for this. For instance, how do you generate watermarks for
>> bounded sources? As far as I can see, you'll have to wrap them in an
>> unbounded source to support watermark generation, or let the backend
>> generate watermarks automatically based on the timestamps seen. Please
>> correct me if I'm missing something here.
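Such a wrapper would be fairly mechanical to write. A rough, untested sketch of what it could look like -- nothing like this exists in the SDK today, SomeCheckpointMark stands in for a trivial made-up checkpoint type, and splitting, checkpointing and coders are elided:

    // Adapt a BoundedSource so it runs as an UnboundedSource and reports a
    // +inf watermark once the bounded input is exhausted.
    class BoundedAsUnbounded<T> extends UnboundedSource<T, SomeCheckpointMark> {
      private final BoundedSource<T> bounded;

      BoundedAsUnbounded(BoundedSource<T> bounded) {
        this.bounded = bounded;
      }

      @Override
      public UnboundedReader<T> createReader(
          PipelineOptions options, SomeCheckpointMark checkpoint) throws IOException {
        final BoundedSource.BoundedReader<T> reader = bounded.createReader(options);
        return new UnboundedReader<T>() {
          private boolean done = false;
          private Instant last = BoundedWindow.TIMESTAMP_MIN_VALUE;

          @Override
          public boolean start() throws IOException {
            return track(reader.start());
          }

          @Override
          public boolean advance() throws IOException {
            return track(reader.advance());
          }

          // Remember the latest element timestamp and whether we hit the end.
          private boolean track(boolean hasNext) {
            if (hasNext) {
              last = reader.getCurrentTimestamp();
            } else {
              done = true;
            }
            return hasNext;
          }

          @Override
          public T getCurrent() {
            return reader.getCurrent();
          }

          @Override
          public Instant getCurrentTimestamp() {
            return last;
          }

          @Override
          public Instant getWatermark() {
            // The key idea: once the bounded input is done, advance the
            // watermark to +inf so downstream windows/triggers can fire.
            // Before that, this assumes roughly timestamp-ordered input.
            return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : last;
          }

          // getCheckpointMark(), getCurrentSource(), close() elided.
        };
      }

      // generateInitialSplits(), getCheckpointMarkCoder() etc. elided.
    }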
>> On Wed, Mar 30, 2016 at 6:39 PM, Frances Perry <[email protected]> wrote:
>> >
>> > On Wed, Mar 30, 2016 at 9:21 AM, Maximilian Michels <[email protected]> wrote:
>> >>
>> >> Hi Bill,
>> >>
>> >> By "streaming mode" I meant Beam's built-in flag in PipelineOptions to
>> >> indicate batch or streaming execution mode.
>> >
>> > The Beam model concept is bounded/unbounded data. I think of that
>> > streaming flag as actually part of the Cloud Dataflow runner --
>> > currently the DataflowPipelineRunner is basically two runners (one
>> > batch, one streaming) packaged as one and configured via that flag.
>> >
>> > Ideally, I'd remove that flag entirely and have the
>> > DataflowPipelineRunner (and others) choose how to execute based on
>> > properties of the graph. Currently that mapping is: all unbounded
>> > collections -> streaming, all bounded collections -> batch. But that
>> > doesn't have to be the case. An incremental 'batch' runner could
>> > totally handle unbounded PCollections. And as Luke said, conceptually
>> > the model supports graphs that mix bounded and unbounded PCollections,
>> > and I can imagine runners that could implement that.
>> >
>> >> Depending on this flag, the Flink Runner translates to either Flink's
>> >> DataSet (batch) or DataStream (streaming) API. The DataStream API can
>> >> also read from bounded collections like file sources.
>> >>
>> >> Your mental model is by no means way off. Reading from bounded sources
>> >> in a streaming program can make sense. However, the typical use case
>> >> we have seen for streaming in the past is to read from an unbounded
>> >> source (e.g. Kafka) and process all incoming records using event time
>> >> / windows / triggering as they arrive. This is a great advantage over
>> >> the so-called Lambda architecture, where you typically have an
>> >> approximate real-time streaming layer and a batch layer which does
>> >> the heavy lifting.
>> >>
>> >> The typical use case I see for reading from a bounded collection in
>> >> streaming is when you have static data (like an ML model or a stop
>> >> word list). These would typically be kept as state. On the other
>> >> hand, your use case would also work if you assigned timestamps and
>> >> watermarks correctly for the HDFS source. You can then union the
>> >> streams and process the resulting stream correctly according to time.
>> >> That is the beauty of event time.
>> >>
>> >> By default, the Flink Runner assigns ingestion time for Kafka. That
>> >> means that timestamps/watermarks are assigned according to the
>> >> current system time. This is probably not what you want when you want
>> >> to combine these two streams. You can change timestamps between
>> >> operators, but you can only assign watermarks at the source. Thus, to
>> >> assign watermarks for Kafka or a file source, you would have to
>> >> implement a custom Beam source.
>> >>
>> >> Cheers,
>> >> Max
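Concretely, that union-then-window approach (steps 6-8 in Bill's message below) could look like the following sketch. HistoricalHdfsSource (bounded) and LiveKafkaSource (unbounded) are placeholders for sources that set event timestamps and watermarks as described above:

    // Read both inputs, merge them, and window the merged stream.
    PCollection<String> historical = p.apply(Read.from(new HistoricalHdfsSource()));
    PCollection<String> live = p.apply(Read.from(new LiveKafkaSource()));

    PCollection<String> unified =
        PCollectionList.of(historical).and(live)
            .apply(Flatten.<String>pCollections());

    // Windowed statistics over the merged stream, e.g. element counts per
    // five-minute event-time window.
    PCollection<Long> counts =
        unified
            .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(5))))
            .apply(Count.<String>globally().withoutDefaults());

Since the merge happens in event time, the downstream transforms do not care whether an element arrived via HDFS or Kafka.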
>> >> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy <[email protected]> wrote:
>> >> > Thanks Max,
>> >> >
>> >> > I'm unsure what you mean by the "batch part" and "streaming
>> >> > execution"... Are you saying that I have to run my entire pipeline
>> >> > in either batch or streaming mode?
>> >> >
>> >> > Perhaps a brief description of what I'm trying to do would help:
>> >> >
>> >> > 1. I've got some historical intervalized timeseries data
>> >> > 2. I've also got some live intervalized timeseries data
>> >> > 3. I want to process both those flows of data in a uniform fashion:
>> >> >    calculating windowed statistics
>> >> > 4. My thought was that I'd have the historical data stored in some
>> >> >    data store that is easy for Beam to read (e.g. HDFS)
>> >> > 5. I'd put live data on Kafka
>> >> > 6. Then load up 2 PCollections, one from HDFS and one from Kafka
>> >> > 7. Then perform a Flatten transform to get them into one PCollection
>> >> > 8. Then run my windowing over the flattened PCollection
>> >> > 9. ...
>> >> >
>> >> > I like this approach, because my transforms don't have to care about
>> >> > whether data is live or historical, and I can have one processing
>> >> > pipeline for both.
>> >> >
>> >> > Am I barking up the wrong tree? Is my mental model way off with
>> >> > respect to how to combine historical and live data?
>> >> >
>> >> > Thanks
>> >> >
>> >> > Bill
>> >> >
>> >> > On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]> wrote:
>> >> >
>> >> > Hi Bill,
>> >> >
>> >> > The batch part of the Flink Runner supports reading from a finite
>> >> > collection, but I'm assuming you're working with the streaming
>> >> > execution of the Flink runner. We haven't implemented support in
>> >> > the Runner yet, but Flink natively supports reading from finite
>> >> > sources, so it looks fairly easy to implement. I've filed a JIRA
>> >> > issue and would like to look into this later today.
>> >> >
>> >> > I've recently added a test case (UnboundedSourceITCase) which
>> >> > throws a custom exception when the end of the output has been
>> >> > reached. Admittedly, that is not a very nice approach, but it
>> >> > works. All the more reason we should support finite sources in
>> >> > streaming mode.
>> >> >
>> >> > Best,
>> >> > Max
>> >> >
>> >> > On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy <[email protected]> wrote:
>> >> >>
>> >> >> Hi,
>> >> >>
>> >> >> I want to get access to a bounded PCollection in Beam on the Flink
>> >> >> runner. Ideally, I’d pull from HDFS. Is that supported? If not,
>> >> >> what would be the best way for me to get something bounded, for
>> >> >> testing purposes?
>> >> >>
>> >> >> Thanks,
>> >> >>
>> >> >> Bill
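As for the simplest way to get something bounded for testing (the question above): Create with explicit timestamps needs no external storage at all. A quick sketch:

    // A tiny bounded PCollection with explicit event timestamps -- enough to
    // exercise windowing logic without any external storage.
    PCollection<String> testData = p.apply(
        Create.timestamped(
            TimestampedValue.of("a", new Instant(0L)),
            TimestampedValue.of("b", new Instant(1000L)),
            TimestampedValue.of("c", new Instant(2000L))));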
