On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner streaming. Holden, is it correct that Spark appears to have no way at all to produce an infinite DStream from a finite RDD? Maybe we can somehow dynamically create a new DStream for every initial restriction, said DStream being obtained using a Receiver that under the hood actually runs the SDF? (This is of course less efficient than what a timer-capable runner would do, and I have doubts about the fault tolerance.)
>>>>>>
>>>>>> So on the streaming side we could simply do it with a fixed number of levels of DStreams. It's not great but it would work.
>>>>>
>>>>> Not sure I understand this. Let me try to clarify what SDF demands of the runner. Imagine the following case: a file contains a list of "master" Kafka topics, on which there are published additional Kafka topics to read.
>>>>>
>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>
>>>>> This exemplifies both use cases of a streaming SDF that emits infinite output for every input:
>>>>> - Applying it to a finite set of inputs (in this case, to the result of reading a text file)
>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded number of streams being read concurrently, each of the streams itself being unbounded too)
>>>>>
>>>>> Does the multi-level solution you have in mind work for this case? I suppose the second case is harder, so we can focus on that.
>>>>
>>>> So none of those are a SplittableDoFn, right?
>>>
>>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable DoFn and we're trying to figure out how to make Spark run it.
>>
>> Ah ok, sorry, I saw that and for some reason parsed them as old-style DoFns in my head.
>>
>> To effectively allow us to union back into the "same" DStream we'd have to end up using Spark's queue streams (or their equivalent custom source, because of some queue stream limitations), which invites some reliability challenges. This might be at the point where I should send a diagram/some sample code since it's a bit convoluted.
>>
>> The more I think about the jumps required to make the "simple" union approach work, the more it seems just using the state mapping for streaming is probably more reasonable. The state tracking in Spark can be somewhat expensive, though, so it would probably make sense to benchmark to see if it meets our needs.
>
> So the problem is, I don't think this can be made to work using mapWithState. It doesn't allow a mapping function that emits infinite output for an input element, directly or not.
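For reference, this is roughly the shape of Spark's mapWithState that the concern is about: the mapping function returns a single mapped value per input record, so it has no way to keep emitting output for one element until an unbounded restriction is exhausted. A minimal, illustrative sketch in Scala; the state type and function here are made up, not Beam or Spark runner code:

import org.apache.spark.streaming.{State, StateSpec}
import org.apache.spark.streaming.dstream.DStream

// Hypothetical per-topic progress we might track while "reading" it.
case class TopicProgress(recordsRead: Long)

// StateSpec.function expects (key, Option[value], State) => mappedValue.
def readStep(topic: String, trigger: Option[String], state: State[TopicProgress]): Long = {
  val updated = TopicProgress(state.getOption().getOrElse(TopicProgress(0L)).recordsRead + 1)
  state.update(updated)
  // Exactly one mapped value comes out per input element; there is no way to
  // keep producing more output for this key within a single invocation.
  updated.recordsRead
}

def withState(triggers: DStream[(String, String)]): DStream[Long] =
  triggers.mapWithState(StateSpec.function(readStep _))

The state can carry progress across batches, but each invocation still maps one input to one output, which is why an SDF that emits unbounded output per element does not fit directly.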
So, provided there is an infinite input (e.g. pick a never-ending queue stream), and each call produces a finite output, we would have an infinite number of calls.

> Dataflow and Flink, for example, had timer support even before SDFs, and a timer can set another timer and thus end up doing an infinite amount of work in a fault-tolerant way - so SDF could be implemented on top of that. But AFAIK Spark doesn't have a similar feature, hence my concern.

So we can do an infinite queue stream, which would allow us to be triggered at each interval and handle our own persistence.
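As a rough sketch of what that queue-stream idea could look like (illustrative only; the re-enqueueing of leftover work from inside foreachRDD and all the names here are assumptions, and queue streams do not support checkpointing, which is part of the reliability concern mentioned earlier):

import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}

object QueueStreamDriverSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("sdf-queue-sketch").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // Seed the queue with the initial restrictions; as long as we keep
    // re-enqueueing residual work, the DStream never runs dry, so every batch
    // interval gives us a hook to run more SDF work.
    val restrictions = new mutable.Queue[RDD[String]]()
    restrictions += ssc.sparkContext.parallelize(Seq("topic-a", "topic-b"))

    val stream = ssc.queueStream(restrictions, oneAtATime = true)
    stream.foreachRDD { rdd =>
      // "Process" each restriction for a bounded amount of time, persist
      // progress ourselves, and hand the leftovers back for the next interval.
      val residuals = rdd.map(r => r /* placeholder: run the SDF step here */).collect()
      restrictions += ssc.sparkContext.parallelize(residuals)
    }

    ssc.start()
    ssc.awaitTermination()
  }
}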
>> But these are still both DStream-based rather than Dataset-based, which we might want to support (depends on what direction folks take with the runners).
>>
>> If we wanted to do this in the Dataset world, looking at a custom sink/source would also be an option (which is effectively what a custom queue-stream-like thing for DStreams requires), but the data source APIs are a bit in flux, so if we ended up doing things at the edge of what's allowed there's a good chance we'd have to rewrite it a few times.
>>
>>>> Assuming that we have a given DStream though, in Spark we can get the underlying RDD implementation for each microbatch and do our work inside of that.
>>>>>>
>>>>>> More generally this does raise an important question if we want to target Datasets instead of RDDs/DStreams, in which case I would need to do some more poking.
>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>> How would timers be implemented? By outputting and reprocessing, the same way you proposed for SDF?
>>>>>>
>>>>>> I mean the timers could be inside the mappers within the system. Could use a singleton so if a partition is re-executed it doesn't end up as a straggler.
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>>>>> Does Spark have support for timers? (I know it has support for state.)
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>> Could we alternatively use a state mapping function to keep track of the computation so far instead of outputting V each time? (Also the progress so far is probably of a different type R rather than V.)
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>> So we had a quick chat about what it would take to add something like SplittableDoFns to Spark. I'd done some sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>
>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>> For input type T
>>>>>>>>>>>> Output type V
>>>>>>>>>>>>
>>>>>>>>>>>> Implement a mapper which outputs type (T, V), and if the computation finishes T will be populated, otherwise V will be.
>>>>>>>>>>>>
>>>>>>>>>>>> For determining how long to run, we'd run up to either K seconds or listen for a signal on a port.
>>>>>>>>>>>>
>>>>>>>>>>>> Once we're done running, we take the result and filter the ones with T and V into separate collections, re-run until finished, and then union the results.
>>>>>>>>>>>>
>>>>>>>>>>>> This is maybe not a great design, but it was minimally complicated and I figured terrible was a good place to start and improve from.
>>>>>>>>>>>>
>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is worse than I remember, because it's been a while since I thought about this.

--
Twitter: https://twitter.com/holdenkarau
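A minimal sketch of the filter-and-re-run loop in that original back-of-the-envelope design, reading the (T, V) pair as residual input versus finished output and modelling it with Either to keep the two cases explicit; processOnce stands in for "run the element for up to K seconds" and, like the rest of the snippet, is an illustrative assumption rather than actual runner code:

import scala.reflect.ClassTag
import org.apache.spark.rdd.RDD

// processOnce: Right(v) if the element finished and produced its output,
// Left(t) with the residual element if it needs another round.
def runToCompletion[T: ClassTag, V: ClassTag](
    initial: RDD[T],
    processOnce: T => Either[T, V]): RDD[V] = {
  val sc = initial.sparkContext
  var pending: RDD[T] = initial
  var finished: RDD[V] = sc.emptyRDD[V]
  while (!pending.isEmpty()) {
    val round = pending.map(processOnce).cache()
    // Split finished outputs from residuals, union the outputs, re-run the rest.
    finished = finished.union(round.collect { case Right(v) => v })
    pending = round.collect { case Left(t) => t }
  }
  finished
}

As written this only terminates when every restriction is bounded, which is exactly the streaming gap discussed above.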