On Fri, Apr 27, 2018 at 12:06 PM Robert Bradshaw <rober...@google.com>
wrote:

> On Fri, Apr 27, 2018 at 11:56 AM Kenneth Knowles <k...@google.com> wrote:
>
> > I'm still pretty shallow on this topic & this thread, so forgive if I'm
> restating or missing things.
>
> > My understanding is that the Spark runner does support Beam's triggering
> semantics for unbounded aggregations, using the same support code from
> runners/core that all runners use. Relevant code in SparkTimerInternals and
> SparkGroupAlsoByWindowViaWindowSet.
>
> > IIRC timers are stored in state, scanned each microbatch to see which are
> eligible.
>
> I think the issue (which is more severe in the case of sources) is what to
> do if no more date comes in to trigger another microbatch.
>

So will a streaming pipeline fail to trigger in this case? I have this
feeling the "join with an infinite stream of pings" might already be
happening.

Kenn



> > I don't see an immediate barrier to having timer loops. I don't know
> about performance of this approach, but currently the number of timers per
> shard (key+window) is bounded by their declarations in code, so it is a
> tiny number unless codegenerated. We do later want to have dynamic timers
> (some people call it a TimerMap by analogy with MapState) but I haven't
> seen a design or even a sketch that I can recall.
>
> > Kenn
>
> > On Thu, Apr 26, 2018 at 1:48 PM Holden Karau <hol...@pigscanfly.ca>
> wrote:
>
> >> Yeah that's been the implied source of being able to be continuous, you
> union with a receiver which produce an infinite number of batches (the
> "never ending queue stream" but not actually a queuestream since they have
> some limitations but our own implementation there of).
>
> >> On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:
>
> >>> Could we do this behind the scenes by writing a Receiver that publishes
> periodic pings?
>
> >>> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <
> kirpic...@google.com>
> wrote:
>
> >>>> Kenn - I'm arguing that in Spark SDF style computation can not be
> expressed at all, and neither can Beam's timers.
>
> >>>> Spark, unlike Flink, does not have a timer facility (only state), and
> as far as I can tell its programming model has no other primitive that can
> map a finite RDD into an infinite DStream - the only way to create a new
> infinite DStream appears to be to write a Receiver.
>
> >>>> I cc'd you because I'm wondering whether you've already investigated
> this when considering whether timers can be implemented on the Spark
> runner.
>
> >>>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com>
> wrote:
>
> >>>>> I don't think I understand what the limitations of timers are that
> you are referring to. FWIW I would say implementing other primitives like
> SDF is an explicit non-goal for Beam state & timers.
>
> >>>>> I got lost at some point in this thread, but is it actually necessary
> that a bounded PCollection maps to a finite/bounded structure in Spark?
> Skimming, I'm not sure if the problem is that we can't transliterate Beam
> to Spark (this might be a good sign) or that we can't express SDF style
> computation at all (seems far-fetched, but I could be convinced). Does
> doing a lightweight analysis and just promoting some things to be some kind
> of infinite representation help?
>
> >>>>> Kenn
>
> >>>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov
> >>>>> <kirpic...@google.com>
> wrote:
>
> >>>>>> Would like to revive this thread one more time.
>
> >>>>>> At this point I'm pretty certain that Spark can't support this out
> of the box and we're gonna have to make changes to Spark.
>
> >>>>>> Holden, could you advise who would be some Spark experts (yourself
> included :) ) who could advise what kind of Spark change would both support
> this AND be useful to the regular Spark community (non-Beam) so that it has
> a chance of finding support? E.g. is there any plan in Spark regarding
> adding timers similar to Flink's or Beam's timers, maybe we could help out
> with that?
>
> >>>>>> +Kenneth Knowles because timers suffer from the same problem.
>
> >>>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
>
> >>>>>>> (resurrecting thread as I'm back from leave)
>
> >>>>>>> I looked at this mode, and indeed as Reuven points out it seems
> that it affects execution details, but doesn't offer any new APIs.
> >>>>>>> Holden - your suggestions of piggybacking an unbounded-per-element
> SDF on top of an infinite stream would work if 1) there was just 1 element
> and 2) the work was guaranteed to be infinite.
>
> >>>>>>> Unfortunately, both of these assumptions are insufficient. In
> particular:
>
> >>>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself
> may be unbounded; and the unbounded work done by the SDF happens for every
> element. E.g. we might have a Kafka topic on which names of Kafka topics
> arrive, and we may end up concurrently reading a continuously growing
> number of topics.
> >>>>>>> - 2: The work per element is not necessarily infinite, it's just
> not guaranteed to be finite - the SDF is allowed at any moment to say
> "Okay, this restriction is done for real" by returning stop() from the
> @ProcessElement method. Continuing the Kafka example, e.g., it could do
> that if the topic/partition being watched is deleted. Having an infinite
> stream as a driver of this process would require being able to send a
> signal to the stream to stop itself.
>
> >>>>>>> Is it looking like there's any other way this can be done in Spark
> as-is, or are we going to have to make changes to Spark to support this?
>
> >>>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca
> >
> wrote:
>
> >>>>>>>> I mean the new mode is very much in the Dataset not the DStream
> API (although you can use the Dataset API with the old modes too).
>
> >>>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com>
> wrote:
>
> >>>>>>>>> But this new mode isn't a semantic change, right? It's moving
> away from micro batches into something that looks a lot like what Flink
> does - continuous processing with asynchronous snapshot boundaries.
>
> >>>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org>
> wrote:
>
> >>>>>>>>>> Hopefully the new "continuous processing mode" in Spark will
> enable SDF implementation (and real streaming)?
>
> >>>>>>>>>> Thanks,
> >>>>>>>>>> Thomas
>
>
> >>>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <
> hol...@pigscanfly.ca> wrote:
>
>
> >>>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
>
>
>
> >>>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <
> hol...@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
>
> >>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
> hol...@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
>
> >>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
> hol...@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk
> for Spark runner streaming. Holden, is it correct that Spark appears to
> have no way at all to produce an infinite DStream from a finite RDD? Maybe
> we can somehow dynamically create a new DStream for every initial
> restriction, said DStream being obtained using a Receiver that under the
> hood actually runs the SDF? (this is of course less efficient than a
> timer-capable runner would do, and I have doubts about the fault tolerance)
>
> >>>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a
> fixed number of levels on DStreams. It’s not great but it would work.
>
> >>>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
> demands of the runner. Imagine the following case: a file contains a list
> of "master" Kafka topics, on which there are published additional Kafka
> topics to read.
>
> >>>>>>>>>>>>>>>> PCollection<String> masterTopics =
> TextIO.read().from(masterTopicsFile)
> >>>>>>>>>>>>>>>> PCollection<String> nestedTopics =
> masterTopics.apply(ParDo(ReadFromKafkaFn))
> >>>>>>>>>>>>>>>> PCollection<String> records =
> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>
> >>>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that
> emits infinite output for every input:
> >>>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to
> the result of reading a text file)
> >>>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
> unbounded number of streams being read concurrently, each of the streams
> themselves is unbounded too)
>
> >>>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for
> this case? I suppose the second case is harder, so we can focus on that.
>
> >>>>>>>>>>>>>>> So none of those are a splittabledofn right?
>
> >>>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is
> a splittable DoFn and we're trying to figure out how to make Spark run it.
>
>
> >>>>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as
> old style DoFns in my head.
>
> >>>>>>>>>>>>> To effectively allow us to union back into the “same” DStream
>   we’d have to end up using Sparks queue streams (or their equivalent
> custom
> source because of some queue stream limitations), which invites some
> reliability challenges. This might be at the point where I should send a
> diagram/some sample code since it’s a bit convoluted.
>
> >>>>>>>>>>>>> The more I think about the jumps required to make the
> “simple” union approach work, the more it seems just using the statemapping
> for steaming is probably more reasonable. Although the state tracking in
> Spark can be somewhat expensive so it would probably make sense to
> benchmark to see if it meets our needs.
>
> >>>>>>>>>>>> So the problem is, I don't think this can be made to work
> using mapWithState. It doesn't allow a mapping function that emits infinite
> output for an input element, directly or not.
>
> >>>>>>>>>>> So, provided there is an infinite input (eg pick a never ending
> queue stream), and each call produces a finite output, we would have an
> infinite number of calls.
>
>
> >>>>>>>>>>>> Dataflow and Flink, for example, had timer support even before
> SDFs, and a timer can set another timer and thus end up doing an infinite
> amount of work in a fault tolerant way - so SDF could be implemented on top
> of that. But AFAIK spark doesn't have a similar feature, hence my concern.
>
> >>>>>>>>>>> So we can do an inifinite queue stream which would allow us to
> be triggered at each interval and handle our own persistence.
>
>
>
> >>>>>>>>>>>>> But these still are both DStream based rather than Dataset
> which we might want to support (depends on what direction folks take with
> the runners).
>
> >>>>>>>>>>>>> If we wanted to do this in the dataset world looking at a
> custom sink/source would also be an option, (which is effectively what a
> custom queue stream like thing for dstreams requires), but the datasource
> APIs are a bit influx so if we ended up doing things at the edge of what’s
> allowed there’s a good chance we’d have to rewrite it a few times.
>
>
> >>>>>>>>>>>>>>> Assuming that we have a given dstream though in Spark we
> can get the underlying RDD implementation for each microbatch and do our
> work inside of that.
>
>
>
>
> >>>>>>>>>>>>>>>>> More generally this does raise an important question if
> we want to target datasets instead of rdds/DStreams in which case i would
> need to do some more poking.
>
>
> >>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
> re...@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>>> How would timers be implemented? By outputing and
> reprocessing, the same way you proposed for SDF?
>
> >>>>>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
> system. Could use a singleton so if a partition is re-executed it doesn’t
> end up as a straggler.
>
>
>
> >>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
> hol...@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>
> >>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
> support for state)
>
> >>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
> re...@google.com> wrote:
>
> >>>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function
> to keep track of the computation so far instead of outputting V each time?
> (also the progress so far is probably of a different type R rather than V).
>
>
> >>>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
> hol...@pigscanfly.ca> wrote:
>
> >>>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to
> add something like SplittableDoFns to Spark. I'd done some sketchy thinking
> about this last year but didn't get very far.
>
> >>>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
> >>>>>>>>>>>>>>>>>>>>>>> For input type T
> >>>>>>>>>>>>>>>>>>>>>>> Output type V
>
> >>>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
> >>>>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
> otherwise V will be
>
> >>>>>>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
> seconds or listen for a signal on a port
>
> >>>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and
> filter for the ones with T and V into seperate collections re-run until
> finished
> >>>>>>>>>>>>>>>>>>>>>>> and then union the results
>
>
> >>>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was
> minimally complicated and I figured terrible was a good place to start and
> improve from.
>
>
> >>>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts
> where this is worse than I remember because its been awhile since I thought
> about this.
>
>
> >>>>>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>>>> --
> >>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>>>> --
> >>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
> >>>>>>>>>>> --
> >>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>
>
>
>
>
> >>>>>>>> --
> >>>>>>>> Twitter: https://twitter.com/holdenkarau
>
>
>
>
> >> --
> >> Twitter: https://twitter.com/holdenkarau
>

Reply via email to