I don't think I understand what the limitations of timers are that you are
referring to. FWIW I would say implementing other primitives like SDF is an
explicit non-goal for Beam state & timers.

I got lost at some point in this thread, but is it actually necessary that
a bounded PCollection maps to a finite/bounded structure in Spark?
Skimming, I'm not sure if the problem is that we can't transliterate Beam
to Spark (this might be a good sign) or that we can't express SDF style
computation at all (seems far-fetched, but I could be convinced). Does
doing a lightweight analysis and just promoting some things to be some kind
of infinite representation help?

Kenn

On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Would like to revive this thread one more time.
>
> At this point I'm pretty certain that Spark can't support this out of the
> box and we're gonna have to make changes to Spark.
>
> Holden, could you advise who would be some Spark experts (yourself
> included :) ) who could advise what kind of Spark change would both support
> this AND be useful to the regular Spark community (non-Beam) so that it has
> a chance of finding support? E.g. is there any plan in Spark regarding
> adding timers similar to Flink's or Beam's timers, maybe we could help out
> with that?
>
> +Kenneth Knowles <k...@google.com> because timers suffer from the same
> problem.
>
> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> (resurrecting thread as I'm back from leave)
>>
>> I looked at this mode, and indeed as Reuven points out it seems that it
>> affects execution details, but doesn't offer any new APIs.
>> Holden - your suggestions of piggybacking an unbounded-per-element SDF on
>> top of an infinite stream would work if 1) there was just 1 element and 2)
>> the work was guaranteed to be infinite.
>>
>> Unfortunately, both of these assumptions are insufficient. In particular:
>>
>> - 1: The SDF is applied to a PCollection; the PCollection itself may be
>> unbounded; and the unbounded work done by the SDF happens for every
>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>> arrive, and we may end up concurrently reading a continuously growing
>> number of topics.
>> - 2: The work per element is not necessarily infinite, it's just *not
>> guaranteed to be finite* - the SDF is allowed at any moment to say
>> "Okay, this restriction is done for real" by returning stop() from the
>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>> that if the topic/partition being watched is deleted. Having an infinite
>> stream as a driver of this process would require being able to send a
>> signal to the stream to stop itself.
>>
>> Is it looking like there's any other way this can be done in Spark as-is,
>> or are we going to have to make changes to Spark to support this?
>>
>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> I mean the new mode is very much in the Dataset not the DStream API
>>> (although you can use the Dataset API with the old modes too).
>>>
>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>
>>>> But this new mode isn't a semantic change, right? It's moving away from
>>>> micro batches into something that looks a lot like what Flink does -
>>>> continuous processing with asynchronous snapshot boundaries.
>>>>
>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>
>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>> SDF implementation (and real streaming)?
>>>>>
>>>>> Thanks,
>>>>> Thomas
>>>>>
>>>>>
>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>>
>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>> kirpic...@google.com> wrote:
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears 
>>>>>>>>>>>>> to have no
>>>>>>>>>>>>> way at all to produce an infinite DStream from a finite RDD? 
>>>>>>>>>>>>> Maybe we can
>>>>>>>>>>>>> somehow dynamically create a new DStream for every initial 
>>>>>>>>>>>>> restriction,
>>>>>>>>>>>>> said DStream being obtained using a Receiver that under the hood 
>>>>>>>>>>>>> actually
>>>>>>>>>>>>> runs the SDF? (this is of course less efficient than a 
>>>>>>>>>>>>> timer-capable runner
>>>>>>>>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>
>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>
>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains 
>>>>>>>>>>> a list
>>>>>>>>>>> of "master" Kafka topics, on which there are published additional 
>>>>>>>>>>> Kafka
>>>>>>>>>>> topics to read.
>>>>>>>>>>>
>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>> TextIO.read().from(masterTopicsFile)
>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>> nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>>>>>>>>
>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>> unbounded number of streams being read concurrently, each of the 
>>>>>>>>>>> streams
>>>>>>>>>>> themselves is unbounded too)
>>>>>>>>>>>
>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>
>>>>>>>>>> So none of those are a splittabledofn right?
>>>>>>>>>>
>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark run 
>>>>>>>>> it.
>>>>>>>>>
>>>>>>>>>
>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>> style DoFns in my head.
>>>>>>>>
>>>>>>>> To effectively allow us to union back into the “same” DStream  we’d
>>>>>>>> have to end up using Sparks queue streams (or their equivalent custom
>>>>>>>> source because of some queue stream limitations), which invites some
>>>>>>>> reliability challenges. This might be at the point where I should send 
>>>>>>>> a
>>>>>>>> diagram/some sample code since it’s a bit convoluted.
>>>>>>>>
>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>> union approach work, the more it seems just using the statemapping for
>>>>>>>> steaming is probably more reasonable. Although the state tracking in 
>>>>>>>> Spark
>>>>>>>> can be somewhat expensive so it would probably make sense to benchmark 
>>>>>>>> to
>>>>>>>> see if it meets our needs.
>>>>>>>>
>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>> output for an input element, directly or not.
>>>>>>>
>>>>>> So, provided there is an infinite input (eg pick a never ending queue
>>>>>> stream), and each call produces a finite output, we would have an 
>>>>>> infinite
>>>>>> number of calls.
>>>>>>
>>>>>>>
>>>>>>> Dataflow and Flink, for example, had timer support even before SDFs,
>>>>>>> and a timer can set another timer and thus end up doing an infinite 
>>>>>>> amount
>>>>>>> of work in a fault tolerant way - so SDF could be implemented on top of
>>>>>>> that. But AFAIK spark doesn't have a similar feature, hence my concern.
>>>>>>>
>>>>>> So we can do an inifinite queue stream which would allow us to be
>>>>>> triggered at each interval and handle our own persistence.
>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>> But these still are both DStream based rather than Dataset which we
>>>>>>>> might want to support (depends on what direction folks take with the
>>>>>>>> runners).
>>>>>>>>
>>>>>>>> If we wanted to do this in the dataset world looking at a custom
>>>>>>>> sink/source would also be an option, (which is effectively what a 
>>>>>>>> custom
>>>>>>>> queue stream like thing for dstreams requires), but the datasource 
>>>>>>>> APIs are
>>>>>>>> a bit influx so if we ended up doing things at the edge of what’s 
>>>>>>>> allowed
>>>>>>>> there’s a good chance we’d have to rewrite it a few times.
>>>>>>>>
>>>>>>>>
>>>>>>>>>> Assuming that we have a given dstream though in Spark we can get
>>>>>>>>>> the underlying RDD implementation for each microbatch and do our work
>>>>>>>>>> inside of that.
>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> More generally this does raise an important question if we want
>>>>>>>>>>>> to target datasets instead of rdds/DStreams in which case i would 
>>>>>>>>>>>> need to
>>>>>>>>>>>> do some more poking.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> How would timers be implemented? By outputing and
>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>
>>>>>>>>>>>>> i mean the timers could be inside the mappers within the
>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it 
>>>>>>>>>>>> doesn’t
>>>>>>>>>>>> end up as a straggler.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has support
>>>>>>>>>>>>>>>> for state)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V 
>>>>>>>>>>>>>>>>> each time?
>>>>>>>>>>>>>>>>> (also the progress so far is probably of a different type R 
>>>>>>>>>>>>>>>>> rather than V).
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some 
>>>>>>>>>>>>>>>>>> sketchy thinking
>>>>>>>>>>>>>>>>>> about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> For determining how long to run we'd up to either K
>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter for
>>>>>>>>>>>>>>>>>> the ones with T and V into seperate collections re-run until 
>>>>>>>>>>>>>>>>>> finished
>>>>>>>>>>>>>>>>>> and then union the results
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start 
>>>>>>>>>>>>>>>>>> and improve
>>>>>>>>>>>>>>>>>> from.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>> this is worse than I remember because its been awhile since 
>>>>>>>>>>>>>>>>>> I thought about
>>>>>>>>>>>>>>>>>> this.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>>
>>>>>
>>>
>>>
>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>>

Reply via email to