I don't think I understand what the limitations of timers are that you are referring to. FWIW I would say implementing other primitives like SDF is an explicit non-goal for Beam state & timers.
I got lost at some point in this thread, but is it actually necessary that a bounded PCollection maps to a finite/bounded structure in Spark? Skimming, I'm not sure if the problem is that we can't transliterate Beam to Spark (this might be a good sign) or that we can't express SDF-style computation at all (seems far-fetched, but I could be convinced). Does doing a lightweight analysis and just promoting some things to be some kind of infinite representation help?

Kenn

On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Would like to revive this thread one more time.
>
> At this point I'm pretty certain that Spark can't support this out of the box and we're gonna have to make changes to Spark.
>
> Holden, could you advise who would be some Spark experts (yourself included :) ) who could advise what kind of Spark change would both support this AND be useful to the regular Spark community (non-Beam), so that it has a chance of finding support? E.g. is there any plan in Spark regarding adding timers similar to Flink's or Beam's timers? Maybe we could help out with that.
>
> +Kenneth Knowles <k...@google.com> because timers suffer from the same problem.
>
> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> (resurrecting thread as I'm back from leave)
>>
>> I looked at this mode, and indeed as Reuven points out it seems that it affects execution details but doesn't offer any new APIs.
>> Holden - your suggestion of piggybacking an unbounded-per-element SDF on top of an infinite stream would work if 1) there was just one element and 2) the work was guaranteed to be infinite.
>>
>> Unfortunately, neither of these assumptions holds. In particular:
>>
>> - 1: The SDF is applied to a PCollection; the PCollection itself may be unbounded, and the unbounded work done by the SDF happens for every element. E.g.
>> we might have a Kafka topic on which names of Kafka topics arrive, and we may end up concurrently reading a continuously growing number of topics.
>> - 2: The work per element is not necessarily infinite, it's just *not guaranteed to be finite* - the SDF is allowed at any moment to say "Okay, this restriction is done for real" by returning stop() from the @ProcessElement method. Continuing the Kafka example, e.g., it could do that if the topic/partition being watched is deleted. Having an infinite stream as a driver of this process would require being able to send a signal to the stream to stop itself.
>>
>> Is it looking like there's any other way this can be done in Spark as-is, or are we going to have to make changes to Spark to support this?
>>
>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> I mean the new mode is very much in the Dataset not the DStream API (although you can use the Dataset API with the old modes too).
>>>
>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>
>>>> But this new mode isn't a semantic change, right? It's moving away from micro batches into something that looks a lot like what Flink does - continuous processing with asynchronous snapshot boundaries.
>>>>
>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> Hopefully the new "continuous processing mode" in Spark will enable SDF implementation (and real streaming)?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>
>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner streaming. Holden, is it correct that Spark appears to have no way at all to produce an infinite DStream from a finite RDD? Maybe we can somehow dynamically create a new DStream for every initial restriction, said DStream being obtained using a Receiver that under the hood actually runs the SDF? (This is of course less efficient than what a timer-capable runner would do, and I have doubts about the fault tolerance.)
>>>>>>>>>>>>
>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed number of levels of DStreams. It's not great but it would work.
>>>>>>>>>>>
>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands of the runner.
>>>>>>>>>>> Imagine the following case: a file contains a list of "master" Kafka topics, on which the names of additional Kafka topics to read are published.
>>>>>>>>>>>
>>>>>>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>
>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits infinite output for every input:
>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the result of reading a text file)
>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded number of streams being read concurrently, each of the streams itself unbounded too)
>>>>>>>>>>>
>>>>>>>>>>> Does the multi-level solution you have in mind work for this case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>
>>>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>>
>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable DoFn, and we're trying to figure out how to make Spark run it.
>>>>>>>>
>>>>>>>> Ah ok, sorry - I saw that and for some reason parsed them as old-style DoFns in my head.
>>>>>>>>
>>>>>>>> To effectively allow us to union back into the "same" DStream we'd have to end up using Spark's queue streams (or their equivalent custom source, because of some queue stream limitations), which invites some reliability challenges. This might be at the point where I should send a diagram/some sample code since it's a bit convoluted.
>>>>>>>>
>>>>>>>> The more I think about the jumps required to make the "simple" union approach work, the more it seems that just using the state mapping for streaming is probably more reasonable. Although the state tracking in Spark can be somewhat expensive, so it would probably make sense to benchmark to see if it meets our needs.
>>>>>>>
>>>>>>> So the problem is, I don't think this can be made to work using mapWithState. It doesn't allow a mapping function that emits infinite output for an input element, directly or not.
>>>>>>
>>>>>> So, provided there is an infinite input (e.g. pick a never-ending queue stream), and each call produces a finite output, we would have an infinite number of calls.
>>>>>>>
>>>>>>> Dataflow and Flink, for example, had timer support even before SDFs, and a timer can set another timer and thus end up doing an infinite amount of work in a fault-tolerant way - so SDF could be implemented on top of that. But AFAIK Spark doesn't have a similar feature, hence my concern.
>>>>>>
>>>>>> So we can do an infinite queue stream, which would allow us to be triggered at each interval and handle our own persistence.
>>>>>>>>
>>>>>>>> But these still are both DStream based rather than Dataset, which we might want to support (depends on what direction folks take with the runners).
>>>>>>>>
>>>>>>>> If we wanted to do this in the dataset world, looking at a custom sink/source would also be an option (which is effectively what a custom queue-stream-like thing for DStreams requires), but the data source APIs are a bit in flux, so if we ended up doing things at the edge of what's allowed there's a good chance we'd have to rewrite it a few times.
>>>>>>>>
>>>>>>>>
>>>>>>>>>> Assuming that we have a given DStream, though, in Spark we can get the underlying RDD implementation for each microbatch and do our work inside of that.
>>>>>>>>>>>>
>>>>>>>>>>>> More generally this does raise an important question: if we want to target Datasets instead of RDDs/DStreams, I would need to do some more poking.
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How would timers be implemented? By outputting and reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>
>>>>>>>>>>>> I mean the timers could be inside the mappers within the system. Could use a singleton so if a partition is re-executed it doesn't end up as a straggler.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for state.)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep track of the computation so far instead of outputting V each time? (Also, the progress so far is probably of a different type R rather than V.)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add something like SplittableDoFns to Spark. I'd done some sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V); if the computation finishes, V will be populated, otherwise T will be.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For determining how long to run, we'd run for up to K seconds or listen for a signal on a port.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Once we're done running, we take the result and filter the ones with T and with V into separate collections, re-run until finished, and then union the results.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is maybe not a great design, but it was minimally complicated and I figured terrible was a good place to start and improve from.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is worse than I remember, because it's been a while since I thought about this.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau