Could we do this behind the scenes by writing a Receiver that publishes periodic pings?
On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Kenn - I'm arguing that in Spark, SDF-style computation cannot be expressed at all, and neither can Beam's timers.
>
> Spark, unlike Flink, does not have a timer facility (only state), and as far as I can tell its programming model has no other primitive that can map a finite RDD into an infinite DStream - the only way to create a new infinite DStream appears to be to write a Receiver.
>
> I cc'd you because I'm wondering whether you've already investigated this when considering whether timers can be implemented on the Spark runner.
>
> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com> wrote:
>
>> I don't think I understand what the limitations of timers are that you are referring to. FWIW I would say implementing other primitives like SDF is an explicit non-goal for Beam state & timers.
>>
>> I got lost at some point in this thread, but is it actually necessary that a bounded PCollection maps to a finite/bounded structure in Spark? Skimming, I'm not sure if the problem is that we can't transliterate Beam to Spark (this might be a good sign) or that we can't express SDF-style computation at all (seems far-fetched, but I could be convinced). Does doing a lightweight analysis and just promoting some things to be some kind of infinite representation help?
>>
>> Kenn
>>
>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> Would like to revive this thread one more time.
>>>
>>> At this point I'm pretty certain that Spark can't support this out of the box and we're gonna have to make changes to Spark.
>>>
>>> Holden, could you advise who would be some Spark experts (yourself included :) ) who could advise what kind of Spark change would both support this AND be useful to the regular Spark community (non-Beam), so that it has a chance of finding support? E.g.
>>> is there any plan in Spark regarding adding timers similar to Flink's or Beam's timers? Maybe we could help out with that?
>>>
>>> +Kenneth Knowles <k...@google.com> because timers suffer from the same problem.
>>>
>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> (resurrecting thread as I'm back from leave)
>>>>
>>>> I looked at this mode, and indeed as Reuven points out it seems that it affects execution details, but doesn't offer any new APIs.
>>>> Holden - your suggestion of piggybacking an unbounded-per-element SDF on top of an infinite stream would work if 1) there was just 1 element and 2) the work was guaranteed to be infinite.
>>>>
>>>> Unfortunately, neither of these assumptions holds. In particular:
>>>>
>>>> - 1: The SDF is applied to a PCollection; the PCollection itself may be unbounded; and the unbounded work done by the SDF happens for every element. E.g. we might have a Kafka topic on which names of Kafka topics arrive, and we may end up concurrently reading a continuously growing number of topics.
>>>> - 2: The work per element is not necessarily infinite, it's just *not guaranteed to be finite* - the SDF is allowed at any moment to say "Okay, this restriction is done for real" by returning stop() from the @ProcessElement method. Continuing the Kafka example, e.g., it could do that if the topic/partition being watched is deleted. Having an infinite stream as a driver of this process would require being able to send a signal to the stream to stop itself.
>>>>
>>>> Is it looking like there's any other way this can be done in Spark as-is, or are we going to have to make changes to Spark to support this?
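To make the resume/stop contract in point 2 concrete, here is a toy model in plain Java. None of these names are the real Beam SDK API - `Continuation`, `processChunk`, and the fake "topic" are all illustrative - but the shape is the same: each invocation does a bounded chunk of work and either asks the runner to resume it later or declares the restriction done for real.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of an SDF-style process-element contract: bounded work per call,
// with the *work itself* deciding between "resume me later" and "stop for real".
// All names are illustrative; this is not the Beam SDK API.
public class SdfContractSketch {
  enum Continuation { RESUME, STOP }

  // Fake "topic": a fixed list of records, a mutable offset, and a deleted flag.
  static Continuation processChunk(List<String> topic, int[] offset, List<String> out, boolean topicDeleted) {
    if (topicDeleted) return Continuation.STOP;        // "this restriction is done for real"
    int end = Math.min(offset[0] + 2, topic.size());   // bounded chunk: at most 2 records per call
    for (int i = offset[0]; i < end; i++) out.add(topic.get(i));
    offset[0] = end;
    return Continuation.RESUME;                        // more may arrive later; call us again
  }

  public static void main(String[] args) {
    List<String> topic = List.of("a", "b", "c");
    List<String> out = new ArrayList<>();
    int[] offset = {0};
    Continuation c = Continuation.RESUME;
    int calls = 0;
    // The runner's side of the contract: keep re-invoking until STOP.
    while (c == Continuation.RESUME && calls < 5) {
      boolean deleted = calls >= 3;                    // simulate the topic being deleted later
      c = processChunk(topic, offset, out, deleted);
      calls++;
    }
    if (c != Continuation.STOP) throw new AssertionError("expected STOP");
    if (out.size() != 3) throw new AssertionError("expected 3 records, got " + out);
    System.out.println("calls=" + calls + " records=" + out);
  }
}
```

The point of the sketch is that the loop driving the calls (here, `main`) is exactly the facility Spark lacks: something that keeps re-invoking bounded work until the work itself says stop.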
>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>
>>>>> I mean the new mode is very much in the Dataset not the DStream API (although you can use the Dataset API with the old modes too).
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> But this new mode isn't a semantic change, right? It's moving away from micro batches into something that looks a lot like what Flink does - continuous processing with asynchronous snapshot boundaries.
>>>>>>
>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> Hopefully the new "continuous processing mode" in Spark will enable SDF implementation (and real streaming)?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>
>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner streaming.
>>>>>>>>>>>>>>> Holden, is it correct that Spark appears to have no way at all to produce an infinite DStream from a finite RDD? Maybe we can somehow dynamically create a new DStream for every initial restriction, said DStream being obtained using a Receiver that under the hood actually runs the SDF? (this is of course less efficient than a timer-capable runner would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed number of levels on DStreams. It's not great but it would work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands of the runner. Imagine the following case: a file contains a list of "master" Kafka topics, on which there are published additional Kafka topics to read.
>>>>>>>>>>>>>
>>>>>>>>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>>>>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>>>
>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits infinite output for every input:
>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the result of reading a text file)
>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded number of streams being read concurrently, each of the streams themselves is unbounded too)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does the multi-level solution you have in mind work for this case?
>>>>>>>>>>>>> I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>>
>>>>>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>>>>>
>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable DoFn and we're trying to figure out how to make Spark run it.
>>>>>>>>>>>
>>>>>>>>>> Ah ok, sorry, I saw that and for some reason parsed them as old-style DoFns in my head.
>>>>>>>>>>
>>>>>>>>>> To effectively allow us to union back into the "same" DStream we'd have to end up using Spark's queue streams (or their equivalent custom source, because of some queue stream limitations), which invites some reliability challenges. This might be at the point where I should send a diagram/some sample code since it's a bit convoluted.
>>>>>>>>>>
>>>>>>>>>> The more I think about the jumps required to make the "simple" union approach work, the more it seems just using the state mapping for streaming is probably more reasonable. Although the state tracking in Spark can be somewhat expensive, so it would probably make sense to benchmark to see if it meets our needs.
>>>>>>>>>>
>>>>>>>>> So the problem is, I don't think this can be made to work using mapWithState. It doesn't allow a mapping function that emits infinite output for an input element, directly or not.
>>>>>>>>>
>>>>>>>> So, provided there is an infinite input (e.g. pick a never-ending queue stream), and each call produces a finite output, we would have an infinite number of calls.
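Holden's point - a never-ending trigger stream plus bounded work per trigger yields unbounded total output - can be modeled in a few lines of plain Java. There is no Spark here; `pollOnce` and the tick loop are illustrative stand-ins for a queue stream driving a source poll each micro-batch.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "infinite trigger stream + bounded work per trigger".
// Each tick of a (conceptually never-ending) queue stream drives one bounded
// poll; unbounded total output comes from the unbounded number of ticks.
// pollOnce and the tick loop are illustrative stand-ins, not Spark APIs.
public class TriggerDrivenPolls {
  // Bounded work per call: fabricate at most `max` "records" starting at `from`.
  static List<String> pollOnce(int from, int max) {
    List<String> out = new ArrayList<>();
    for (int i = 0; i < max; i++) out.add("record-" + (from + i));
    return out;
  }

  public static void main(String[] args) {
    List<String> collected = new ArrayList<>();
    int offset = 0;
    // Simulate the first 5 ticks of an infinite trigger stream.
    for (int tick = 0; tick < 5; tick++) {
      List<String> batch = pollOnce(offset, 2);
      collected.addAll(batch);
      offset += batch.size();
      // Eugene's concern applies here: nothing lets the work tell the stream
      // "this restriction is done for real" - the ticks just keep coming.
    }
    if (collected.size() != 10) throw new AssertionError("got " + collected.size());
    System.out.println(collected.size() + " records over 5 ticks");
  }
}
```

This captures why the approach handles "not guaranteed to be finite" work awkwardly: the trigger stream has no channel for the per-element work to stop it.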
>>>>>>>>> Dataflow and Flink, for example, had timer support even before SDFs, and a timer can set another timer and thus end up doing an infinite amount of work in a fault-tolerant way - so SDF could be implemented on top of that. But AFAIK Spark doesn't have a similar feature, hence my concern.
>>>>>>>>>
>>>>>>>> So we can do an infinite queue stream, which would allow us to be triggered at each interval and handle our own persistence.
>>>>>>>>
>>>>>>>>>> But these still are both DStream-based rather than Dataset-based, which we might want to support (depends on what direction folks take with the runners).
>>>>>>>>>>
>>>>>>>>>> If we wanted to do this in the Dataset world, looking at a custom sink/source would also be an option (which is effectively what a custom queue-stream-like thing for DStreams requires), but the data source APIs are a bit in flux, so if we ended up doing things at the edge of what's allowed there's a good chance we'd have to rewrite it a few times.
>>>>>>>>>>
>>>>>>>>>>>> Assuming that we have a given DStream, though, in Spark we can get the underlying RDD implementation for each microbatch and do our work inside of that.
>>>>>>>>>>>>
>>>>>>>>>>>>>> More generally, this does raise an important question of whether we want to target Datasets instead of RDDs/DStreams, in which case I would need to do some more poking.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> How would timers be implemented? By outputting and reprocessing, the same way you proposed for SDF?
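For readers unfamiliar with the "a timer can set another timer" trick Eugene mentions, here is a minimal plain-Java analogy using a `ScheduledExecutorService`. This is not the Beam or Flink timer API - a real runner would also checkpoint state between firings - but it shows the control flow: each firing does a bounded chunk of work and re-arms itself, so unbounded work becomes a chain of bounded callbacks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Toy model of the "timer sets another timer" pattern: unbounded per-element
// work expressed as a chain of bounded, individually-scheduled callbacks.
// Illustrative only; no Beam/Flink APIs and no checkpointing.
public class SelfRearmingTimer {
  public static void main(String[] args) {
    ScheduledExecutorService timers = Executors.newSingleThreadScheduledExecutor();
    AtomicInteger chunksDone = new AtomicInteger();
    CountDownLatch enough = new CountDownLatch(5);

    Runnable onTimer = new Runnable() {
      @Override public void run() {
        chunksDone.incrementAndGet();                    // one bounded chunk of work
        enough.countDown();
        // Re-arm: this is what lets timer-capable runners express
        // unbounded per-element work in bounded steps.
        timers.schedule(this, 1, TimeUnit.MILLISECONDS);
      }
    };
    timers.schedule(onTimer, 1, TimeUnit.MILLISECONDS);

    try {
      enough.await();                                    // block until >= 5 chunks ran
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    } finally {
      timers.shutdownNow();                              // stop the chain
    }
    if (chunksDone.get() < 5) throw new AssertionError("timer did not re-arm");
    System.out.println("chunks done: " + chunksDone.get());
  }
}
```

The fault-tolerance Eugene refers to comes from the runner persisting the "armed timer + state" pair, which this sketch deliberately omits.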
>>>>>>>>>>>>>> I mean the timers could be inside the mappers within the system. Could use a singleton so if a partition is re-executed it doesn't end up as a straggler.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for state)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep track of the computation so far instead of outputting V each time? (also the progress so far is probably of a different type R rather than V)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add something like SplittableDoFns to Spark. I'd done some sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>> For input type T, output type V:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V); if the computation finishes, V will be populated, otherwise T will be.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For determining how long to run, we'd run up to either K seconds or listen for a signal on a port.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Once we're done running, we take the result and filter the ones with T and the ones with V into separate collections, re-run the T collection until finished, and then union the V results.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This is maybe not a great design, but it was minimally complicated and I figured terrible was a good place to start and improve from.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is worse than I remember, because it's been awhile since I thought about this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
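Holden's run/filter/re-run/union loop can be sketched end-to-end in plain Java. Everything here is illustrative - `Partial`, `runUntilDone`, and the countdown "step" are stand-ins, with no Beam or Spark involved - it only shows the control flow of the design above: run the mapper, split finished results from residuals, re-run the residuals, and accumulate (union) the finished results.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Toy model of the run/filter/re-run/union design: a "step" takes a pending
// element T and returns either a finished result V or a residual T to retry
// in the next round. Illustrative names only; not a Beam or Spark API.
public class FilterAndRerun {
  // Either-ish pair: exactly one of residual/result is non-null.
  static final class Partial<T, V> {
    final T residual; final V result;
    Partial(T residual, V result) { this.residual = residual; this.result = result; }
  }

  static <T, V> List<V> runUntilDone(List<T> input, Function<T, Partial<T, V>> step, int maxRounds) {
    List<V> done = new ArrayList<>();
    List<T> pending = new ArrayList<>(input);
    for (int round = 0; round < maxRounds && !pending.isEmpty(); round++) {
      List<T> next = new ArrayList<>();
      for (T t : pending) {
        Partial<T, V> p = step.apply(t);          // one bounded unit of work
        if (p.result != null) done.add(p.result); // finished: "union" into the results
        else next.add(p.residual);                // unfinished: re-run next round
      }
      pending = next;
    }
    return done;
  }

  public static void main(String[] args) {
    // Each element counts down to zero; elements finish after different numbers of rounds.
    List<Integer> input = List.of(1, 3, 2);
    List<String> out = runUntilDone(
        input,
        (Integer n) -> n <= 1
            ? new Partial<Integer, String>(null, "done")
            : new Partial<Integer, String>(n - 1, null),
        10);
    if (out.size() != 3) throw new AssertionError("expected 3 results, got " + out);
    System.out.println(out);
  }
}
```

Note how the sketch also exposes the design's limitation discussed earlier in the thread: `maxRounds` bounds the total work, so genuinely open-ended per-element work needs something outside the loop (a trigger stream or timers) to keep scheduling rounds.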