Thank you!

Shen

On Mon, Mar 26, 2018 at 5:34 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

>
>
> On Mon, Mar 26, 2018, 2:08 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi Eugene,
>>
>> Thanks. Does it mean the application cannot dynamically change the
>> parallel width of an UnboundedSource during runtime?
>>
> Correct, it's limited by the number of parts it was initially split into.
> So it makes sense to split into a fairly large number of parts up front and
> assign more than one part to each worker at a time; then, if you need more
> workers, there are already parts ready to hand over.
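A minimal sketch of the over-splitting advice above, in plain Java rather than Beam code. The names `SplitAssignment` and `assign` are hypothetical and not part of any SDK, and the round-robin policy is an assumption; a real runner chooses its own assignment. The sketch only assumes that the source was split once into a fixed number of parts, identified here by index.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not a Beam class): spreads a fixed set of split indices over workers. */
final class SplitAssignment {
  private SplitAssignment() {}

  /** Returns, for each worker, the indices of the initial splits it owns (round-robin). */
  static List<List<Integer>> assign(int numSplits, int numWorkers) {
    if (numSplits < 1 || numWorkers < 1) {
      throw new IllegalArgumentException("numSplits and numWorkers must be positive");
    }
    List<List<Integer>> owned = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) {
      owned.add(new ArrayList<>());
    }
    // The set of splits never changes after the initial split; only ownership does.
    for (int s = 0; s < numSplits; s++) {
      owned.get(s % numWorkers).add(s);
    }
    return owned;
  }
}
```

With 12 initial parts, calling `SplitAssignment.assign(12, 3)` and later `SplitAssignment.assign(12, 4)` regroups the same 12 parts across more workers without calling split() again. With only 3 initial parts, `assign(3, 4)` leaves the fourth worker with nothing to read, which is the limit described above.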
>
>
>> > Correct. split() is applied to a single argument, so there's nothing
>> > to execute in parallel here. It executes sequentially, and produces a
>> > number of sources that can then be executed in parallel. It's pretty
>> > similar to executing a DoFn on a single element.
>>
>> I was thinking about the following scenario, in which split() would be
>> called in parallel. Suppose initially the translation phase invoked the
>> UnboundedSource#split() API and got 3 sub-sources. It then starts the
>> runtime where the source operator has 3 instances running in parallel, one
>> for each sub-source. This part works fine, and there will only be one
>> split() invocation during the translation. However, say after a while, the
>> application would like to increase the source parallelism from 3 to 4. But,
>> as it has already finished the translation, this change will be done
>> dynamically during runtime. The runtime will add another source instance.
>> Then, the four source instances will all call the split() API in parallel.
>> If this API consistently returns 4 sub-sources, then each source operator
>> instance can retrieve its own sub-source and proceed from there.
>>
>
>>
>>
>> Thanks,
>> Shen
>>
>>
>> On Mon, Mar 26, 2018 at 4:48 PM, Eugene Kirpichov <kirpic...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Thanks for your response.
>>>>
>>>> > Each call to split may return a different set of sub sources but
>>>> > they always represent the entire original source.
>>>>
>>>> Inconsistent sets of sub-sources prevent runners/engines from calling
>>>> the split API in a distributed manner during runtime.
>>>>
>>> Correct. split() is applied to a single argument, so there's nothing to
>>> execute in parallel here. It executes sequentially, and produces a number
>>> of sources that can then be executed in parallel. It's pretty similar to
>>> executing a DoFn on a single element.
>>>
>>>
>>>> Besides, the splitAtFraction(double fraction) is only available in
>>>> BoundedSources. How do you perform dynamic splitting for UnboundedSources?
>>>>
>>> There is no analogous API for unbounded sources.
>>>
>>>
>>>>
>>>> Another question: will Source transforms eventually become deprecated
>>>> and be replaced by the SplittableParDo?
>>>>
>>> Yes; this is already the case in the Portability framework.
>>>
>>>
>>>>
>>>> Thanks,
>>>> Shen
>>>>
>>>>
>>>>
>>>> On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Contractually, the sources returned by splitting must represent the
>>>>> original source. Each call to split may return a different set of sub
>>>>> sources but they always represent the entire original source.
>>>>>
>>>>> Note that Dataflow effectively calls split during translation and
>>>>> then later calls APIs on sources to perform dynamic splitting[1].
>>>>>
>>>>> Note that this is being replaced with SplittableDoFn. It is worthwhile
>>>>> to look at this doc[2] and presentation[3].
>>>>>
>>>>> 1: https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
>>>>> 2: https://s.apache.org/splittable-do-fn
>>>>> 3: https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>>>>
>>>>>
>>>>>
>>>>> On Mon, Mar 26, 2018 at 11:33 AM Shen Li <cs.she...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Does the split API in Bounded/UnboundedSource guarantee to return the
>>>>>> same result if invoked in different parallel instances in a distributed
>>>>>> environment?
>>>>>>
>>>>>> For example, assume the original source can split into 3 sub-sources.
>>>>>> Say the runner creates 3 parallel source operator instances (perhaps
>>>>>> running in different servers) and uses each instance to handle 1 of the 3
>>>>>> sub-sources. In this case, if each operator instance invokes the split
>>>>>> method in a distributed manner, will they get the same split result?
>>>>>>
>>>>>> My understanding is that the current API does not guarantee the 3
>>>>>> operator instances will receive the same split result. It is possible
>>>>>> that 1 of the 3 instances receives 4 sub-sources and the other two
>>>>>> receive 3. Or, even if they all get 3 sub-sources, there could be gaps
>>>>>> and overlaps in the data streams. If so, shall we add an API to
>>>>>> indicate whether a source can split at runtime?
>>>>>>
>>>>>> One solution to avoid this problem is to split the source at
>>>>>> translation time and directly pass sub-sources to operator instances. But
>>>>>> this is not ideal. The server that runs the translation might not have
>>>>>> access to the source (DB, KV, MQ, etc.). Or the application may want to
>>>>>> dynamically change the source parallel width at runtime. Hence, the
>>>>>> runner/engine sometimes has to split the source during runtime in a
>>>>>> distributed environment.
>>>>>>
>>>>>> Thanks,
>>>>>> Shen
>>>>>>
>>>>>>
>>>>
>>
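The concern raised in the original question, that operator instances calling split independently could end up with gaps or overlaps, can be illustrated with a small plain-Java sketch. It is not Beam code: `RangeSplits`, `split`, and `coversExactly` are hypothetical names, and modelling a source as a half-open offset range [start, end) is an assumption made only for illustration. The split here is deliberately deterministic, which the thread says the real split API does not promise.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in (not a Beam class) for a source over the offset range [start, end). */
final class RangeSplits {
  private RangeSplits() {}

  /**
   * Deterministically cuts [start, end) into n contiguous parts, each stored as {from, to}.
   * Assumes (end - start) * n fits in a long.
   */
  static List<long[]> split(long start, long end, int n) {
    if (n < 1 || end < start) {
      throw new IllegalArgumentException("need n >= 1 and end >= start");
    }
    long size = end - start;
    List<long[]> parts = new ArrayList<>();
    for (int i = 0; i < n; i++) {
      long from = start + size * i / n;
      long to = start + size * (i + 1) / n;
      parts.add(new long[] {from, to});
    }
    return parts;
  }

  /** True if the parts, taken in order, tile [start, end) with no gaps and no overlaps. */
  static boolean coversExactly(long start, long end, List<long[]> parts) {
    long cursor = start;
    for (long[] p : parts) {
      if (p[0] != cursor || p[1] < p[0]) {
        return false; // a gap or overlap relative to the previous part, or an inverted part
      }
      cursor = p[1];
    }
    return cursor == end;
  }
}
```

If every instance computes `RangeSplits.split(0, 100, 3)` and takes its own index, the three parts tile the range. If one instance instead takes part 0 of a 3-way split while the others take parts 1 to 3 of a 4-way split, `coversExactly` returns false because offsets 25 to 32 are read twice. That is the failure mode the question describes when split results differ between instances.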
