Kenn - I'm arguing that SDF-style computation cannot be expressed in Spark
at all, and neither can Beam's timers.

Spark, unlike Flink, does not have a timer facility (only state), and as
far as I can tell its programming model has no other primitive that can map
a finite RDD into an infinite DStream - the only way to create a new
infinite DStream appears to be to write a Receiver.
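
To make the gap concrete, here is a minimal plain-Java sketch of the
primitive in question: a timer whose callback may re-arm itself, so a finite
set of inputs can drive an unbounded amount of work, and each input can still
stop for good. Everything in it (TimerLoopSketch, Step, Timer, run, the
maxFirings guard) is hypothetical and exists only for illustration; it is a
toy model, not the Beam, Flink, or Spark API, and it ignores fault tolerance.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model of self-rescheduling timers; not the Beam, Flink, or Spark API. */
public class TimerLoopSketch {

    /** Hypothetical stand-in for an SDF-like step that handles one bounded chunk. */
    public interface Step {
        /** Emits output for the chunk at position; returns true to resume later, false to stop. */
        boolean process(String element, int position, List<String> out);
    }

    /** A pending timer: fire at time for element, resuming at position. */
    static final class Timer {
        final long time;
        final String element;
        final int position;

        Timer(long time, String element, int position) {
            this.time = time;
            this.element = element;
            this.position = position;
        }
    }

    /** Fires timers in time order until none remain or maxFirings is reached (demo guard only). */
    public static List<String> run(List<String> finiteInput, Step step, int maxFirings) {
        // Ties on time are broken by element name so the demo is deterministic.
        PriorityQueue<Timer> timers = new PriorityQueue<>(
            Comparator.comparingLong((Timer t) -> t.time).thenComparing(t -> t.element));
        for (String element : finiteInput) {
            timers.add(new Timer(0, element, 0)); // one initial timer per input element
        }
        List<String> out = new ArrayList<>();
        int firings = 0;
        while (!timers.isEmpty() && firings < maxFirings) {
            Timer t = timers.poll();
            firings++;
            boolean resume = step.process(t.element, t.position, out);
            if (resume) {
                // The callback re-arms itself: this is what lets finite input do unbounded work.
                timers.add(new Timer(t.time + 1, t.element, t.position + 1));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // topicB "is deleted" after its second chunk; topicA never stops on its own.
        Step step = (element, position, out) -> {
            out.add(element + "#" + position);
            return !(element.equals("topicB") && position == 1);
        };
        System.out.println(run(List.of("topicA", "topicB"), step, 6));
    }
}
```

In this sketch topicB stops after its second chunk while topicA keeps
re-arming until the demo guard cuts it off; a runner with real timers would
have no such guard, which is exactly the open-ended behavior being discussed.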

I cc'd you because I'm wondering whether you've already investigated this
when considering whether timers can be implemented on the Spark runner.

On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com> wrote:

> I don't think I understand what the limitations of timers are that you are
> referring to. FWIW I would say implementing other primitives like SDF is an
> explicit non-goal for Beam state & timers.
>
> I got lost at some point in this thread, but is it actually necessary that
> a bounded PCollection maps to a finite/bounded structure in Spark?
> Skimming, I'm not sure if the problem is that we can't transliterate Beam
> to Spark (this might be a good sign) or that we can't express SDF style
> computation at all (seems far-fetched, but I could be convinced). Does
> doing a lightweight analysis and just promoting some things to be some kind
> of infinite representation help?
>
> Kenn
>
> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Would like to revive this thread one more time.
>>
>> At this point I'm pretty certain that Spark can't support this out of the
>> box and we're gonna have to make changes to Spark.
>>
>> Holden, could you advise who would be some Spark experts (yourself
>> included :) ) who could advise what kind of Spark change would both support
>> this AND be useful to the regular Spark community (non-Beam) so that it has
>> a chance of finding support? E.g. is there any plan in Spark regarding
>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>> with that?
>>
>> +Kenneth Knowles <k...@google.com> because timers suffer from the same
>> problem.
>>
>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> (resurrecting thread as I'm back from leave)
>>>
>>> I looked at this mode, and indeed as Reuven points out it seems that it
>>> affects execution details, but doesn't offer any new APIs.
>>> Holden - your suggestions of piggybacking an unbounded-per-element SDF
>>> on top of an infinite stream would work if 1) there was just 1 element and
>>> 2) the work was guaranteed to be infinite.
>>>
>>> Unfortunately, neither of these assumptions holds in general. In particular:
>>>
>>> - 1: The SDF is applied to a PCollection; the PCollection itself may be
>>> unbounded; and the unbounded work done by the SDF happens for every
>>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>>> arrive, and we may end up concurrently reading a continuously growing
>>> number of topics.
>>> - 2: The work per element is not necessarily infinite, it's just *not
>>> guaranteed to be finite* - the SDF is allowed at any moment to say
>>> "Okay, this restriction is done for real" by returning stop() from the
>>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>>> that if the topic/partition being watched is deleted. Having an infinite
>>> stream as a driver of this process would require being able to send a
>>> signal to the stream to stop itself.
>>>
>>> Is it looking like there's any other way this can be done in Spark
>>> as-is, or are we going to have to make changes to Spark to support this?
>>>
>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> I mean the new mode is very much in the Dataset not the DStream API
>>>> (although you can use the Dataset API with the old modes too).
>>>>
>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> But this new mode isn't a semantic change, right? It's moving away
>>>>> from micro batches into something that looks a lot like what Flink does -
>>>>> continuous processing with asynchronous snapshot boundaries.
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>>
>>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>>> SDF implementation (and real streaming)?
>>>>>>
>>>>>> Thanks,
>>>>>> Thomas
>>>>>>
>>>>>>
>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>>> kirpic...@google.com> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears
>>>>>>>>>>>>>> to have no way at all to produce an infinite DStream from a finite
>>>>>>>>>>>>>> RDD? Maybe we can somehow dynamically create a new DStream for
>>>>>>>>>>>>>> every initial restriction, said DStream being obtained using a
>>>>>>>>>>>>>> Receiver that under the hood actually runs the SDF? (This is of
>>>>>>>>>>>>>> course less efficient than what a timer-capable runner would do,
>>>>>>>>>>>>>> and I have doubts about the fault tolerance.)
>>>>>>>>>>>>>>
>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>>
>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains
>>>>>>>>>>>> a list of "master" Kafka topics, on which additional Kafka topics
>>>>>>>>>>>> to read are published.
>>>>>>>>>>>>
>>>>>>>>>>>> // Pseudocode: `p` is the Pipeline; ReadFromKafkaFn is a splittable DoFn.
>>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>>>     p.apply(TextIO.read().from(masterTopicsFile));
>>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>>>     masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>>>     nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>>
>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>>> unbounded number of streams being read concurrently, each of which
>>>>>>>>>>>> is itself unbounded too)
>>>>>>>>>>>>
>>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>
>>>>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>>>>
>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark
>>>>>>>>>> run it.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>>> style DoFns in my head.
>>>>>>>>>
>>>>>>>>> To effectively allow us to union back into the “same” DStream
>>>>>>>>> we’d have to end up using Spark’s queue streams (or an equivalent
>>>>>>>>> custom source, because of some queue stream limitations), which
>>>>>>>>> invites some reliability challenges. This might be at the point
>>>>>>>>> where I should send a diagram/some sample code since it’s a bit
>>>>>>>>> convoluted.
>>>>>>>>>
>>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>>> union approach work, the more it seems that just using the state
>>>>>>>>> mapping for streaming is probably more reasonable. The state
>>>>>>>>> tracking in Spark can be somewhat expensive, though, so it would
>>>>>>>>> probably make sense to benchmark to see if it meets our needs.
>>>>>>>>>
>>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>>> output for an input element, directly or not.
>>>>>>>>
>>>>>>> So, provided there is an infinite input (eg pick a never ending
>>>>>>> queue stream), and each call produces a finite output, we would have an
>>>>>>> infinite number of calls.
>>>>>>>
>>>>>>>>
>>>>>>>> Dataflow and Flink, for example, had timer support even before
>>>>>>>> SDFs, and a timer can set another timer and thus end up doing an
>>>>>>>> infinite amount of work in a fault tolerant way - so SDF could be
>>>>>>>> implemented on top of that. But AFAIK Spark doesn't have a similar
>>>>>>>> feature, hence my concern.
>>>>>>>>
>>>>>>> So we can do an infinite queue stream, which would allow us to be
>>>>>>> triggered at each interval and handle our own persistence.
>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>> But these are both still DStream based rather than Dataset based,
>>>>>>>>> which we might want to support (depends on what direction folks
>>>>>>>>> take with the runners).
>>>>>>>>>
>>>>>>>>> If we wanted to do this in the Dataset world, looking at a custom
>>>>>>>>> sink/source would also be an option (which is effectively what a
>>>>>>>>> custom queue-stream-like thing for DStreams requires), but the
>>>>>>>>> data source APIs are a bit in flux, so if we ended up doing things
>>>>>>>>> at the edge of what’s allowed there’s a good chance we’d have to
>>>>>>>>> rewrite it a few times.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>>> Assuming that we have a given DStream, though, in Spark we can get
>>>>>>>>>>> the underlying RDD implementation for each microbatch and do our
>>>>>>>>>>> work inside of that.
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> More generally this does raise an important question: do we
>>>>>>>>>>>>> want to target Datasets instead of RDDs/DStreams? In that case I
>>>>>>>>>>>>> would need to do some more poking.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> How would timers be implemented? By outputting and
>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>> I mean the timers could be inside the mappers within the
>>>>>>>>>>>>> system. We could use a singleton so that if a partition is
>>>>>>>>>>>>> re-executed it doesn’t end up as a straggler.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V
>>>>>>>>>>>>>>>>>> each time? (Also, the progress so far is probably of a
>>>>>>>>>>>>>>>>>> different type R rather than V.)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some
>>>>>>>>>>>>>>>>>>> sketchy thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> For determining how long to run, we'd either run for up to K
>>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Once we're done running we take the result, filter the ones
>>>>>>>>>>>>>>>>>>> with T and the ones with V into separate collections, re-run
>>>>>>>>>>>>>>>>>>> until finished, and then union the results
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to
>>>>>>>>>>>>>>>>>>> start and improve from.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>>> this is worse than I remember because it's been a while since
>>>>>>>>>>>>>>>>>>> I thought about this.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>
>>>>
>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>>
