On Sat, Mar 24, 2018 at 1:23 PM Eugene Kirpichov <kirpic...@google.com>
wrote:

>
>
> On Fri, Mar 23, 2018, 11:17 PM Holden Karau <hol...@pigscanfly.ca> wrote:
>
>> On Fri, Mar 23, 2018 at 7:00 PM Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>> On Fri, Mar 23, 2018 at 6:49 PM Holden Karau <hol...@pigscanfly.ca>
>>> wrote:
>>>
>>>> On Fri, Mar 23, 2018 at 6:20 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>>
>>>>> On Fri, Mar 23, 2018 at 6:12 PM Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> On Fri, Mar 23, 2018 at 5:58 PM Eugene Kirpichov <
>>>>>> kirpic...@google.com> wrote:
>>>>>>
>>>>>>> Reviving this thread. I think SDF is a pretty big risk for Spark runner
>>>>>>> streaming. Holden, is it correct that Spark appears to have no way at all
>>>>>>> to produce an infinite DStream from a finite RDD? Maybe we can somehow
>>>>>>> dynamically create a new DStream for every initial restriction, said
>>>>>>> DStream being obtained using a Receiver that under the hood actually runs
>>>>>>> the SDF? (This is of course less efficient than what a timer-capable
>>>>>>> runner would do, and I have doubts about the fault tolerance.)
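To make the Receiver idea concrete, here is a rough sketch of what one of those
per-restriction Receivers could look like (runSdfForRestriction is a
hypothetical stand-in for however the runner would actually drive the SDF; it
is not a real Beam or Spark API):

  import org.apache.spark.storage.StorageLevel;
  import org.apache.spark.streaming.receiver.Receiver;

  // One Receiver per initial restriction; whatever the SDF emits gets store()d,
  // so Spark turns it into an unbounded DStream for that restriction.
  class SdfReceiver extends Receiver<String> {
    private final String restriction; // e.g. one Kafka topic name

    SdfReceiver(String restriction) {
      super(StorageLevel.MEMORY_AND_DISK_2());
      this.restriction = restriction;
    }

    @Override
    public void onStart() {
      new Thread(() -> {
        while (!isStopped()) {
          // Hypothetical stand-in for running the SDF over this restriction.
          for (String record : runSdfForRestriction(restriction)) {
            store(record);
          }
        }
      }).start();
    }

    @Override
    public void onStop() {}

    private Iterable<String> runSdfForRestriction(String restriction) {
      throw new UnsupportedOperationException("stand-in for invoking the SDF");
    }
  }

The stream for a restriction would then come from
jssc.receiverStream(new SdfReceiver(topic)); creating those dynamically per
restriction, and making them fault tolerant, is exactly the hard part.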
>>>>>>>
>>>>>> So on the streaming side we could simply do it with a fixed number of
>>>>>> levels on DStreams. It’s not great but it would work.
>>>>>>
>>>>> Not sure I understand this. Let me try to clarify what SDF demands of
>>>>> the runner. Imagine the following case: a file contains a list of "master"
>>>>> Kafka topics, on which the names of additional Kafka topics to read are
>>>>> published.
>>>>>
>>>>> PCollection<String> masterTopics = p.apply(TextIO.read().from(masterTopicsFile));
>>>>> PCollection<String> nestedTopics = masterTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>> PCollection<String> records = nestedTopics.apply(ParDo.of(new ReadFromKafkaFn()));
>>>>>
>>>>> This exemplifies both use cases of a streaming SDF that emits infinite
>>>>> output for every input:
>>>>> - Applying it to a finite set of inputs (in this case to the result of
>>>>> reading a text file)
>>>>> - Applying it to an infinite set of inputs (i.e. having an unbounded
>>>>> number of streams being read concurrently, each of which is itself
>>>>> unbounded)
>>>>>
>>>>> Does the multi-level solution you have in mind work for this case? I
>>>>> suppose the second case is harder, so we can focus on that.
>>>>>
>>>> So none of those are a SplittableDoFn, right?
>>>>
>>> Not sure what you mean? ReadFromKafkaFn in these examples is a
>>> splittable DoFn and we're trying to figure out how to make Spark run it.
>>>
>>>
>> Ah ok, sorry, I saw that and for some reason parsed them as old-style
>> DoFns in my head.
>>
>> To effectively allow us to union back into the “same” DStream we’d have to
>> end up using Spark’s queue streams (or their equivalent custom source,
>> because of some queue stream limitations), which invites some reliability
>> challenges. This might be at the point where I should send a diagram/some
>> sample code since it’s a bit convoluted.
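For reference, roughly the shape I have in mind, assuming we can materialize
each micro-batch's unfinished restrictions as an RDD and feed them back into
the queue (runSdfBatch is a hypothetical placeholder, and queue streams don't
checkpoint, which is a big part of the reliability concern):

  import java.util.Queue;
  import java.util.concurrent.ConcurrentLinkedQueue;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.streaming.api.java.JavaDStream;
  import org.apache.spark.streaming.api.java.JavaStreamingContext;

  class SdfQueueLoop {
    static JavaDStream<String> workLoop(
        JavaStreamingContext jssc, JavaRDD<String> initialRestrictions) {
      Queue<JavaRDD<String>> pending = new ConcurrentLinkedQueue<>();
      pending.add(initialRestrictions);

      // Every interval, take whatever restrictions are queued and process them.
      JavaDStream<String> work = jssc.queueStream(pending, /* oneAtATime = */ false);

      work.foreachRDD(batch -> {
        // Do a bounded amount of SDF work for this batch, then re-enqueue the
        // restrictions that still have work left so the "same" stream keeps going.
        JavaRDD<String> leftover = runSdfBatch(batch);
        pending.add(leftover);
      });
      return work;
    }

    // Hypothetical placeholder for actually running the SDF over a batch.
    static JavaRDD<String> runSdfBatch(JavaRDD<String> restrictions) {
      return restrictions;
    }
  }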
>>
>> The more I think about the jumps required to make the “simple” union
>> approach work, the more it seems that just using the state mapping for
>> streaming is probably more reasonable. Although the state tracking in Spark
>> can be somewhat expensive, so it would probably make sense to benchmark to
>> see if it meets our needs.
>>
> So the problem is, I don't think this can be made to work using
> mapWithState. It doesn't allow a mapping function that emits infinite
> output for an input element, directly or not.
>
So, provided there is an infinite input (e.g. a never-ending queue stream), and
each call produces a finite output, we would have an infinite number of calls.

>
> Dataflow and Flink, for example, had timer support even before SDFs, and a
> timer can set another timer and thus end up doing an infinite amount of
> work in a fault-tolerant way - so SDF could be implemented on top of that.
> But AFAIK Spark doesn't have a similar feature, hence my concern.
>
So we can do an infinite queue stream, which would allow us to be triggered at
each interval and handle our own persistence.
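Something along these lines, for example (just a sketch, assuming jssc is our
JavaStreamingContext; the defaultRDD overload of queueStream yields a batch
every interval even when the queue is empty, so the processing function keeps
getting called forever):

  import java.util.Collections;
  import java.util.concurrent.ConcurrentLinkedQueue;
  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.streaming.api.java.JavaDStream;

  // A never-ending "tick" stream: one small batch per interval, forever.
  JavaRDD<Long> defaultTick =
      jssc.sparkContext().parallelize(Collections.singletonList(0L));
  JavaDStream<Long> ticks =
      jssc.queueStream(
          new ConcurrentLinkedQueue<JavaRDD<Long>>(), /* oneAtATime = */ true, defaultTick);
  // Each tick would then trigger a bounded chunk of SDF work, with progress
  // persisted by us (e.g. in mapWithState or an external store).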

>
>
>> But these are still both DStream-based rather than Dataset-based, and we
>> might want to support Datasets (depending on what direction folks take with
>> the runners).
>>
>> If we wanted to do this in the Dataset world, looking at a custom
>> sink/source would also be an option (which is effectively what a custom
>> queue-stream-like source for DStreams requires), but the DataSource APIs are
>> a bit in flux, so if we ended up doing things at the edge of what’s allowed
>> there’s a good chance we’d have to rewrite it a few times.
>>
>>
>>>> Assuming that we have a given DStream, though, in Spark we can get the
>>>> underlying RDD implementation for each micro-batch and do our work inside
>>>> of that.
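Concretely, something along these lines (a sketch; inputDStream and
processRestrictions are assumed names, the latter standing in for whatever
RDD-level work we would do per micro-batch):

  // Drop down to the RDD backing each micro-batch of an existing DStream.
  JavaDStream<String> output =
      inputDStream.transform(rdd -> processRestrictions(rdd));
  // Or, for side effects only:
  inputDStream.foreachRDD(rdd -> {
    processRestrictions(rdd).count();
  });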
>>>>
>>>>>
>>>>>
>>>>>>
>>>>>> More generally this does raise an important question of whether we want
>>>>>> to target Datasets instead of RDDs/DStreams, in which case I would need
>>>>>> to do some more poking.
>>>>>>
>>>>>>
>>>>>>> On Wed, Mar 14, 2018 at 10:26 PM Reuven Lax <re...@google.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> How would timers be implemented? By outputting and reprocessing, the
>>>>>>>> same way you proposed for SDF?
>>>>>>>>
>>>>>> I mean the timers could be inside the mappers within the system. We
>>>>>> could use a singleton so if a partition is re-executed it doesn’t end up
>>>>>> as a straggler.
>>>>>>
>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Mar 14, 2018 at 7:25 PM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> So the timers would have to be in our own code.
>>>>>>>>>
>>>>>>>>> On Wed, Mar 14, 2018 at 5:18 PM Eugene Kirpichov <
>>>>>>>>> kirpic...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Does Spark have support for timers? (I know it has support for
>>>>>>>>>> state)
>>>>>>>>>>
>>>>>>>>>> On Wed, Mar 14, 2018 at 4:43 PM Reuven Lax <re...@google.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Could we alternatively use a state mapping function to keep track
>>>>>>>>>>> of the computation so far instead of outputting V each time? (Also,
>>>>>>>>>>> the progress so far is probably of a different type R rather than V.)
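A very rough sketch of that shape using Spark's mapWithState, keying by
restriction and keeping the progress so far (the "R") in state; here R is just
a long offset for concreteness, keyedTicks is an assumed
JavaPairDStream<String, Long> of restriction-to-tick pairs, and the actual SDF
invocation is elided:

  import org.apache.spark.api.java.Optional;
  import org.apache.spark.api.java.function.Function3;
  import org.apache.spark.streaming.State;
  import org.apache.spark.streaming.StateSpec;
  import org.apache.spark.streaming.api.java.JavaMapWithStateDStream;
  import scala.Tuple2;

  // State per restriction: the offset consumed so far (the "R" here).
  Function3<String, Optional<Long>, State<Long>, Tuple2<String, Long>> step =
      (restriction, tick, offsetSoFar) -> {
        long start = offsetSoFar.exists() ? offsetSoFar.get() : 0L;
        // Hypothetical: do a bounded chunk of SDF work starting at 'start';
        // real output records would have to be emitted through a side channel.
        long end = start + 1000L;
        offsetSoFar.update(end);
        return new Tuple2<>(restriction, end);
      };

  JavaMapWithStateDStream<String, Long, Long, Tuple2<String, Long>> progress =
      keyedTicks.mapWithState(StateSpec.function(step));

The catch, as discussed above, is that each call still only emits a finite
amount per input, so unbounded output has to come from being called again and
again.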
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Wed, Mar 14, 2018 at 4:28 PM Holden Karau <
>>>>>>>>>>> hol...@pigscanfly.ca> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> So we had a quick chat about what it would take to add
>>>>>>>>>>>> something like SplittableDoFns to Spark. I'd done some sketchy
>>>>>>>>>>>> thinking about this last year but didn't get very far.
>>>>>>>>>>>>
>>>>>>>>>>>> My back-of-the-envelope design was as follows:
>>>>>>>>>>>> For input type T
>>>>>>>>>>>> Output type V
>>>>>>>>>>>>
>>>>>>>>>>>> Implement a mapper which outputs type (T, V); if the computation
>>>>>>>>>>>> finishes, V will be populated, otherwise T will be.
>>>>>>>>>>>>
>>>>>>>>>>>> For determining how long to run, we'd either run for up to K
>>>>>>>>>>>> seconds or listen for a signal on a port.
>>>>>>>>>>>>
>>>>>>>>>>>> Once we're done running we take the result and filter the ones
>>>>>>>>>>>> with T and with V into separate collections, re-run the unfinished
>>>>>>>>>>>> (T) ones until finished, and then union the results.
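For what it's worth, a sketch of that loop on plain RDDs (using String for both
T and V to keep it short; runForAtMost, jsc, and initialInputs are placeholders
I'm assuming, not real APIs):

  import org.apache.spark.api.java.JavaRDD;
  import org.apache.spark.api.java.JavaSparkContext;
  import scala.Tuple2;

  class SdfBatchLoop {
    static JavaRDD<String> runToCompletion(
        JavaSparkContext jsc, JavaRDD<String> initialInputs) {
      JavaRDD<String> pending = initialInputs; // unfinished work, type T
      JavaRDD<String> done = jsc.emptyRDD();   // finished results, type V

      while (!pending.isEmpty()) {
        // Each element becomes (leftoverT, resultV); exactly one side is non-null.
        JavaRDD<Tuple2<String, String>> step =
            pending.map(t -> runForAtMost(t, /* seconds = */ 30));
        pending = step.filter(tv -> tv._1() != null).map(Tuple2::_1);
        done = done.union(step.filter(tv -> tv._2() != null).map(Tuple2::_2));
      }
      return done;
    }

    // Hypothetical: do up to the given number of seconds of work for one
    // element, returning either leftover input (left) or a finished result
    // (right).
    static Tuple2<String, String> runForAtMost(String input, int seconds) {
      return new Tuple2<>(null, input); // placeholder: pretend it finished
    }
  }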
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> This is maybe not a great design but it was minimally
>>>>>>>>>>>> complicated and I figured terrible was a good place to start and
>>>>>>>>>>>> improve from.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> Let me know your thoughts, especially the parts where this is
>>>>>>>>>>>> worse than I remember, because it's been a while since I thought
>>>>>>>>>>>> about this.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> --
>>>>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>>
>>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>
>>>>> --
>>>> Twitter: https://twitter.com/holdenkarau
>>>>
>>> --
>> Twitter: https://twitter.com/holdenkarau
>>
> --
Twitter: https://twitter.com/holdenkarau
