(resurrecting thread as I'm back from leave) I looked at this mode, and indeed as Reuven points out it seems that it affects execution details, but doesn't offer any new APIs.

Holden - your suggestion of piggybacking an unbounded-per-element SDF on top of an infinite stream would work only if 1) there was just one element, and 2) the work was guaranteed to be infinite.
Unfortunately, neither of these assumptions holds. In particular:

- 1: The SDF is applied to a PCollection; the PCollection itself may be unbounded; and the unbounded work done by the SDF happens for every element. E.g. we might have a Kafka topic on which names of other Kafka topics arrive, and we may end up concurrently reading a continuously growing number of topics.

- 2: The work per element is not necessarily infinite, it's just *not guaranteed to be finite* - the SDF is allowed at any moment to say "Okay, this restriction is done for real" by returning stop() from the @ProcessElement method. Continuing the Kafka example: it could do that if the topic/partition being watched is deleted.

Having an infinite stream as the driver of this process would require being able to send the stream a signal to stop itself.

Is it looking like there's any other way this can be done in Spark as-is, or are we going to have to make changes to Spark to support this?
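For concreteness, here's roughly the shape of the contract from point 2 that the runner has to support - a minimal sketch, not the real KafkaIO code; nextOffset, readRecordAt and topicWasDeleted are hypothetical helpers:

  import org.apache.beam.sdk.io.range.OffsetRange;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.splittabledofn.OffsetRangeTracker;

  class ReadFromKafkaFn extends DoFn<String, String> {
    @GetInitialRestriction
    public OffsetRange getInitialRestriction(String topic) {
      // Unbounded restriction: the topic has no known end offset.
      return new OffsetRange(0, Long.MAX_VALUE);
    }

    @ProcessElement
    public ProcessContinuation processElement(
        ProcessContext c, OffsetRangeTracker tracker) {
      String topic = c.element();
      for (int i = 0; i < 1000; i++) {  // a bounded chunk of work per call
        if (topicWasDeleted(topic)) {   // hypothetical helper
          // Done "for real": the work turned out to be finite after all.
          return ProcessContinuation.stop();
        }
        if (!tracker.tryClaim(nextOffset(topic))) {  // hypothetical helper
          // The runner checkpointed/split off the residual restriction.
          return ProcessContinuation.stop();
        }
        c.output(readRecordAt(topic));  // hypothetical helper
      }
      // Not done yet; ask the runner to invoke us again later.
      return ProcessContinuation.resume();
    }
  }

The key property: nobody can tell from outside whether a given restriction will ever hit the stop() branch - that's the "not guaranteed to be finite" part.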
On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <[email protected]> wrote:

> I mean the new mode is very much in the Dataset not the DStream API
> (although you can use the Dataset API with the old modes too).
>
> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <[email protected]> wrote:
>
>> But this new mode isn't a semantic change, right? It's moving away from
>> micro-batches into something that looks a lot like what Flink does -
>> continuous processing with asynchronous snapshot boundaries.
>>
>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <[email protected]> wrote:
>>
>>> Hopefully the new "continuous processing mode" in Spark will enable SDF
>>> implementation (and real streaming)?
>>>
>>> Thanks,
>>> Thomas
>>>
>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <[email protected]> wrote:
>>>
>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <[email protected]> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <[email protected]> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <[email protected]> wrote:
>>>>>>
>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <[email protected]> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>>>>>>> runner streaming. Holden, is it correct that Spark appears to have
>>>>>>>>>>> no way at all to produce an infinite DStream from a finite RDD?
>>>>>>>>>>> Maybe we can somehow dynamically create a new DStream for every
>>>>>>>>>>> initial restriction, said DStream being obtained using a Receiver
>>>>>>>>>>> that under the hood actually runs the SDF? (This is of course less
>>>>>>>>>>> efficient than a timer-capable runner would be, and I have doubts
>>>>>>>>>>> about the fault tolerance.)
>>>>>>>>>>
>>>>>>>>>> So on the streaming side we could simply do it with a fixed number
>>>>>>>>>> of levels on DStreams. It's not great but it would work.
>>>>>>>>>
>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF demands
>>>>>>>>> of the runner. Imagine the following case: a file contains a list of
>>>>>>>>> "master" Kafka topics, on which the names of additional Kafka topics
>>>>>>>>> to read are published.
>>>>>>>>>
>>>>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile);
>>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>
>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>> infinite output for every input:
>>>>>>>>> - Applying it to a finite set of inputs (in this case, to the result
>>>>>>>>> of reading a text file)
>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>>>>>> number of streams being read concurrently, each of the streams itself
>>>>>>>>> unbounded too)
>>>>>>>>>
>>>>>>>>> Does the multi-level solution you have in mind work for this case?
>>>>>>>>> I suppose the second case is harder, so we can focus on that.
>>>>>>>>
>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>
>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>> splittable DoFn, and we're trying to figure out how to make Spark run it.
>>>>>>
>>>>>> Ah ok, sorry, I saw that and for some reason parsed them as old-style
>>>>>> DoFns in my head.
>>>>>>
>>>>>> To effectively allow us to union back into the "same" DStream we'd
>>>>>> have to end up using Spark's queue streams (or their equivalent custom
>>>>>> source, because of some queue stream limitations), which invites some
>>>>>> reliability challenges. This might be at the point where I should send
>>>>>> a diagram/some sample code, since it's a bit convoluted.
>>>>>>
>>>>>> The more I think about the jumps required to make the "simple" union
>>>>>> approach work, the more it seems that just using the state mapping for
>>>>>> streaming is probably more reasonable. Although the state tracking in
>>>>>> Spark can be somewhat expensive, so it would probably make sense to
>>>>>> benchmark to see if it meets our needs.
>>>>>
>>>>> So the problem is, I don't think this can be made to work using
>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>> output for an input element, directly or not.
>>>>
>>>> So, provided there is an infinite input (e.g. pick a never-ending queue
>>>> stream), and each call produces a finite output, we would have an
>>>> infinite number of calls.
>>>>
>>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>>> and a timer can set another timer and thus end up doing an infinite
>>>>> amount of work in a fault-tolerant way - so SDF could be implemented on
>>>>> top of that. But AFAIK Spark doesn't have a similar feature, hence my
>>>>> concern.
>>>>
>>>> So we can do an infinite queue stream, which would allow us to be
>>>> triggered at each interval and handle our own persistence.
>>>>
>>>>>> But these are still both DStream-based rather than Dataset-based,
>>>>>> which we might want to support (depends on what direction folks take
>>>>>> with the runners).
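To make the "infinite queue stream as a driver" idea concrete, here's a rough, untested sketch against the DStream Java API. runOneSdfChunkAndPersistResiduals is a hypothetical helper that would load our checkpointed restrictions, run a bounded chunk of each, persist the residuals, and return whatever was emitted:

  import java.util.Collections;
  import java.util.LinkedList;
  import java.util.List;
  import org.apache.spark.SparkConf;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.streaming.Durations;
  import org.apache.spark.streaming.api.java.JavaDStream;
  import org.apache.spark.streaming.api.java.JavaStreamingContext;

  public class SdfTickDriver {
    public static void main(String[] args) throws InterruptedException {
      SparkConf conf = new SparkConf().setAppName("sdf-via-queue-stream");
      JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(5));

      // An empty queue plus a defaultRDD: every batch interval re-emits the
      // default RDD, giving us an infinite stream of "ticks".
      JavaRDD<Integer> tick =
          ssc.sparkContext().parallelize(Collections.singletonList(0));
      JavaDStream<Integer> ticks =
          ssc.queueStream(new LinkedList<JavaRDD<Integer>>(), true, tick);

      // On each tick, run one bounded chunk of SDF work and emit its output.
      JavaDStream<String> output =
          ticks.flatMap(ignored -> runOneSdfChunkAndPersistResiduals().iterator());
      output.print();

      ssc.start();
      ssc.awaitTermination();
    }

    // Hypothetical: run pending restrictions for a bounded amount of work,
    // persist the leftovers externally, return the records produced.
    static List<String> runOneSdfChunkAndPersistResiduals() {
      throw new UnsupportedOperationException("sketch only");
    }
  }

Fault tolerance is the open question here: the pending restrictions live outside Spark's own checkpointing, so the "handle our own persistence" part is doing all the heavy lifting.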
>>>>>> If we wanted to do this in the Dataset world, looking at a custom
>>>>>> sink/source would also be an option (which is effectively what a
>>>>>> custom queue-stream-like thing for DStreams requires), but the
>>>>>> datasource APIs are a bit in flux, so if we ended up doing things at
>>>>>> the edge of what's allowed there's a good chance we'd have to rewrite
>>>>>> it a few times.
>>>>>>
>>>>>>>> Assuming that we have a given DStream, though, in Spark we can get
>>>>>>>> the underlying RDD implementation for each microbatch and do our
>>>>>>>> work inside of that.
>>>>>>>>>>
>>>>>>>>>> More generally, this does raise an important question if we want
>>>>>>>>>> to target Datasets instead of RDDs/DStreams, in which case I would
>>>>>>>>>> need to do some more poking.
>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> How would timers be implemented? By outputting and reprocessing,
>>>>>>>>>>>> the same way you proposed for SDF?
>>>>>>>>>>
>>>>>>>>>> I mean the timers could be inside the mappers within the system.
>>>>>>>>>> Could use a singleton so if a partition is re-executed it doesn't
>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>>
>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>>>>>> state)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to keep
>>>>>>>>>>>>>>> track of the computation so far, instead of outputting V each
>>>>>>>>>>>>>>> time? (Also, the progress so far is probably of a different
>>>>>>>>>>>>>>> type R rather than V.)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <[email protected]> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some
>>>>>>>>>>>>>>>> sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V), and if the
>>>>>>>>>>>>>>>> computation finishes T will be populated, otherwise V will be.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> For determining how long to run, we'd run up to either K
>>>>>>>>>>>>>>>> seconds or until a signal arrives on a port.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Once we're done running, we take the result and filter the
>>>>>>>>>>>>>>>> ones with T and with V into separate collections, re-run
>>>>>>>>>>>>>>>> until finished, and then union the results.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> This is maybe not a great design, but it was minimally
>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start
>>>>>>>>>>>>>>>> and improve from.
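To make that filter-and-rerun design concrete (and to show where it breaks), here's a rough sketch. runForUpToKSeconds is a hypothetical stand-in for "run this element's work for at most K seconds":

  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import scala.Tuple2;

  class FilterAndRerun {
    // Hypothetical: run one element's work for at most K seconds; return
    // (leftoverWork, null) if it didn't finish, or (null, result) if it did.
    static <T, V> Tuple2<T, V> runForUpToKSeconds(T work) {
      throw new UnsupportedOperationException("sketch only");
    }

    static <T, V> JavaRDD<V> runUntilFinished(JavaSparkContext sc, JavaRDD<T> inputs) {
      JavaRDD<V> results = sc.emptyRDD();
      JavaRDD<T> pending = inputs;
      // This loop never exits if some element's work is infinite - which is
      // exactly the "not guaranteed to be finite" issue raised at the top
      // of this thread.
      while (!pending.isEmpty()) {
        JavaRDD<Tuple2<T, V>> ran =
            pending.map(t -> FilterAndRerun.<T, V>runForUpToKSeconds(t)).cache();
        // Finished results go into the output union...
        results = results.union(ran.filter(p -> p._2() != null).map(p -> p._2()));
        // ...leftover work gets re-run on the next pass.
        pending = ran.filter(p -> p._1() != null).map(p -> p._1());
      }
      return results;
    }
  }

The filter/union part is fine; the problem is the termination condition, per points 1 and 2 at the top of this message.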
>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where this
>>>>>>>>>>>>>>>> is worse than I remember, because it's been a while since I
>>>>>>>>>>>>>>>> thought about this.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>
> --
> Twitter: https://twitter.com/holdenkarau
