On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>
>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>
>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner streaming. Holden, is it correct that Spark appears to have no way at all to produce an infinite DStream from a finite RDD? Maybe we can somehow dynamically create a new DStream for every initial restriction, said DStream being obtained using a Receiver that under the hood actually runs the SDF? (This is of course less efficient than what a timer-capable runner would do, and I have doubts about the fault tolerance.)
>>>>>
>>>>> So on the streaming side we could simply do it with a fixed number of levels of DStreams. It's not great but it would work.
>>>>
>>>> Not sure I understand this. Let me try to clarify what SDF demands of the runner. Imagine the following case: a file contains a list of "master" Kafka topics, on which additional Kafka topics to read are published.
>>>>
>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>
>>>> This exemplifies both use cases of a streaming SDF that emits infinite output for every input:
>>>> - Applying it to a finite set of inputs (in this case, to the result of reading a text file)
>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded number of streams being read concurrently, each of which is itself unbounded)
>>>>
>>>> Does the multi-level solution you have in mind work for this case? I suppose the second case is harder, so we can focus on that.
>>>
>>> So none of those are a SplittableDoFn, right?
>>
>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable DoFn and we're trying to figure out how to make Spark run it.
>>
> Ah ok, sorry, I saw that and for some reason parsed them as old-style DoFns in my head.
>
> To effectively allow us to union back into the "same" DStream we'd have to end up using Spark's queue streams (or their equivalent custom source, because of some queue stream limitations), which invites some reliability challenges. This might be at the point where I should send a diagram/some sample code, since it's a bit convoluted.
>
> The more I think about the jumps required to make the "simple" union approach work, the more it seems that just using the state mapping for streaming is probably more reasonable. Although the state tracking in Spark can be somewhat expensive, so it would probably make sense to benchmark to see if it meets our needs.

So the problem is, I don't think this can be made to work using mapWithState. It doesn't allow a mapping function that emits infinite output for an input element, directly or not. Dataflow and Flink, for example, had timer support even before SDFs, and a timer can set another timer and thus end up doing an infinite amount of work in a fault-tolerant way - so SDF could be implemented on top of that. But AFAIK Spark doesn't have a similar feature, hence my concern.
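For reference, this is roughly the shape of the mapWithState API in question (a minimal, untested sketch; the topic-keyed record types and the counter state are just illustrative). Each invocation of the mapping function handles one (key, element) pair in the current batch and returns a single mapped value, so it cannot keep emitting output for that element after it returns:

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function3;
import org.apache.spark.streaming.State;
import org.apache.spark.streaming.StateSpec;
import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;

class MapWithStateShape {
  // Illustrative types: records keyed by topic, with a per-key element counter as state.
  static JavaMapWithStateDStream<String, byte[], Long, String> apply(
      JavaPairDStream<String, byte[]> recordsByTopic) {
    Function3<String, Optional<byte[]>, State<Long>, String> fn =
        (topic, record, state) -> {
          long seen = state.exists() ? state.get() : 0L;
          state.update(seen + 1);
          // One invocation per (key, element) per batch, returning exactly one value.
          // There is no way to keep emitting for this element after returning,
          // which is what an unbounded-per-element SDF would need.
          return topic + ": element #" + (seen + 1);
        };
    return recordsByTopic.mapWithState(StateSpec.function(fn));
  }
}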
> But these still are both DStream-based rather than Dataset-based, which we might want to support (depends on what direction folks take with the runners).
>
> If we wanted to do this in the Dataset world, looking at a custom sink/source would also be an option (which is effectively what a custom queue-stream-like thing for DStreams requires), but the DataSource APIs are a bit in flux, so if we ended up doing things at the edge of what's allowed there's a good chance we'd have to rewrite it a few times.
>
>>> Assuming that we have a given DStream though, in Spark we can get the underlying RDD implementation for each microbatch and do our work inside of that.
>>>
>>>>> More generally this does raise an important question if we want to target Datasets instead of RDDs/DStreams, in which case I would need to do some more poking.
>>>>>
>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> How would timers be implemented? By outputting and reprocessing, the same way you proposed for SDF?
>>>>>>
>>>>> I mean the timers could be inside the mappers within the system. Could use a singleton so if a partition is re-executed it doesn't end up as a straggler.
>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>
>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Does Spark have support for timers? (I know it has support for state.)
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Could we alternatively use a state mapping function to keep track of the computation so far instead of outputting V each time? (Also, the progress so far is probably of a different type R rather than V.)
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So we had a quick chat about what it would take to add something like SplittableDoFns to Spark. I'd done some sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>
>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>> For input type T
>>>>>>>>>>> Output type V
>>>>>>>>>>>
>>>>>>>>>>> Implement a mapper which outputs type (T, V), and if the computation finishes T will be populated, otherwise V will be.
>>>>>>>>>>>
>>>>>>>>>>> For determining how long to run, we'd run up to either K seconds or listen for a signal on a port.
>>>>>>>>>>>
>>>>>>>>>>> Once we're done running, we take the result and filter the ones with T and V into separate collections, re-run until finished, and then union the results.
>>>>>>>>>>>
>>>>>>>>>>> This is maybe not a great design, but it was minimally complicated and I figured terrible was a good place to start and improve from.
>>>>>>>>>>>
>>>>>>>>>>> Let me know your thoughts, especially the parts where this is worse than I remember, because it's been a while since I thought about this.
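To make the back-of-the-envelope (T, V) design quoted above concrete, here is a rough batch-mode sketch (untested; Partial, BoundedProcessor, and processFor are made-up stand-ins for "run the work on this input for a bounded amount of time"). Finished outputs are filtered into the result, unfinished inputs are re-run, and the passes are unioned:

import java.io.Serializable;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// Hypothetical "leftover input or finished output" pair; exactly one side is set.
class Partial<T, V> implements Serializable {
  final T remaining; // unfinished input to re-run, or null
  final V output;    // produced output, or null
  Partial(T remaining, V output) { this.remaining = remaining; this.output = output; }
}

// Stand-in for "process this input for up to budgetMillis, returning any outputs
// produced plus, if unfinished, the leftover input to continue from".
interface BoundedProcessor<T, V> extends Serializable {
  List<Partial<T, V>> processFor(T input, long budgetMillis);
}

class RerunUnionSketch {
  static <T, V> JavaRDD<V> run(JavaSparkContext sc, JavaRDD<T> initial,
                               BoundedProcessor<T, V> processor, long budgetMillis) {
    JavaRDD<T> pending = initial;
    JavaRDD<V> results = sc.emptyRDD();
    while (!pending.isEmpty()) {
      // One pass: process every pending input for a bounded amount of time.
      JavaRDD<Partial<T, V>> pass =
          pending.flatMap(t -> processor.processFor(t, budgetMillis).iterator());
      pass.cache();
      // Finished outputs are kept; unfinished inputs are re-run on the next pass.
      results = results.union(pass.filter(p -> p.output != null).map(p -> p.output));
      pending = pass.filter(p -> p.remaining != null).map(p -> p.remaining);
    }
    return results;
  }
}

Note that this loop only terminates once every element's work is finite, which is exactly why the streaming case discussed above (unbounded output per element) needs something more than this.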