Yeah, that's been the implied source of being able to be continuous: you
union with a receiver which produces an infinite number of batches (the
"never ending queue stream", though not actually a queue stream, since those
have some limitations, but our own implementation thereof).

On Tue, Apr 24, 2018 at 11:54 PM, Reuven Lax <re...@google.com> wrote:

> Could we do this behind the scenes by writing a Receiver that publishes
> periodic pings?
>
> On Tue, Apr 24, 2018 at 10:09 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> Kenn - I'm arguing that in Spark, SDF-style computation cannot be
>> expressed at all, and neither can Beam's timers.
>>
>> Spark, unlike Flink, does not have a timer facility (only state), and as
>> far as I can tell its programming model has no other primitive that can map
>> a finite RDD into an infinite DStream - the only way to create a new
>> infinite DStream appears to be to write a Receiver.
>>
>> I cc'd you because I'm wondering whether you've already investigated this
>> when considering whether timers can be implemented on the Spark runner.
>>
>> On Tue, Apr 24, 2018 at 2:53 PM Kenneth Knowles <k...@google.com> wrote:
>>
>>> I don't think I understand what the limitations of timers are that you
>>> are referring to. FWIW I would say implementing other primitives like SDF
>>> is an explicit non-goal for Beam state & timers.
>>>
>>> I got lost at some point in this thread, but is it actually necessary
>>> that a bounded PCollection maps to a finite/bounded structure in Spark?
>>> Skimming, I'm not sure if the problem is that we can't transliterate Beam
>>> to Spark (this might be a good sign) or that we can't express SDF style
>>> computation at all (seems far-fetched, but I could be convinced). Does
>>> doing a lightweight analysis and just promoting some things to be some kind
>>> of infinite representation help?
>>>
>>> Kenn
>>>
>>> On Tue, Apr 24, 2018 at 2:37 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> Would like to revive this thread one more time.
>>>>
>>>> At this point I'm pretty certain that Spark can't support this out of
>>>> the box and we're gonna have to make changes to Spark.
>>>>
>>>> Holden, could you advise who would be some Spark experts (yourself
>>>> included :) ) who could advise what kind of Spark change would both support
>>>> this AND be useful to the regular Spark community (non-Beam) so that it has
>>>> a chance of finding support? E.g. is there any plan in Spark regarding
>>>> adding timers similar to Flink's or Beam's timers, maybe we could help out
>>>> with that?
>>>>
>>>> +Kenneth Knowles <k...@google.com> because timers suffer from the same
>>>> problem.
>>>>
>>>> On Thu, Apr 12, 2018 at 2:28 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> (resurrecting thread as I'm back from leave)
>>>>>
>>>>> I looked at this mode, and indeed as Reuven points out it seems that
>>>>> it affects execution details, but doesn't offer any new APIs.
>>>>> Holden - your suggestions of piggybacking an unbounded-per-element SDF
>>>>> on top of an infinite stream would work if 1) there was just 1 element and
>>>>> 2) the work was guaranteed to be infinite.
>>>>>
>>>>> Unfortunately, neither of these assumptions holds. In
>>>>> particular:
>>>>>
>>>>> - 1: The SDF is applied to a PCollection; the PCollection itself may
>>>>> be unbounded; and the unbounded work done by the SDF happens for every
>>>>> element. E.g. we might have a Kafka topic on which names of Kafka topics
>>>>> arrive, and we may end up concurrently reading a continuously growing
>>>>> number of topics.
>>>>> - 2: The work per element is not necessarily infinite, it's just *not
>>>>> guaranteed to be finite* - the SDF is allowed at any moment to say
>>>>> "Okay, this restriction is done for real" by returning stop() from the
>>>>> @ProcessElement method. Continuing the Kafka example, e.g., it could do
>>>>> that if the topic/partition being watched is deleted. Having an infinite
>>>>> stream as a driver of this process would require being able to send a
>>>>> signal to the stream to stop itself.
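A toy illustration of the two points above, in plain Java with no Spark or Beam dependency. It is only a sketch under assumptions: `TickDrivenSdfSketch`, `BoundedStep`, `Continuation` and `exampleStep` are hypothetical names, the "tick" stands in for whatever never-ending stream drives the process, and per-element progress is kept in an in-memory map with no fault tolerance at all. It shows the set of active elements growing as new inputs arrive and shrinking when an element says it is done.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical toy driver; none of these names come from Spark or Beam. */
final class TickDrivenSdfSketch {

    /** Stand-in for the stop()/resume() decision an SDF makes per call. */
    enum Continuation { RESUME, STOP }

    /** One bounded chunk of work for one element, starting at a position. */
    interface BoundedStep {
        Continuation process(String element, long position, List<String> output);
    }

    // Per-element progress kept across ticks (the role runner state would play).
    private final Map<String, Long> positions = new LinkedHashMap<>();
    private final BoundedStep step;
    final List<String> output = new ArrayList<>();

    TickDrivenSdfSketch(BoundedStep step) {
        this.step = step;
    }

    /** Called once per tick; newElements may be empty. */
    void onTick(List<String> newElements) {
        for (String element : newElements) {
            positions.putIfAbsent(element, 0L);
        }
        Iterator<Map.Entry<String, Long>> it = positions.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            Continuation c = step.process(entry.getKey(), entry.getValue(), output);
            if (c == Continuation.STOP) {
                it.remove();                           // element is done for real
            } else {
                entry.setValue(entry.getValue() + 1);  // resume here on the next tick
            }
        }
    }

    int activeCount() {
        return positions.size();
    }

    /** Example step: "topic-a" is treated as deleted once it reaches position 2. */
    static Continuation exampleStep(String element, long position, List<String> output) {
        if (element.equals("topic-a") && position >= 2) {
            return Continuation.STOP;
        }
        output.add(element + "@" + position);
        return Continuation.RESUME;
    }

    public static void main(String[] args) {
        TickDrivenSdfSketch driver =
            new TickDrivenSdfSketch(TickDrivenSdfSketch::exampleStep);
        driver.onTick(Arrays.asList("topic-a"));
        driver.onTick(Arrays.asList("topic-b"));  // a new element arrives later
        driver.onTick(Collections.<String>emptyList());
        driver.onTick(Collections.<String>emptyList());
        System.out.println(driver.output + " active=" + driver.activeCount());
    }
}
```

The driver itself never stops, which is the part that needs an infinite source; what the sketch leaves open is how a real runner would persist `positions` and how the stream would be told to stop once no elements remain.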
>>>>>
>>>>> Is it looking like there's any other way this can be done in Spark
>>>>> as-is, or are we going to have to make changes to Spark to support this?
>>>>>
>>>>> On Sun, Mar 25, 2018 at 9:50 PM Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> I mean the new mode is very much in the Dataset not the DStream API
>>>>>> (although you can use the Dataset API with the old modes too).
>>>>>>
>>>>>> On Sun, Mar 25, 2018 at 9:11 PM, Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> But this new mode isn't a semantic change, right? It's moving away
>>>>>>> from micro batches into something that looks a lot like what Flink
>>>>>>> does - continuous processing with asynchronous snapshot boundaries.
>>>>>>>
>>>>>>> On Sun, Mar 25, 2018 at 9:01 PM Thomas Weise <t...@apache.org> wrote:
>>>>>>>
>>>>>>>> Hopefully the new "continuous processing mode" in Spark will enable
>>>>>>>> SDF implementation (and real streaming)?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Thomas
>>>>>>>>
>>>>>>>>
>>>>>>>> On Sat, Mar 24, 2018 at 3:22 PM, Holden Karau <hol...@pigscanfly.ca
>>>>>>>> > wrote:
>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <
>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <
>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <
>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <
>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Reviving this thread. I think SDF is a pretty big risk for
>>>>>>>>>>>>>>>> Spark runner streaming. Holden, is it correct that Spark
>>>>>>>>>>>>>>>> appears to have no way at all to produce an infinite DStream
>>>>>>>>>>>>>>>> from a finite RDD? Maybe we can somehow dynamically create a
>>>>>>>>>>>>>>>> new DStream for every initial restriction, said DStream being
>>>>>>>>>>>>>>>> obtained using a Receiver that under the hood actually runs
>>>>>>>>>>>>>>>> the SDF? (this is of course less efficient than a
>>>>>>>>>>>>>>>> timer-capable runner would do, and I have doubts about the
>>>>>>>>>>>>>>>> fault tolerance)
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> So on the streaming side we could simply do it with a fixed
>>>>>>>>>>>>>>> number of levels on DStreams. It’s not great but it would work.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Not sure I understand this. Let me try to clarify what SDF
>>>>>>>>>>>>>> demands of the runner. Imagine the following case: a file
>>>>>>>>>>>>>> contains a list of "master" Kafka topics, on which there are
>>>>>>>>>>>>>> published additional Kafka topics to read.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> // p is the Pipeline (assumed; not named in the original)
>>>>>>>>>>>>>> PCollection<String> masterTopics =
>>>>>>>>>>>>>>     p.apply(TextIO.read().from(masterTopicsFile));
>>>>>>>>>>>>>> PCollection<String> nestedTopics =
>>>>>>>>>>>>>>     masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>>>> PCollection<String> records =
>>>>>>>>>>>>>>     nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> This exemplifies both use cases of a streaming SDF that emits
>>>>>>>>>>>>>> infinite output for every input:
>>>>>>>>>>>>>> - Applying it to a finite set of inputs (in this case to the
>>>>>>>>>>>>>> result of reading a text file)
>>>>>>>>>>>>>> - Applying it to an infinite set of inputs (i.e. having an
>>>>>>>>>>>>>> unbounded number of streams being read concurrently, each of
>>>>>>>>>>>>>> the streams themselves is unbounded too)
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Does the multi-level solution you have in mind work for this
>>>>>>>>>>>>>> case? I suppose the second case is harder, so we can focus on
>>>>>>>>>>>>>> that.
>>>>>>>>>>>>>>
>>>>>>>>>>>>> So none of those are a SplittableDoFn, right?
>>>>>>>>>>>>>
>>>>>>>>>>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>>>>>>>>>>> splittable DoFn and we're trying to figure out how to make Spark
>>>>>>>>>>>> run it.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>> Ah ok, sorry I saw that and for some reason parsed them as old
>>>>>>>>>>> style DoFns in my head.
>>>>>>>>>>>
>>>>>>>>>>> To effectively allow us to union back into the “same” DStream
>>>>>>>>>>> we’d have to end up using Spark’s queue streams (or their
>>>>>>>>>>> equivalent custom source because of some queue stream
>>>>>>>>>>> limitations), which invites some reliability challenges. This
>>>>>>>>>>> might be at the point where I should send a diagram/some sample
>>>>>>>>>>> code since it’s a bit convoluted.
>>>>>>>>>>>
>>>>>>>>>>> The more I think about the jumps required to make the “simple”
>>>>>>>>>>> union approach work, the more it seems just using the state
>>>>>>>>>>> mapping for streaming is probably more reasonable. Although the
>>>>>>>>>>> state tracking in Spark can be somewhat expensive, so it would
>>>>>>>>>>> probably make sense to benchmark to see if it meets our needs.
>>>>>>>>>>>
>>>>>>>>>> So the problem is, I don't think this can be made to work using
>>>>>>>>>> mapWithState. It doesn't allow a mapping function that emits infinite
>>>>>>>>>> output for an input element, directly or not.
>>>>>>>>>>
>>>>>>>>> So, provided there is an infinite input (e.g. pick a never ending
>>>>>>>>> queue stream), and each call produces a finite output, we would
>>>>>>>>> have an infinite number of calls.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> Dataflow and Flink, for example, had timer support even before
>>>>>>>>>> SDFs, and a timer can set another timer and thus end up doing an
>>>>>>>>>> infinite amount of work in a fault tolerant way - so SDF could be
>>>>>>>>>> implemented on top of that. But AFAIK Spark doesn't have a similar
>>>>>>>>>> feature, hence my concern.
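To make the "a timer can set another timer" point concrete, here is a minimal sketch of a timer callback that re-arms itself, using a toy in-memory timer queue and a logical clock. Everything here is hypothetical (`SelfReschedulingTimers`, `TimerCallback`, `Poller`); it is not the timer API of Beam, Flink or Dataflow, and it deliberately leaves out the fault-tolerance part, which is the hard bit a runner has to provide.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical toy timer service driven by a logical clock. */
final class SelfReschedulingTimers {

    interface TimerCallback {
        void onTimer(long firedAt, SelfReschedulingTimers timers);
    }

    private static final class PendingTimer {
        final long fireAt;
        final TimerCallback callback;

        PendingTimer(long fireAt, TimerCallback callback) {
            this.fireAt = fireAt;
            this.callback = callback;
        }
    }

    // Timers ordered by the logical time at which they should fire.
    private final PriorityQueue<PendingTimer> queue =
        new PriorityQueue<>(Comparator.comparingLong((PendingTimer t) -> t.fireAt));

    void set(long fireAt, TimerCallback callback) {
        queue.add(new PendingTimer(fireAt, callback));
    }

    /** Fires every timer due at or before `now`, in timestamp order. */
    void advanceTo(long now) {
        while (!queue.isEmpty() && queue.peek().fireAt <= now) {
            PendingTimer timer = queue.poll();
            timer.callback.onTimer(timer.fireAt, this);
        }
    }

    /** Does one bounded unit of work per firing, then sets the next timer. */
    static final class Poller implements TimerCallback {
        final long period;
        final List<Long> firedAt = new ArrayList<>();

        Poller(long period) {
            this.period = period;
        }

        @Override
        public void onTimer(long at, SelfReschedulingTimers timers) {
            firedAt.add(at);                // stand-in for "process one chunk"
            timers.set(at + period, this);  // re-arm: the work never has to end
        }
    }

    public static void main(String[] args) {
        SelfReschedulingTimers timers = new SelfReschedulingTimers();
        Poller poller = new Poller(10);
        timers.set(0, poller);
        timers.advanceTo(35);
        System.out.println(poller.firedAt);
    }
}
```

Each firing does a finite amount of work, yet the chain of firings is unbounded as long as the clock keeps advancing, which is the property an SDF implementation would build on.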
>>>>>>>>>>
>>>>>>>>> So we can do an infinite queue stream which would allow us to be
>>>>>>>>> triggered at each interval and handle our own persistence.
>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>> But these still are both DStream based rather than Dataset which
>>>>>>>>>>> we might want to support (depends on what direction folks take
>>>>>>>>>>> with the runners).
>>>>>>>>>>>
>>>>>>>>>>> If we wanted to do this in the Dataset world, looking at a custom
>>>>>>>>>>> sink/source would also be an option (which is effectively what a
>>>>>>>>>>> custom queue-stream-like thing for DStreams requires), but the
>>>>>>>>>>> data source APIs are a bit in flux, so if we ended up doing things
>>>>>>>>>>> at the edge of what’s allowed there’s a good chance we’d have to
>>>>>>>>>>> rewrite it a few times.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>>> Assuming that we have a given DStream though, in Spark we can
>>>>>>>>>>>>> get the underlying RDD implementation for each microbatch and do
>>>>>>>>>>>>> our work inside of that.
>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> More generally this does raise an important question: whether
>>>>>>>>>>>>>>> we want to target Datasets instead of RDDs/DStreams, in which
>>>>>>>>>>>>>>> case I would need to do some more poking.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <
>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> How would timers be implemented? By outputting and
>>>>>>>>>>>>>>>>> reprocessing, the same way you proposed for SDF?
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I mean the timers could be inside the mappers within the
>>>>>>>>>>>>>>> system. Could use a singleton so if a partition is re-executed
>>>>>>>>>>>>>>> it doesn’t end up as a straggler.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <
>>>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>>>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Does Spark have support for timers? (I know it has
>>>>>>>>>>>>>>>>>>> support for state)
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <
>>>>>>>>>>>>>>>>>>> re...@google.com> wrote:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> Could we alternatively use a state mapping function to
>>>>>>>>>>>>>>>>>>>> keep track of the computation so far instead of outputting
>>>>>>>>>>>>>>>>>>>> V each time? (also the progress so far is probably of a
>>>>>>>>>>>>>>>>>>>> different type R rather than V).
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some
>>>>>>>>>>>>>>>>>>>>> sketchy thinking about this last year but didn't get very
>>>>>>>>>>>>>>>>>>>>> far.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>>>>>>>>>>> For input type T
>>>>>>>>>>>>>>>>>>>>> Output type V
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>>>>>>>>>>>> and if the computation finishes T will be populated
>>>>>>>>>>>>>>>>>>>>> otherwise V will be
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> For determining how long to run, we'd run for up to either K
>>>>>>>>>>>>>>>>>>>>> seconds or until we get a signal on a port
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Once we're done running we take the result and filter
>>>>>>>>>>>>>>>>>>>>> for the ones with T and V into separate collections,
>>>>>>>>>>>>>>>>>>>>> re-run until finished, and then union the results
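The run / filter / re-run / union loop described above could look roughly like the following, written against plain Java collections instead of RDDs. This is a sketch under assumptions, not the design as it was actually prototyped: `RerunUntilFinished`, `Attempt` and `countDownByTwo` are hypothetical names, and it assumes that an unfinished element carries the remaining work to retry while a finished one carries its output.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

/** Hypothetical sketch of "run, split finished/unfinished, re-run, union". */
final class RerunUntilFinished {

    /** Result of one bounded attempt: a finished output or a residual to retry. */
    static final class Attempt<R, V> {
        final boolean finished;
        final R residual;  // set only when the attempt did not finish
        final V output;    // set only when the attempt finished

        private Attempt(boolean finished, R residual, V output) {
            this.finished = finished;
            this.residual = residual;
            this.output = output;
        }

        static <R, V> Attempt<R, V> finished(V output) {
            return new Attempt<R, V>(true, null, output);
        }

        static <R, V> Attempt<R, V> unfinished(R residual) {
            return new Attempt<R, V>(false, residual, null);
        }
    }

    /** Re-applies the mapper to the unfinished elements until none remain. */
    static <R, V> List<V> runToCompletion(List<R> inputs,
                                          Function<R, Attempt<R, V>> mapper) {
        List<V> outputs = new ArrayList<>();
        List<R> pending = new ArrayList<>(inputs);
        while (!pending.isEmpty()) {
            List<R> next = new ArrayList<>();
            for (R element : pending) {
                Attempt<R, V> attempt = mapper.apply(element);
                if (attempt.finished) {
                    outputs.add(attempt.output);  // goes into the unioned result
                } else {
                    next.add(attempt.residual);   // re-run in the next round
                }
            }
            pending = next;
        }
        return outputs;
    }

    /** Toy work item: at most 2 units per round (stand-in for "up to K seconds"). */
    static Attempt<Integer, String> countDownByTwo(Integer remaining) {
        if (remaining <= 2) {
            return Attempt.finished("done");
        }
        return Attempt.unfinished(remaining - 2);
    }

    public static void main(String[] args) {
        List<String> results =
            runToCompletion(Arrays.asList(1, 5, 4), RerunUntilFinished::countDownByTwo);
        System.out.println(results.size() + " elements finished");
    }
}
```

Note that the outer loop only terminates if every element eventually finishes, so on its own this shape covers bounded work; it does not handle the not-guaranteed-to-be-finite case Eugene raises elsewhere in this thread.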
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>>>>>>>>>>> complicated and I figured terrible was a good place to
>>>>>>>>>>>>>>>>>>>>> start and improve from.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> Let me know your thoughts, especially the parts where
>>>>>>>>>>>>>>>>>>>>> this is worse than I remember because it's been a while
>>>>>>>>>>>>>>>>>>>>> since I thought about this.
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>>


-- 
Twitter: https://twitter.com/holdenkarau
