Could we do this behind the scenes by writing a Receiver that publishes
periodic pings?
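
A minimal sketch of what the pings could drive, with Spark left out so it runs standalone. It assumes each ping triggers one bounded step of every active restriction, that new restrictions can show up at any time, and that a restriction can report it is done for real. PingDrivenWork, Resumable, step and onPing are made-up names for illustration only. In Spark the pings themselves would presumably come from a custom Receiver calling store(...) from a thread started in onStart(), and the set of active work would have to live in fault-tolerant state, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Spark-free model: every ping gives each active restriction one bounded step. */
class PingDrivenWork {
    /** Hypothetical stand-in for one SDF restriction being processed. */
    interface Resumable {
        /** Does a bounded amount of work; returns false once it is done for real. */
        boolean step(List<String> out);
    }

    private final List<Resumable> active = new ArrayList<>();

    /** New work may arrive at any time, e.g. a newly discovered topic. */
    void add(Resumable r) {
        active.add(r);
    }

    /** Called once per ping; collects this round's output and drops finished work. */
    List<String> onPing() {
        List<String> out = new ArrayList<>();
        Iterator<Resumable> it = active.iterator();
        while (it.hasNext()) {
            if (!it.next().step(out)) {
                it.remove();
            }
        }
        return out;
    }

    int activeCount() {
        return active.size();
    }

    public static void main(String[] args) {
        PingDrivenWork driver = new PingDrivenWork();
        int[] remaining = {2}; // this pretend topic is deleted after two steps
        driver.add(out -> {
            out.add("topicA-record");
            return --remaining[0] > 0;
        });
        for (int ping = 1; ping <= 3; ping++) {
            System.out.println("ping " + ping + ": " + driver.onPing()
                + ", active=" + driver.activeCount());
        }
    }
}
```

The point of the sketch is only that a never-ending ping stream turns "possibly infinite work per element" into an unbounded sequence of finite steps, and that finished work drops out without the ping stream itself having to stop.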

On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

> Kenn - I'm arguing that in Spark, SDF-style computation cannot be
> expressed at all, and neither can Beam's timers.
>
> Spark, unlike Flink, does not have a timer facility (only state), and as
> far as I can tell its programming model has no other primitive that can map
> a finite RDD into an infinite DStream - the only way to create a new
> infinite DStream appears to be to write a Receiver.
>
> I cc'd you because I'm wondering whether you've already investigated this
> when considering whether timers can be implemented on the Spark runner.
>
> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com> wrote:
>
>> I don't think I understand what the limitations of timers are that you
>> are referring to. FWIW I would say implementing other primitives like SDF
>> is an explicit non-goal for Beam state & timers.
>>
>> I got lost at some point in this thread, but is it actually necessary
>> that a bounded PCollection maps to a finite/bounded structure in Spark?
>> Skimming, I'm not sure if the problem is that we can't transliterate Beam
>> to Spark (this might be a good sign) or that we can't express SDF style
>> computation at all (seems far-fetched, but I could be convinced). Does
>> doing a lightweight analysis and just promoting some things to be some kind
>> of infinite representation help?
>>
>> Kenn
>>
>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> Would like to revive this thread one more time.
>>>
>>> At this point I'm pretty certain that Spark can't support this out of
>>> the box and we're gonna have to make changes to Spark.
>>>
>>> Holden, could you advise who would be some Spark experts (yourself
>>> included :) ) who could advise what kind of Spark change would both support
>>> this AND be useful to the regular Spark community (non-Beam) so that it has
>>> a chance of finding support? E.g. is there any plan in Spark regarding
>>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>>> with that?
>>>
>>> +Kenneth Knowles <k...@google.com> because timers suffer from the same
>>> problem.
>>>
>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> (resurrecting thread as I'm back from leave)
>>>>
>>>> I looked at this mode, and indeed as Reuven points out it seems that it
>>>> affects execution details, but doesn't offer any new APIs.
>>>> Holden - your suggestions of piggybacking an unbounded-per-element SDF
>>>> on top of an infinite stream would work if 1) there was just 1 element and
>>>> 2) the work was guaranteed to be infinite.
>>>>
>>>> Unfortunately, neither of these assumptions holds in general. In
>>>> particular:
>>>>
>>>> - 1: The SDF is applied to a PCollection; the PCollection itself may be
>>>> unbounded; and the unbounded work done by the SDF happens for every
>>>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>>>> arrive, and we may end up concurrently reading a continuously growing
>>>> number of topics.
>>>> - 2: The work per element is not necessarily infinite, it's just *not
>>>> guaranteed to be finite* - the SDF is allowed at any moment to say
>>>> "Okay, this restriction is done for real" by returning stop() from the
>>>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>>>> that if the topic/partition being watched is deleted. Having an infinite
>>>> stream as a driver of this process would require being able to send a
>>>> signal to the stream to stop itself.
>>>>
>>>> Is it looking like there's any other way this can be done in Spark
>>>> as-is, or are we going to have to make changes to Spark to support this?
>>>>
>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> I mean the new mode is very much in the Dataset not the DStream API
>>>>> (although you can use the Dataset API with the old modes too).
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> But this new mode isn't a semantic change, right? It's moving away
>>>>>> from micro batches into something that looks a lot like what Flink does -
>>>>>> continuous processing with asynchronous snapshot boundaries.
>>>>>>
>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>
>>>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>>>> SDF implementation (and real streaming)?
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Thomas
>>>>>>>
>>>>>>>
>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark appears
>>>>>>>>>>>>>>> to have no way at all to produce an infinite DStream from a
>>>>>>>>>>>>>>> finite RDD? Maybe we can somehow dynamically create a new
>>>>>>>>>>>>>>> DStream for every initial restriction, said DStream being
>>>>>>>>>>>>>>> obtained using a Receiver that under the hood actually runs the
>>>>>>>>>>>>>>> SDF? (this is of course less efficient than a timer-capable
>>>>>>>>>>>>>>> runner would do, and I have doubts about the fault tolerance)
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>>>> demands of the runner. Imagine the following case: a file contains
>>>>>>>>>>>>> a list of "master" Kafka topics, on which there are published
>>>>>>>>>>>>> additional Kafka topics to read.
>>>>>>>>>>>>>
>>>>>>>>>>>>> // "pipeline" stands for the enclosing Pipeline object (assumed here).
>>>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>>>>     pipeline.apply(TextIO.read().from(masterTopicsFile));
>>>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>>>>     masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>>>>     nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>>>
>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>>>> unbounded number of streams being read concurrently, each of the
>>>>>>>>>>>>> streams themselves is unbounded too)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on that.
>>>>>>>>>>>>>
>>>>>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>>>>>
>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark
>>>>>>>>>>> run it.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>>>> style DoFns in my head.
>>>>>>>>>>
>>>>>>>>>> To effectively allow us to union back into the “same” DStream
>>>>>>>>>> we’d have to end up using Spark’s queue streams (or their
>>>>>>>>>> equivalent custom source because of some queue stream
>>>>>>>>>> limitations), which invites some reliability challenges. This
>>>>>>>>>> might be at the point where I should send a diagram/some sample
>>>>>>>>>> code since it’s a bit convoluted.
>>>>>>>>>>
>>>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>>>> union approach work, the more it seems just using the state
>>>>>>>>>> mapping for streaming is probably more reasonable. Although the
>>>>>>>>>> state tracking in Spark can be somewhat expensive, so it would
>>>>>>>>>> probably make sense to benchmark to see if it meets our needs.
>>>>>>>>>>
>>>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>>>> output for an input element, directly or not.
>>>>>>>>>
>>>>>>>> So, provided there is an infinite input (e.g. pick a never-ending
>>>>>>>> queue stream), and each call produces a finite output, we would have an
>>>>>>>> infinite number of calls.
>>>>>>>>
>>>>>>>>>
>>>>>>>>> Dataflow and Flink, for example, had timer support even before
>>>>>>>>> SDFs, and a timer can set another timer and thus end up doing an
>>>>>>>>> infinite amount of work in a fault tolerant way - so SDF could be
>>>>>>>>> implemented on top of that. But AFAIK Spark doesn't have a similar
>>>>>>>>> feature, hence my concern.
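
To make the "a timer can set another timer" point concrete, here is a toy sketch with a virtual clock. It is not Beam's, Flink's or Dataflow's timer API: ToyTimers, Callback, set and advanceTo are invented names, and nothing here is persisted, so the fault-tolerance part of the argument is assumed away.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy timer service with a virtual clock; not any real runner's API. */
class ToyTimers {
    /** A timer callback may set further timers through the service it receives. */
    interface Callback {
        void onTimer(long now, ToyTimers timers);
    }

    private static final class Entry {
        final long time;
        final Callback callback;

        Entry(long time, Callback callback) {
            this.time = time;
            this.callback = callback;
        }
    }

    private final PriorityQueue<Entry> queue =
        new PriorityQueue<>((a, b) -> Long.compare(a.time, b.time));

    void set(long time, Callback callback) {
        queue.add(new Entry(time, callback));
    }

    /** Fires every timer due at or before 'until', including ones set while firing. */
    void advanceTo(long until) {
        while (!queue.isEmpty() && queue.peek().time <= until) {
            Entry e = queue.poll();
            e.callback.onTimer(e.time, this);
        }
    }

    public static void main(String[] args) {
        ToyTimers timers = new ToyTimers();
        List<Long> fired = new ArrayList<>();
        Callback[] tick = new Callback[1];
        tick[0] = (now, t) -> {
            fired.add(now);
            t.set(now + 10, tick[0]); // re-arm: the work never has to end
        };
        timers.set(0, tick[0]);
        timers.advanceTo(35);
        System.out.println(fired);
    }
}
```

Each firing does a finite amount of work and leaves exactly one pending timer behind, so the total work is unbounded while every individual step stays bounded. That is the property an SDF implementation would lean on.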
>>>>>>>>>
>>>>>>>> So we can do an infinite queue stream which would allow us to be
>>>>>>>> triggered at each interval and handle our own persistence.
>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>> But these still are both DStream-based rather than Dataset, which
>>>>>>>>>> we might want to support (depends on what direction folks take
>>>>>>>>>> with the runners).
>>>>>>>>>>
>>>>>>>>>> If we wanted to do this in the Dataset world, looking at a custom
>>>>>>>>>> sink/source would also be an option (which is effectively what a
>>>>>>>>>> custom queue-stream-like thing for DStreams requires), but the
>>>>>>>>>> datasource APIs are a bit in flux, so if we ended up doing things
>>>>>>>>>> at the edge of what’s allowed there’s a good chance we’d have to
>>>>>>>>>> rewrite it a few times.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>>> Assuming that we have a given DStream though, in Spark we can
>>>>>>>>>>>> get the underlying RDD implementation for each microbatch and do
>>>>>>>>>>>> our work inside of that.
>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> More generally this does raise an important question if we
>>>>>>>>>>>>>> want to target Datasets instead of RDDs/DStreams, in which case I
>>>>>>>>>>>>>> would need to do some more poking.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> How would timers be implemented? By outputting and
>>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I mean the timers could be inside the mappers within the
>>>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed it
>>>>>>>>>>>>>> doesn’t end up as a straggler.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
>>>>>>>>>>>>>>>>>> support for state)
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting V
>>>>>>>>>>>>>>>>>>> each time? (also the progress so far is probably of a
>>>>>>>>>>>>>>>>>>> different type R rather than V).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy
>>>>>>>>>>>>>>>>>>>> thinking about this last year but didn't get very far.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> For determining how long to run, we'd either run for up to K
>>>>>>>>>>>>>>>>>>>> seconds or listen for a signal on a port
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter
>>>>>>>>>>>>>>>>>>>> for the ones with T and V into separate collections, re-run
>>>>>>>>>>>>>>>>>>>> until finished, and then union the results
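
A rough standalone sketch of the loop described above, without Spark. I'm assuming here that a finished element yields an output V and an unfinished one yields a leftover T to retry (the description above can be read either way), and that a "bounded pass" is just a function call. RerunUntilFinished, Result and boundedPass are hypothetical names, and null is used as the "not populated" marker purely for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Toy version of the "run a bounded pass, split, re-run the leftovers, union" loop. */
class RerunUntilFinished {
    /** Exactly one field is non-null: leftover input to retry, or a finished output. */
    static final class Result<T, V> {
        final T leftover;
        final V output;

        private Result(T leftover, V output) {
            this.leftover = leftover;
            this.output = output;
        }

        static <T, V> Result<T, V> unfinished(T leftover) {
            return new Result<>(leftover, null);
        }

        static <T, V> Result<T, V> finished(V output) {
            return new Result<>(null, output);
        }
    }

    /** Re-runs the bounded pass on leftovers until none remain, unioning all outputs. */
    static <T, V> List<V> run(List<T> inputs, Function<T, Result<T, V>> boundedPass) {
        List<V> outputs = new ArrayList<>();
        List<T> pending = new ArrayList<>(inputs);
        while (!pending.isEmpty()) {
            List<T> next = new ArrayList<>();
            for (T input : pending) {
                Result<T, V> r = boundedPass.apply(input);
                if (r.output != null) {
                    outputs.add(r.output); // finished: goes into the union
                } else {
                    next.add(r.leftover);  // unfinished: retried in the next pass
                }
            }
            pending = next;
        }
        return outputs;
    }

    public static void main(String[] args) {
        // Hypothetical work: count an integer down by at most 3 per pass.
        List<String> done = RerunUntilFinished.<Integer, String>run(
            Arrays.asList(2, 7),
            n -> {
                if (n <= 3) {
                    return Result.finished("finished from " + n);
                }
                return Result.unfinished(n - 3);
            });
        System.out.println(done);
    }
}
```

Note that the loop only terminates if every input eventually finishes, which is fine for bounded work but is exactly the sticking point for work that is not guaranteed to be finite.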
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to start
>>>>>>>>>>>>>>>>>>>> and improve from.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>>>> this is worse than I remember because it's been a while since I
>>>>>>>>>>>>>>>>>>>> thought about this.
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>>
