Yeah, that's been the implied source of being able to be continuous: you union with a receiver which produces an infinite number of batches (the "never-ending queue stream", though not actually a queue stream, since queue streams have some limitations; rather our own implementation thereof).
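The union-with-a-receiver idea can be modeled abstractly. A hedged Python sketch, not Spark code: `ping_stream`, `unioned_batches`, and the lists-of-lists are invented stand-ins for a Receiver and DStream micro-batches.

```python
import itertools

def ping_stream():
    """Stand-in for a Receiver that publishes periodic pings:
    an infinite source of empty batches that keeps the stream alive."""
    while True:
        yield []  # an empty micro-batch

def unioned_batches(finite_batches):
    """Union a finite sequence of real batches with the infinite ping
    stream, so downstream processing is invoked on every interval even
    after the real input is exhausted."""
    reals = iter(finite_batches)
    for ping in ping_stream():
        real = next(reals, None)
        yield (real if real is not None else []) + ping

# Take the first 5 micro-batches of the now-infinite stream.
batches = list(itertools.islice(unioned_batches([[1, 2], [3]]), 5))
# batches == [[1, 2], [3], [], [], []]
```

The point of the sketch is only that the unioned stream never ends, which is what gives downstream state/timer-like code a chance to keep running.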
On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:
> Could we do this behind the scenes by writing a Receiver that publishes periodic pings?
>
> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>> Kenn - I'm arguing that in Spark SDF-style computation cannot be expressed at all, and neither can Beam's timers.
>>
>> Spark, unlike Flink, does not have a timer facility (only state), and as far as I can tell its programming model has no other primitive that can map a finite RDD into an infinite DStream - the only way to create a new infinite DStream appears to be to write a Receiver.
>>
>> I cc'd you because I'm wondering whether you've already investigated this when considering whether timers can be implemented on the Spark runner.
>>
>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com> wrote:
>>> I don't think I understand what the limitations of timers are that you are referring to. FWIW I would say implementing other primitives like SDF is an explicit non-goal for Beam state & timers.
>>>
>>> I got lost at some point in this thread, but is it actually necessary that a bounded PCollection maps to a finite/bounded structure in Spark? Skimming, I'm not sure if the problem is that we can't transliterate Beam to Spark (this might be a good sign) or that we can't express SDF-style computation at all (seems far-fetched, but I could be convinced). Does doing a lightweight analysis and just promoting some things to some kind of infinite representation help?
>>>
>>> Kenn
>>>
>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>> Would like to revive this thread one more time.
>>>>
>>>> At this point I'm pretty certain that Spark can't support this out of the box and we're gonna have to make changes to Spark.
>>>>
>>>> Holden, could you advise who would be some Spark experts (yourself included :) ) who could advise what kind of Spark change would both support this AND be useful to the regular Spark community (non-Beam), so that it has a chance of finding support? E.g. is there any plan in Spark regarding adding timers similar to Flink's or Beam's timers? Maybe we could help out with that.
>>>>
>>>> +Kenneth Knowles <k...@google.com> because timers suffer from the same problem.
>>>>
>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> (resurrecting thread as I'm back from leave)
>>>>>
>>>>> I looked at this mode, and indeed as Reuven points out it seems that it affects execution details but doesn't offer any new APIs.
>>>>> Holden - your suggestion of piggybacking an unbounded-per-element SDF on top of an infinite stream would work if 1) there was just one element and 2) the work was guaranteed to be infinite.
>>>>>
>>>>> Unfortunately, neither of these assumptions holds. In particular:
>>>>>
>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself may be unbounded; and the unbounded work done by the SDF happens for every element. E.g. we might have a Kafka topic on which names of Kafka topics arrive, and we may end up concurrently reading a continuously growing number of topics.
>>>>> - 2: The work per element is not necessarily infinite, it's just *not guaranteed to be finite* - the SDF is allowed at any moment to say "Okay, this restriction is done for real" by returning stop() from the @ProcessElement method. Continuing the Kafka example, it could do that, e.g., if the topic/partition being watched is deleted. Having an infinite stream as a driver of this process would require being able to send a signal to the stream to stop itself.
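The per-element contract in point 2 can be modeled abstractly: the runner must keep rescheduling each restriction until it voluntarily stops. A hedged Python sketch, not Beam's actual API: `watch_partition`, `run_restriction`, and the STOP/RESUME markers are invented stand-ins for @ProcessElement and its resume/stop continuation.

```python
STOP, RESUME = "stop", "resume"

def watch_partition(state):
    """One @ProcessElement-style call: does a bounded chunk of work,
    then either asks to be resumed or declares the restriction done."""
    if state["deleted"]:          # e.g. the watched topic/partition is gone
        return STOP, []
    records = [f"record-{state['offset']}"]
    state["offset"] += 1
    return RESUME, records

def run_restriction(state, max_calls=5):
    """Runner loop: keeps rescheduling the element until it stops.
    max_calls only bounds this demo; a real runner reschedules forever."""
    out = []
    for i in range(max_calls):
        if i == 3:
            state["deleted"] = True   # simulate topic deletion mid-stream
        decision, records = watch_partition(state)
        out.extend(records)
        if decision == STOP:
            break
    return out

print(run_restriction({"offset": 0, "deleted": False}))
# ['record-0', 'record-1', 'record-2']
```

The difficulty discussed in the thread is exactly that Spark has no primitive playing the role of this reschedule-forever loop for an individual element.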
>>>>>
>>>>> Is it looking like there's any other way this can be done in Spark as-is, or are we going to have to make changes to Spark to support this?
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>> I mean the new mode is very much in the Dataset, not the DStream, API (although you can use the Dataset API with the old modes too).
>>>>>>
>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>>>> But this new mode isn't a semantic change, right? It's moving away from micro-batches into something that looks a lot like what Flink does - continuous processing with asynchronous snapshot boundaries.
>>>>>>>
>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>> Hopefully the new "continuous processing mode" in Spark will enable SDF implementation (and real streaming)?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner streaming. Holden, is it correct that Spark appears to have no way at all to produce an infinite DStream from a finite RDD? Maybe we can somehow dynamically create a new DStream for every initial restriction, said DStream being obtained using a Receiver that under the hood actually runs the SDF? (This is of course less efficient than what a timer-capable runner would do, and I have doubts about the fault tolerance.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed number of levels of DStreams. It's not great, but it would work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands of the runner. Imagine the following case: a file contains a list of "master" Kafka topics, on which are published the names of additional Kafka topics to read.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>>>>>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits infinite output for every input:
>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the result of reading a text file)
>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded number of streams being read concurrently, each of the streams themselves unbounded too)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for this case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>>
>>>>>>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>>>>>
>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable DoFn, and we're trying to figure out how to make Spark run it.
>>>>>>>>>>>
>>>>>>>>>>> Ah ok, sorry, I saw that and for some reason parsed them as old-style DoFns in my head.
>>>>>>>>>>>
>>>>>>>>>>> To effectively allow us to union back into the "same" DStream we'd have to end up using Spark's queue streams (or their equivalent custom source, because of some queue stream limitations), which invites some reliability challenges. This might be at the point where I should send a diagram/some sample code, since it's a bit convoluted.
>>>>>>>>>>>
>>>>>>>>>>> The more I think about the jumps required to make the "simple" union approach work, the more it seems just using the state mapping for streaming is probably more reasonable. Although the state tracking in Spark can be somewhat expensive, so it would probably make sense to benchmark to see if it meets our needs.
>>>>>>>>>>
>>>>>>>>>> So the problem is, I don't think this can be made to work using mapWithState. It doesn't allow a mapping function that emits infinite output for an input element, directly or not.
>>>>>>>>>>
>>>>>>>>> So, provided there is an infinite input (e.g. pick a never-ending queue stream), and each call produces a finite output, we would have an infinite number of calls.
>>>>>>>>>
>>>>>>>>>> Dataflow and Flink, for example, had timer support even before SDFs, and a timer can set another timer and thus end up doing an infinite amount of work in a fault-tolerant way - so SDF could be implemented on top of that. But AFAIK Spark doesn't have a similar feature, hence my concern.
>>>>>>>>>
>>>>>>>>> So we can do an infinite queue stream, which would allow us to be triggered at each interval and handle our own persistence.
>>>>>>>>>
>>>>>>>>>>> But these are still both DStream-based rather than Dataset-based, which we might want to support (depends on what direction folks take with the runners).
>>>>>>>>>>>
>>>>>>>>>>> If we wanted to do this in the Dataset world, looking at a custom sink/source would also be an option (which is effectively what a custom queue-stream-like thing for DStreams requires), but the data source APIs are a bit in flux, so if we ended up doing things at the edge of what's allowed, there's a good chance we'd have to rewrite it a few times.
>>>>>>>>>>>
>>>>>>>>>>>>> Assuming that we have a given DStream, though, in Spark we can get the underlying RDD implementation for each microbatch and do our work inside of that.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>> More generally, this does raise an important question: if we want to target Datasets instead of RDDs/DStreams, I would need to do some more poking.
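The "a timer can set another timer" pattern mentioned above can be modeled abstractly. A hedged Python sketch, with invented names throughout; a real runner would persist each timer and fire it fault-tolerantly rather than loop in memory.

```python
import heapq

def run_timers(initial, budget=4):
    """Toy event loop: each timer firing does a bounded piece of work and
    may set another timer, so unbounded work accumulates across firings.
    `budget` bounds the demo only; a real runner would fire forever."""
    timers = [(0, initial)]   # (fire_time, payload)
    emitted = []
    while timers and len(emitted) < budget:
        fire_time, payload = heapq.heappop(timers)
        emitted.append(f"offset-{payload}")
        # The callback re-sets a timer for the next chunk of work.
        heapq.heappush(timers, (fire_time + 1, payload + 1))
    return emitted

print(run_timers(0))
# ['offset-0', 'offset-1', 'offset-2', 'offset-3']
```

Because each firing checkpoints its payload before scheduling the next one, the loop can be resumed after failure, which is what makes the pattern a viable substrate for SDF.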
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>> How would timers be implemented? By outputting and reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I mean the timers could be inside the mappers within the system. Could use a singleton so if a partition is re-executed it doesn't end up as a straggler.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for state.)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep track of the computation so far, instead of outputting V each time? (Also, the progress so far is probably of a different type R rather than V.)
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add something like SplittableDoFns to Spark. I'd done some sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V), and if the computation finishes T will be populated, otherwise V will be.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For determining how long to run, we'd run up to either K seconds or listen for a signal on a port.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Once we're done running, we take the result and filter the ones with T and the ones with V into separate collections, re-run until finished, and then union the results.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design, but it was minimally complicated and I figured terrible was a good place to start and improve from.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is worse than I remember, because it's been a while since I thought about this.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
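The back-of-the-envelope design at the bottom of the thread (map to (T, V), filter finished from unfinished, re-run, union) can be modeled end to end. A hedged Python sketch, assuming T stands for remaining work and V for a finished result; `step`, `run_rounds`, and the numeric "work items" are invented for illustration, and in the real design each round would be a Spark job over RDDs.

```python
def step(work):
    """Run one element for a bounded budget. Returns ('V', result) when the
    computation finishes, or ('T', remaining_work) when it runs out of time."""
    remaining, done = work
    if remaining <= 2:                 # budget: at most 2 units per round
        return ("V", done + remaining)
    return ("T", (remaining - 2, done + 2))

def run_rounds(inputs):
    """Repeatedly map, filter finished (V) from unfinished (T), re-run the
    unfinished, and union all finished results at the end."""
    pending, finished = [(n, 0) for n in inputs], []
    while pending:
        tagged = [step(w) for w in pending]
        finished += [v for tag, v in tagged if tag == "V"]
        pending = [t for tag, t in tagged if tag == "T"]
    return sorted(finished)

print(run_rounds([1, 3, 5]))
# [1, 3, 5]
```

Note the limitation raised earlier in the thread: this loop only terminates if every element's work is finite, which is exactly the assumption an unbounded-per-element SDF does not guarantee.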