On Fri, Apr 27, 2018 at 12:06 PM Robert Bradshaw <rober...@google.com> wrote:
> On Fri, Apr 27, 2018 at 11:56 AM Kenneth Knowles <k...@google.com> wrote:
>
> > I'm still pretty shallow on this topic & this thread, so forgive me if
> > I'm restating or missing things.
> >
> > My understanding is that the Spark runner does support Beam's triggering
> > semantics for unbounded aggregations, using the same support code from
> > runners/core that all runners use. Relevant code is in
> > SparkTimerInternals and SparkGroupAlsoByWindowViaWindowSet.
> >
> > IIRC, timers are stored in state and scanned each microbatch to see
> > which are eligible.
>
> I think the issue (which is more severe in the case of sources) is what
> to do if no more data comes in to trigger another microbatch.

So will a streaming pipeline fail to trigger in this case? I have a feeling
the "join with an infinite stream of pings" might already be happening.

Kenn

> > I don't see an immediate barrier to having timer loops. I don't know
> > about the performance of this approach, but currently the number of
> > timers per shard (key+window) is bounded by their declarations in code,
> > so it is a tiny number unless codegenerated. We do later want to have
> > dynamic timers (some people call it a TimerMap by analogy with MapState)
> > but I haven't seen a design or even a sketch that I can recall.
> >
> > Kenn
> >
> > On Thu, Apr 26, 2018 at 1:48 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> >
> >> Yeah, that's been the implied source of being able to be continuous:
> >> you union with a receiver which produces an infinite number of batches
> >> (the "never-ending queue stream", though not actually a queue stream,
> >> since queue streams have some limitations, but our own implementation
> >> thereof).
> >>
> >> On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:
> >>
> >>> Could we do this behind the scenes by writing a Receiver that
> >>> publishes periodic pings?
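For readers skimming the thread, the timers-in-state approach Kenn describes
(timers stored in state, scanned each microbatch, with a firing allowed to
set the next timer) can be sketched in a few lines of plain Python. This is
an illustrative toy, not the actual SparkTimerInternals code; the class and
method names here are made up:

```python
# Toy sketch of the mechanism described above: timers live in per-key
# state, each microbatch scans them and fires the eligible ones, and a
# firing may set a new timer, yielding a "timer loop".

class TimerState:
    def __init__(self):
        self.timers = {}  # key -> fire_time, persisted with other state

    def set_timer(self, key, fire_time):
        self.timers[key] = fire_time

    def on_microbatch(self, now):
        """Scan all stored timers and fire those whose time has come."""
        fired = []
        for key, fire_time in list(self.timers.items()):
            if fire_time <= now:
                del self.timers[key]
                fired.append(key)
        return fired

# A timer loop: each firing re-sets the timer, so work continues
# indefinitely as long as *something* triggers another microbatch -
# which is exactly the issue when no more data comes in.
state = TimerState()
state.set_timer("shard-0", fire_time=1)
log = []
for now in range(1, 5):                  # four microbatch intervals
    for key in state.on_microbatch(now):
        log.append((now, key))
        state.set_timer(key, now + 1)    # schedule the next firing

print(log)
```

The loop only keeps going because something drives `on_microbatch`, which is
why the "infinite stream of pings" question below matters.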
> >>> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>
> >>>> Kenn - I'm arguing that in Spark, SDF-style computation cannot be
> >>>> expressed at all, and neither can Beam's timers.
> >>>>
> >>>> Spark, unlike Flink, does not have a timer facility (only state),
> >>>> and as far as I can tell its programming model has no other primitive
> >>>> that can map a finite RDD into an infinite DStream - the only way to
> >>>> create a new infinite DStream appears to be to write a Receiver.
> >>>>
> >>>> I cc'd you because I'm wondering whether you've already investigated
> >>>> this when considering whether timers can be implemented on the Spark
> >>>> runner.
> >>>>
> >>>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com> wrote:
> >>>>
> >>>>> I don't think I understand what the limitations of timers are that
> >>>>> you are referring to. FWIW I would say implementing other primitives
> >>>>> like SDF is an explicit non-goal for Beam state & timers.
> >>>>>
> >>>>> I got lost at some point in this thread, but is it actually
> >>>>> necessary that a bounded PCollection maps to a finite/bounded
> >>>>> structure in Spark? Skimming, I'm not sure if the problem is that we
> >>>>> can't transliterate Beam to Spark (this might be a good sign) or
> >>>>> that we can't express SDF-style computation at all (seems
> >>>>> far-fetched, but I could be convinced). Does doing a lightweight
> >>>>> analysis and just promoting some things to some kind of infinite
> >>>>> representation help?
> >>>>>
> >>>>> Kenn
> >>>>>
> >>>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>
> >>>>>> Would like to revive this thread one more time.
> >>>>>>
> >>>>>> At this point I'm pretty certain that Spark can't support this out
> >>>>>> of the box and we're gonna have to make changes to Spark.
> >>>>>> Holden, could you suggest some Spark experts (yourself included
> >>>>>> :) ) who could advise what kind of Spark change would both support
> >>>>>> this AND be useful to the regular Spark community (non-Beam), so
> >>>>>> that it has a chance of finding support? E.g. is there any plan in
> >>>>>> Spark regarding adding timers similar to Flink's or Beam's timers -
> >>>>>> maybe we could help out with that?
> >>>>>>
> >>>>>> +Kenneth Knowles because timers suffer from the same problem.
> >>>>>>
> >>>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>
> >>>>>>> (resurrecting thread as I'm back from leave)
> >>>>>>>
> >>>>>>> I looked at this mode, and indeed as Reuven points out it seems
> >>>>>>> that it affects execution details, but doesn't offer any new APIs.
> >>>>>>>
> >>>>>>> Holden - your suggestion of piggybacking an unbounded-per-element
> >>>>>>> SDF on top of an infinite stream would work if 1) there was just
> >>>>>>> one element and 2) the work was guaranteed to be infinite.
> >>>>>>>
> >>>>>>> Unfortunately, neither assumption holds. In particular:
> >>>>>>>
> >>>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself
> >>>>>>> may be unbounded; and the unbounded work done by the SDF happens
> >>>>>>> for every element. E.g. we might have a Kafka topic on which names
> >>>>>>> of Kafka topics arrive, and we may end up concurrently reading a
> >>>>>>> continuously growing number of topics.
> >>>>>>> - 2: The work per element is not necessarily infinite, it's just
> >>>>>>> not guaranteed to be finite - the SDF is allowed at any moment to
> >>>>>>> say "Okay, this restriction is done for real" by returning stop()
> >>>>>>> from the @ProcessElement method. Continuing the Kafka example, it
> >>>>>>> could do that if the topic/partition being watched is deleted.
> >>>>>>> Having an infinite stream as the driver of this process would
> >>>>>>> require being able to send the stream a signal to stop itself.
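Eugene's point 2 - "not necessarily infinite, just not guaranteed to be
finite" - can be made concrete with a small Python analogue of the SDF
contract. Beam's real API is Java's @ProcessElement with restriction
trackers; the function and state names below are invented for illustration:

```python
# Python analogue of the SDF contract described above: each invocation
# does a bounded chunk of work, then tells the runner whether to call it
# again (RESUME) or whether the restriction is done for real (STOP).

STOP, RESUME = "stop", "resume"

def process_element(topic, state, topic_exists):
    """One SDF-style call: read a finite chunk, checkpoint, yield control."""
    if not topic_exists(topic):
        return STOP, []              # e.g. the watched topic was deleted
    offset = state.get(topic, 0)
    records = [f"{topic}:{offset + i}" for i in range(3)]  # finite chunk
    state[topic] = offset + 3        # checkpointed progress for next call
    return RESUME, records

# The runner must keep re-invoking until it sees STOP - which may never
# happen. An infinite driver stream therefore needs some way to be told
# to stop itself once an element's work really is finished.
state, alive = {}, {"topicA"}
first = process_element("topicA", state, lambda t: t in alive)
alive.discard("topicA")              # the topic goes away
second = process_element("topicA", state, lambda t: t in alive)
print(first[0], second[0])
```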
> >>>>>>> Is it looking like there's any other way this can be done in
> >>>>>>> Spark as-is, or are we going to have to make changes to Spark to
> >>>>>>> support this?
> >>>>>>>
> >>>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>>>>>
> >>>>>>>> I mean the new mode is very much in the Dataset, not the DStream,
> >>>>>>>> API (although you can use the Dataset API with the old modes too).
> >>>>>>>>
> >>>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
> >>>>>>>>
> >>>>>>>>> But this new mode isn't a semantic change, right? It's moving
> >>>>>>>>> away from microbatches into something that looks a lot like what
> >>>>>>>>> Flink does - continuous processing with asynchronous snapshot
> >>>>>>>>> boundaries.
> >>>>>>>>>
> >>>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
> >>>>>>>>>
> >>>>>>>>>> Hopefully the new "continuous processing mode" in Spark will
> >>>>>>>>>> enable SDF implementation (and real streaming)?
> >>>>>>>>>>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Thomas
> >>>>>>>>>>
> >>>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk
> >>>>>>>>>>>>>>>>>> for Spark runner streaming. Holden, is it correct that
> >>>>>>>>>>>>>>>>>> Spark appears to have no way at all to produce an
> >>>>>>>>>>>>>>>>>> infinite DStream from a finite RDD?
> >>>>>>>>>>>>>>>>>> Maybe we can somehow dynamically create a new DStream
> >>>>>>>>>>>>>>>>>> for every initial restriction, said DStream being
> >>>>>>>>>>>>>>>>>> obtained using a Receiver that under the hood actually
> >>>>>>>>>>>>>>>>>> runs the SDF? (This is of course less efficient than
> >>>>>>>>>>>>>>>>>> what a timer-capable runner would do, and I have doubts
> >>>>>>>>>>>>>>>>>> about the fault tolerance.)
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a
> >>>>>>>>>>>>>>>>> fixed number of levels of DStreams. It's not great but
> >>>>>>>>>>>>>>>>> it would work.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what
> >>>>>>>>>>>>>>>> SDF demands of the runner. Imagine the following case: a
> >>>>>>>>>>>>>>>> file contains a list of "master" Kafka topics, on which
> >>>>>>>>>>>>>>>> names of additional Kafka topics to read are published.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
> >>>>>>>>>>>>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo(ReadFromKafkaFn))
> >>>>>>>>>>>>>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that
> >>>>>>>>>>>>>>>> emits infinite output for every input:
> >>>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case the
> >>>>>>>>>>>>>>>> result of reading a text file)
> >>>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having
> >>>>>>>>>>>>>>>> an unbounded number of streams being read concurrently,
> >>>>>>>>>>>>>>>> each of which is itself unbounded too)
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for
> >>>>>>>>>>>>>>>> this case? I suppose the second case is harder, so we can
> >>>>>>>>>>>>>>>> focus on that.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> So none of those are a SplittableDoFn, right?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples
> >>>>>>>>>>>>>> is a splittable DoFn, and we're trying to figure out how to
> >>>>>>>>>>>>>> make Spark run it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Ah ok, sorry - I saw that and for some reason parsed them as
> >>>>>>>>>>>>> old-style DoFns in my head.
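The shape of Eugene's pipeline - a collection of topic names, each of which
fans out into its own unbounded stream of records - can be mimicked with
plain Python generators. No Kafka is involved; the topic names and the
round-robin scheduling below are purely illustrative:

```python
# Plain-Python mimic of the pipeline above: each element of an outer
# collection fans out into its own unbounded stream. In the real example
# masterTopics is itself unbounded, so the *number* of concurrent
# unbounded streams also grows over time.

import itertools

def read_topic(topic):
    """Stand-in for ReadFromKafkaFn on one topic: unbounded output."""
    for i in itertools.count():
        yield f"{topic}/record{i}"

master_topics = ["t1", "t2"]             # illustrative names
streams = [read_topic(t) for t in master_topics]

# Pull two rounds from each concurrently-read stream, round-robin.
records = [next(s) for _ in range(2) for s in streams]
print(records)
```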
> >>>>>>>>>>>>> To effectively allow us to union back into the "same"
> >>>>>>>>>>>>> DStream, we'd have to end up using Spark's queue streams (or
> >>>>>>>>>>>>> their equivalent custom source, because of some queue stream
> >>>>>>>>>>>>> limitations), which invites some reliability challenges.
> >>>>>>>>>>>>> This might be at the point where I should send a diagram/
> >>>>>>>>>>>>> some sample code since it's a bit convoluted.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> The more I think about the jumps required to make the
> >>>>>>>>>>>>> "simple" union approach work, the more it seems just using
> >>>>>>>>>>>>> the state mapping for streaming is probably more reasonable.
> >>>>>>>>>>>>> Although the state tracking in Spark can be somewhat
> >>>>>>>>>>>>> expensive, so it would probably make sense to benchmark to
> >>>>>>>>>>>>> see if it meets our needs.
> >>>>>>>>>>>>
> >>>>>>>>>>>> So the problem is, I don't think this can be made to work
> >>>>>>>>>>>> using mapWithState. It doesn't allow a mapping function that
> >>>>>>>>>>>> emits infinite output for an input element, directly or not.
> >>>>>>>>>>>
> >>>>>>>>>>> So, provided there is an infinite input (e.g. pick a
> >>>>>>>>>>> never-ending queue stream), and each call produces a finite
> >>>>>>>>>>> output, we would have an infinite number of calls.
> >>>>>>>>>>>
> >>>>>>>>>>>> Dataflow and Flink, for example, had timer support even
> >>>>>>>>>>>> before SDFs, and a timer can set another timer and thus end
> >>>>>>>>>>>> up doing an infinite amount of work in a fault-tolerant way -
> >>>>>>>>>>>> so SDF could be implemented on top of that. But AFAIK Spark
> >>>>>>>>>>>> doesn't have a similar feature, hence my concern.
> >>>>>>>>>>>
> >>>>>>>>>>> So we can do an infinite queue stream, which would allow us to
> >>>>>>>>>>> be triggered at each interval and handle our own persistence.
> >>>>>>>>>>>
> >>>>>>>>>>>>> But these still are both DStream-based rather than Dataset,
> >>>>>>>>>>>>> which we might want to support (depends on what direction
> >>>>>>>>>>>>> folks take with the runners).
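Holden's "infinite queue stream triggers us at each interval, and we handle
our own persistence" idea can be sketched without Spark at all. The tick
loop and per-topic offsets below are invented for illustration:

```python
# Sketch of the idea above, with no Spark involved: an infinite stream of
# empty "ping" batches drives the computation, and each ping makes one
# finite call per tracked element, with progress kept in our own state.
# Infinitely many finite calls add up to unbounded output over time.

import itertools

def ping_stream():
    """Stand-in for a never-ending queue stream of empty batches."""
    return itertools.count()

state = {"topicA": 0, "topicB": 0}  # our own persisted per-element progress
output = []

# Run three "microbatch intervals" of the otherwise-endless loop.
for tick in itertools.islice(ping_stream(), 3):
    for topic in state:
        output.append(f"{topic}:{state[topic]}")  # finite work per call
        state[topic] += 1

print(len(output))
```

This is the pattern Eugene flags as incomplete: nothing in the loop lets an
individual element signal "stop" back to the driver stream.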
> >>>>>>>>>>>>> If we wanted to do this in the Dataset world, looking at a
> >>>>>>>>>>>>> custom sink/source would also be an option (which is
> >>>>>>>>>>>>> effectively what a custom queue-stream-like thing for
> >>>>>>>>>>>>> DStreams requires), but the data source APIs are a bit in
> >>>>>>>>>>>>> flux, so if we ended up doing things at the edge of what's
> >>>>>>>>>>>>> allowed there's a good chance we'd have to rewrite it a few
> >>>>>>>>>>>>> times.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Assuming that we have a given DStream, though, in Spark we
> >>>>>>>>>>>>>>> can get the underlying RDD implementation for each
> >>>>>>>>>>>>>>> microbatch and do our work inside of that.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> More generally this does raise an important question of
> >>>>>>>>>>>>>>>>> whether we want to target Datasets instead of
> >>>>>>>>>>>>>>>>> RDDs/DStreams, in which case I would need to do some
> >>>>>>>>>>>>>>>>> more poking.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
> >>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> How would timers be implemented? By outputting and
> >>>>>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>> I mean the timers could be inside the mappers within the
> >>>>>>>>>>>>>>>>> system. Could use a singleton so if a partition is
> >>>>>>>>>>>>>>>>> re-executed it doesn't end up as a straggler.
> >>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
> >>>>>>>>>>>>>>>>>>>>> support for state)
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com> wrote:
> >>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function
> >>>>>>>>>>>>>>>>>>>>>> to keep track of the computation so far, instead of
> >>>>>>>>>>>>>>>>>>>>>> outputting V each time? (Also, the progress so far
> >>>>>>>>>>>>>>>>>>>>>> is probably of a different type R rather than V.)
> >>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <hol...@pigscanfly.ca> wrote:
> >>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to
> >>>>>>>>>>>>>>>>>>>>>>> add something like SplittableDoFns to Spark. I'd
> >>>>>>>>>>>>>>>>>>>>>>> done some sketchy thinking about this last year
> >>>>>>>>>>>>>>>>>>>>>>> but didn't get very far.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
> >>>>>>>>>>>>>>>>>>>>>>> For input type T
> >>>>>>>>>>>>>>>>>>>>>>> Output type V
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V),
> >>>>>>>>>>>>>>>>>>>>>>> and if the computation finishes V will be
> >>>>>>>>>>>>>>>>>>>>>>> populated, otherwise T will be.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> For determining how long to run, we'd run up to
> >>>>>>>>>>>>>>>>>>>>>>> either K seconds or listen for a signal on a port.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result, filter
> >>>>>>>>>>>>>>>>>>>>>>> the ones with T and with V into separate
> >>>>>>>>>>>>>>>>>>>>>>> collections, re-run until finished, and then union
> >>>>>>>>>>>>>>>>>>>>>>> the results.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design, but it was
> >>>>>>>>>>>>>>>>>>>>>>> minimally complicated and I figured terrible was a
> >>>>>>>>>>>>>>>>>>>>>>> good place to start and improve from.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts
> >>>>>>>>>>>>>>>>>>>>>>> where this is worse than I remember, because it's
> >>>>>>>>>>>>>>>>>>>>>>> been a while since I thought about this.
> >>>>>>>>>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
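For what it's worth, the filter/re-run/union loop in Holden's design can be
simulated in a few lines of Python. The unit-of-work model and the names
below are invented; real Spark code would operate on RDDs. Note also that,
as Eugene points out upthread, the loop only terminates when each element's
work is in fact finite:

```python
# Minimal simulation of the back-of-the-envelope design: a mapper emits
# (T, V) pairs with exactly one side populated - V when the element
# finished, T (the remaining work) when it ran out of budget - and the
# driver filters the two sides apart, re-runs the T side, and unions the
# finished V sides.

def mapper(t, budget):
    """Do up to `budget` units of work on t = (name, remaining_units)."""
    name, remaining = t
    remaining -= min(remaining, budget)
    if remaining == 0:
        return (None, f"{name}-done")    # finished: V populated
    return ((name, remaining), None)     # unfinished: T populated

def run_until_finished(inputs, budget):
    results, pending = [], list(inputs)
    while pending:                       # one "K seconds" round per pass
        outs = [mapper(t, budget) for t in pending]
        pending = [t for t, v in outs if t is not None]   # re-run these
        results += [v for t, v in outs if v is not None]  # union results
    return results

print(run_until_finished([("a", 2), ("b", 5)], budget=2))
```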