On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:

> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com>
> wrote:
>
>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca>
>> wrote:
>>
>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <kirpic...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark
>>>>>> runner streaming. Holden, is it correct that Spark appears to have no way
>>>>>> at all to produce an infinite DStream from a finite RDD? Maybe we can
>>>>>> somehow dynamically create a new DStream for every initial restriction,
>>>>>> said DStream being obtained using a Receiver that under the hood actually
>>>>>> runs the SDF? (this is of course less efficient than a timer-capable 
>>>>>> runner
>>>>>> would do, and I have doubts about the fault tolerance)
>>>>>>
>>>>> So on the streaming side we could simply do it with a fixed number of
>>>>> levels on DStreams. It’s not great but it would work.
>>>>>
>>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>>> the runner. Imagine the following case: a file contains a list of "master"
>>>> Kafka topics, on which there are published additional Kafka topics to read.
>>>>
>>>> PCollection<String> masterTopics = TextIO.read().from(masterTopicsFile)
>>>> PCollection<String> nestedTopics =
>>>> masterTopics.apply(ParDo(ReadFromKafkaFn))
>>>> PCollection<String> records = nestedTopics.apply(ParDo(ReadFromKafkaFn))
>>>>
>>>> This exemplifies both use cases of a streaming SDF that emits infinite
>>>> output for every input:
>>>> - Applying it to a finite set of inputs (in this case to the result of
>>>> reading a text file)
>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>> number of streams being read concurrently, each of the streams themselves
>>>> is unbounded too)
>>>>
>>>> Does the multi-level solution you have in mind work for this case? I
>>>> suppose the second case is harder, so we can focus on that.
>>>>
>>> So none of those are a splittabledofn right?
>>>
>> Not sure what you mean? ReadFromKafkaFn in these examples is a splittable
>> DoFn and we're trying to figure out how to make Spark run it.
>>
>>
> Ah ok, sorry I saw that and for some reason parsed them as old style DoFns
> in my head.
>
> To effectively allow us to union back into the “same” DStream  we’d have
> to end up using Sparks queue streams (or their equivalent custom source
> because of some queue stream limitations), which invites some reliability
> challenges. This might be at the point where I should send a diagram/some
> sample code since it’s a bit convoluted.
>
> The more I think about the jumps required to make the “simple” union
> approach work, the more it seems just using the statemapping for steaming
> is probably more reasonable. Although the state tracking in Spark can be
> somewhat expensive so it would probably make sense to benchmark to see if
> it meets our needs.
>
So the problem is, I don't think this can be made to work using
mapWithState. It doesn't allow a mapping function that emits infinite
output for an input element, directly or not.

Dataflow and Flink, for example, had timer support even before SDFs, and a
timer can set another timer and thus end up doing an infinite amount of
work in a fault tolerant way - so SDF could be implemented on top of that.
But AFAIK spark doesn't have a similar feature, hence my concern.


> But these still are both DStream based rather than Dataset which we might
> want to support (depends on what direction folks take with the runners).
>
> If we wanted to do this in the dataset world looking at a custom
> sink/source would also be an option, (which is effectively what a custom
> queue stream like thing for dstreams requires), but the datasource APIs are
> a bit influx so if we ended up doing things at the edge of what’s allowed
> there’s a good chance we’d have to rewrite it a few times.
>
>
>>> Assuming that we have a given dstream though in Spark we can get the
>>> underlying RDD implementation for each microbatch and do our work inside of
>>> that.
>>>
>>>>
>>>>
>>>>>
>>>>> More generally this does raise an important question if we want to
>>>>> target datasets instead of rdds/DStreams in which case i would need to do
>>>>> some more poking.
>>>>>
>>>>>
>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> How would timers be implemented? By outputing and reprocessing, the
>>>>>>> same way you proposed for SDF?
>>>>>>>
>>>>>> i mean the timers could be inside the mappers within the system.
>>>>> Could use a singleton so if a partition is re-executed it doesn’t end up 
>>>>> as
>>>>> a straggler.
>>>>>
>>>>>>
>>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>> state)
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Could we alternatively use a state mapping function to keep track
>>>>>>>>>> of the computation so far instead of outputting V each time? (also 
>>>>>>>>>> the
>>>>>>>>>> progress so far is probably of a different type R rather than V).
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>
>>>>>>>>>>> So we had a quick chat about what it would take to add something
>>>>>>>>>>> like SplittableDoFns to Spark. I'd done some sketchy thinking about 
>>>>>>>>>>> this
>>>>>>>>>>> last year but didn't get very far.
>>>>>>>>>>>
>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>> For input type T
>>>>>>>>>>> Output type V
>>>>>>>>>>>
>>>>>>>>>>> Implement a mapper which outputs type (T, V)
>>>>>>>>>>> and if the computation finishes T will be populated otherwise V
>>>>>>>>>>> will be
>>>>>>>>>>>
>>>>>>>>>>> For determining how long to run we'd up to either K seconds or
>>>>>>>>>>> listen for a signal on a port
>>>>>>>>>>>
>>>>>>>>>>> Once we're done running we take the result and filter for the
>>>>>>>>>>> ones with T and V into seperate collections re-run until finished
>>>>>>>>>>> and then union the results
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>> complicated and I figured terrible was a good place to start and 
>>>>>>>>>>> improve
>>>>>>>>>>> from.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>> worse than I remember because its been awhile since I thought about 
>>>>>>>>>>> this.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>
>>>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>
>>>> --
>>> Twitter: https://twitter.com/holdenkarau
>>>
>> --
> Twitter: https://twitter.com/holdenkarau
>

Reply via email to