Though without the spelling mistake :-)

On Wed, Mar 30, 2016 at 10:46 AM, Mark Shields <[email protected]> wrote:
> Makes sense. I wonder if:
> s/Bounded/Seekable/g
> s/Unbounded/Streamed/g
> ?
>
> On Wed, Mar 30, 2016 at 10:20 AM, Maximilian Michels <[email protected]> wrote:
>
>> That is actually what Flink does. It sends out a +inf watermark once
>> the source completes. This may be intuitive for the developer, but
>> users will need a little more API sugar to conveniently use bounded
>> sources in streaming programs.
>>
>> On Wed, Mar 30, 2016 at 7:15 PM, Mark Shields <[email protected]> wrote:
>> > Though BoundedSource must indeed be bounded (it supports splitting, which
>> > requires the overall size to be known), there's nothing preventing an
>> > UnboundedSource from being bounded in the sense that it may advance the
>> > watermark to +inf when it has reached its 'end'. Google's streaming runner
>> > doesn't do anything with that at the moment, but I could imagine that a
>> > 'streaming' or 'incremental batch' runner could shut itself down once all
>> > sink watermarks have gone to +inf.
>> >
>> > On Wed, Mar 30, 2016 at 9:55 AM, Maximilian Michels <[email protected]> wrote:
>> >>
>> >> I agree that ultimately users shouldn't have to decide which mode
>> >> they're executing in. Batch or streaming execution should be a
>> >> decision made behind the scenes. We're not quite there yet. I would
>> >> argue not even on paper:
>> >>
>> >> > And as Luke said, conceptually the model supports graphs that mix
>> >> > bounded and unbounded PCollections, and I can imagine runners that
>> >> > could implement that.
>> >>
>> >> The Flink Runner can definitely do that. However, the model is not
>> >> prepared for this. For instance, how do you generate watermarks for
>> >> bounded sources? As far as I can see, you'll have to wrap them in an
>> >> unbounded source to support watermark generation, or let the backend
>> >> generate watermarks automatically based on the timestamps seen. Please
>> >> correct me if I'm missing something here.
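For concreteness, here is a rough sketch of the wrapper Max and Mark are circling around: a BoundedSource adapted to the UnboundedSource interface, holding the watermark at -inf while reading and advancing it to +inf once the bounded input is exhausted. This is a sketch, not anything in the SDK: the class names are hypothetical, the signatures approximate the Beam Java SDK (they have shifted between versions), splitting is elided, and checkpointing is reduced to a no-op, so a restart would re-read the input.

import java.io.IOException;
import java.io.Serializable;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;

import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.io.BoundedSource;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.joda.time.Instant;

/** Hypothetical adapter: exposes a BoundedSource through the UnboundedSource API. */
public class BoundedAsUnboundedSource<T>
    extends UnboundedSource<T, BoundedAsUnboundedSource.NoCheckpoint> {

  /** No real checkpointing: on restart we simply re-read the bounded input. */
  public static class NoCheckpoint implements UnboundedSource.CheckpointMark, Serializable {
    @Override public void finalizeCheckpoint() {}
  }

  private final BoundedSource<T> bounded;

  public BoundedAsUnboundedSource(BoundedSource<T> bounded) {
    this.bounded = bounded;
  }

  @Override
  public List<BoundedAsUnboundedSource<T>> split(int desiredNumSplits, PipelineOptions options) {
    return Collections.singletonList(this); // splitting elided for brevity
  }

  @Override
  public UnboundedReader<T> createReader(PipelineOptions options, NoCheckpoint checkpoint)
      throws IOException {
    return new Reader(bounded.createReader(options));
  }

  @Override
  public Coder<NoCheckpoint> getCheckpointMarkCoder() {
    return SerializableCoder.of(NoCheckpoint.class);
  }

  @Override
  public Coder<T> getOutputCoder() {
    return bounded.getOutputCoder();
  }

  private class Reader extends UnboundedReader<T> {
    private final BoundedSource.BoundedReader<T> reader;
    private boolean done = false;

    Reader(BoundedSource.BoundedReader<T> reader) {
      this.reader = reader;
    }

    @Override public boolean start() throws IOException {
      boolean more = reader.start();
      done = !more;
      return more;
    }

    @Override public boolean advance() throws IOException {
      boolean more = reader.advance();
      done = !more;
      return more;
    }

    @Override public T getCurrent() throws NoSuchElementException {
      return reader.getCurrent();
    }

    @Override public Instant getCurrentTimestamp() throws NoSuchElementException {
      return reader.getCurrentTimestamp();
    }

    @Override public Instant getWatermark() {
      // The key idea: once the bounded input is exhausted, declare the
      // watermark to be +inf so downstream windows and triggers can fire
      // and a runner could, in principle, shut the pipeline down.
      return done ? BoundedWindow.TIMESTAMP_MAX_VALUE : BoundedWindow.TIMESTAMP_MIN_VALUE;
    }

    @Override public NoCheckpoint getCheckpointMark() { return new NoCheckpoint(); }

    @Override public UnboundedSource<T, ?> getCurrentSource() {
      return BoundedAsUnboundedSource.this;
    }

    @Override public void close() throws IOException { reader.close(); }
  }
}

A streaming runner could then consume such a source via Read.from(new BoundedAsUnboundedSource<>(myBoundedSource)) and, per Mark's suggestion, shut itself down once every sink watermark reaches +inf.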
>> >> On Wed, Mar 30, 2016 at 6:39 PM, Frances Perry <[email protected]> wrote:
>> >> >
>> >> > On Wed, Mar 30, 2016 at 9:21 AM, Maximilian Michels <[email protected]> wrote:
>> >> >>
>> >> >> Hi Bill,
>> >> >>
>> >> >> By "streaming mode" I meant Beam's built-in flag in PipelineOptions to
>> >> >> indicate batch or stream execution mode.
>> >> >
>> >> > The Beam model concept is bounded/unbounded data. I think of that
>> >> > streaming flag as actually part of the Cloud Dataflow runner -- currently
>> >> > the DataflowPipelineRunner is basically two runners (one batch, one
>> >> > streaming) packaged as one and configured via that flag.
>> >> >
>> >> > Ideally, I'd remove that flag entirely and have the DataflowPipelineRunner
>> >> > (and others) choose how to execute based on properties of the graph.
>> >> > Currently that mapping is all unbounded collections -> streaming, all
>> >> > bounded collections -> batch. But that doesn't have to be the case. An
>> >> > incremental 'batch' runner could totally handle unbounded PCollections.
>> >> > And as Luke said, conceptually the model supports graphs that mix bounded
>> >> > and unbounded PCollections, and I can imagine runners that could
>> >> > implement that.
>> >> >
>> >> >> Depending on this flag, the Flink Runner translates to either Flink's
>> >> >> DataSet (batch) or DataStream (stream) API. The DataStream API can also
>> >> >> read from bounded collections like file sources.
>> >> >>
>> >> >> Your mental model is by no means way off. Reading from bounded sources
>> >> >> in a streaming program can make sense. However, the typical use case
>> >> >> we have seen for streaming in the past is to read from an unbounded
>> >> >> source (e.g. Kafka) and process all incoming records using event time
>> >> >> / windows / triggering as they arrive. This is a great advantage over
>> >> >> the so-called Lambda architecture, where you typically have an
>> >> >> approximate real-time streaming layer and a batch layer which does the
>> >> >> heavy lifting.
>> >> >>
>> >> >> The typical use case I see for reading from a bounded collection in
>> >> >> streaming is when you have static data (like an ML model or a stop
>> >> >> word list). These would typically be kept as state. On the other hand,
>> >> >> your use case would also work if you assigned timestamps and
>> >> >> watermarks correctly for the HDFS source. You can then union the
>> >> >> streams and process the resulting stream correctly according to time.
>> >> >> That is the beauty of event time.
>> >> >>
>> >> >> By default, the Flink Runner assigns ingestion time for Kafka. That
>> >> >> means that timestamps/watermarks are assigned according to the current
>> >> >> system time. This is probably not what you want when combining these
>> >> >> two streams. You can change timestamps between operators, but you can
>> >> >> only assign watermarks at the source. Thus, to assign watermarks from
>> >> >> Kafka or a file source, you would have to implement a custom Beam
>> >> >> source.
>> >> >>
>> >> >> Cheers,
>> >> >> Max
>> >> >>
>> >> >> On Wed, Mar 30, 2016 at 4:37 PM, Bill McCarthy <[email protected]> wrote:
>> >> >> > Thanks Max,
>> >> >> >
>> >> >> > I'm unsure what you mean by the "batch part" and "streaming
>> >> >> > execution"... Are you saying that I have to run my entire pipeline in
>> >> >> > either batch or streaming mode?
>> >> >> >
>> >> >> > Perhaps a brief description of what I'm trying to do would help:
>> >> >> >
>> >> >> > 1. I've got some historical intervalized timeseries data
>> >> >> > 2. I've also got some live intervalized timeseries data
>> >> >> > 3. I want to process both those flows of data in a uniform fashion:
>> >> >> >    calculating windowed statistics
>> >> >> > 4. My thought was that I'd have the historical data stored in some
>> >> >> >    data store that's easy for Beam to read (e.g. HDFS)
>> >> >> > 5. I'd put live data on Kafka
>> >> >> > 6. Then load up 2 PCollections, one from HDFS and one from Kafka
>> >> >> > 7. Then perform a Flatten transform to get them in one PCollection
>> >> >> > 8. Then run my windowing over the flattened PCollection
>> >> >> > 9. ...
>> >> >> >
>> >> >> > I like this approach, because my transforms don't have to care about
>> >> >> > whether data is live or historical, and I can have one processing
>> >> >> > pipeline for both.
>> >> >> >
>> >> >> > Am I barking up the wrong tree? Is my mental model way off, with
>> >> >> > respect to how to combine historical and live data?
>> >> >> >
>> >> >> > Thanks
>> >> >> >
>> >> >> > Bill
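To make Bill's steps 6 through 8 concrete, here is a rough sketch of that pipeline in Beam's Java API. It is a sketch under assumptions, not working code from anyone's pipeline: Event (with an eventTimeMillis field and a static parse method) and UnboundedEventSource are hypothetical, TextIO reading an hdfs:// path merely stands in for a real HDFS source, and exact transform signatures vary across SDK versions. Per Max's caveat, the unbounded source is assumed to report event-time timestamps and watermarks rather than ingestion time.

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.joda.time.Duration;
import org.joda.time.Instant;

Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

// Steps 4 and 6a: historical data as a bounded read. TextIO with an
// hdfs:// path is only a stand-in for a real HDFS source. Each record is
// re-stamped with its embedded event time so that both branches live on
// the same event-time axis.
PCollection<Event> historical =
    p.apply("ReadHistory", TextIO.read().from("hdfs://namenode/history/*"))
     .apply(MapElements.into(TypeDescriptor.of(Event.class)).via(Event::parse))
     .apply(WithTimestamps.of((Event e) -> new Instant(e.eventTimeMillis)));

// Steps 5 and 6b: live data as an unbounded read. UnboundedEventSource is
// a hypothetical UnboundedSource whose reader reports event-time
// timestamps and watermarks (not ingestion time, per Max's caveat above).
PCollection<Event> live =
    p.apply("ReadLive", Read.from(new UnboundedEventSource("kafka:9092", "events")));

// Steps 7 and 8: one Flatten, then one windowed computation that no
// longer cares which records were historical and which were live.
PCollectionList.of(historical).and(live)
    .apply(Flatten.pCollections())
    .apply(Window.<Event>into(FixedWindows.of(Duration.standardMinutes(1))));
    // ... windowed statistics over the flattened collection go here ...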
>> >> >> > On Mar 30, 2016, at 5:59 AM, Maximilian Michels <[email protected]> wrote:
>> >> >> >
>> >> >> > Hi Bill,
>> >> >> >
>> >> >> > The batch part of the Flink Runner supports reading from a finite
>> >> >> > collection, but I'm assuming you're working with the streaming
>> >> >> > execution of the Flink runner. We haven't implemented support in the
>> >> >> > Runner yet, but Flink natively supports reading from finite sources,
>> >> >> > so it looks fairly easy to implement. I've filed a JIRA issue and
>> >> >> > would like to look into this later today.
>> >> >> >
>> >> >> > I've recently added a test case (UnboundedSourceITCase) which throws
>> >> >> > a custom exception when the end of the output has been reached.
>> >> >> > Admittedly, that is not a very nice approach, but it works. All the
>> >> >> > more reason to support finite sources in streaming mode.
>> >> >> >
>> >> >> > Best,
>> >> >> > Max
>> >> >> >
>> >> >> > On Wed, Mar 30, 2016 at 2:43 AM, William McCarthy <[email protected]> wrote:
>> >> >> >>
>> >> >> >> Hi,
>> >> >> >>
>> >> >> >> I want to get access to a bounded PCollection in Beam on the Flink
>> >> >> >> runner. Ideally, I'd pull from HDFS. Is that supported? If not, what
>> >> >> >> would be the best way for me to get something bounded, for testing
>> >> >> >> purposes?
>> >> >> >>
>> >> >> >> Thanks,
>> >> >> >>
>> >> >> >> Bill
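A footnote on Bill's last question: for testing purposes, the simplest bounded input is an in-memory Create with explicit timestamps, no HDFS connector required. A minimal sketch, assuming an existing Pipeline p (signatures approximate the Beam Java SDK):

import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// A tiny bounded PCollection with explicit event timestamps -- enough to
// exercise windowing and triggering logic before a real source exists.
PCollection<String> testInput = p.apply(
    Create.timestamped(
        TimestampedValue.of("a", new Instant(1000L)),
        TimestampedValue.of("b", new Instant(2000L)),
        TimestampedValue.of("c", new Instant(3000L))));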
