On Mon, Mar 26, 2018, 2:08 PM Shen Li <[email protected]> wrote:

> Hi Eugene,
>
> Thanks. Does it mean the application cannot dynamically change the
> parallel width of an UnboundedSource during runtime?
>
Correct: the parallelism is limited by how many parts the source was split
into initially. So it makes sense to split into a fairly large number of
parts up front and assign more than one part to each worker at the same
time; then, if you need more workers, you already have parts ready for them.
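Here is a minimal sketch of that advice. It uses plain Java, not the Beam API. A hypothetical `Part` record stands in for a sub-source. The source is split once into 16 offset ranges, and the parts are dealt out round-robin to however many workers are running. The names `PreSplitAssignment`, `split` and `assign` are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

public class PreSplitAssignment {
  // Hypothetical stand-in for a sub-source: a half-open offset range [start, end).
  record Part(long start, long end) {}

  // Split [0, total) once into numParts contiguous, non-overlapping parts.
  static List<Part> split(long total, int numParts) {
    List<Part> parts = new ArrayList<>();
    for (int i = 0; i < numParts; i++) {
      parts.add(new Part(total * i / numParts, total * (i + 1) / numParts));
    }
    return parts;
  }

  // Round-robin assignment: worker w gets parts w, w + numWorkers, ...
  // Workers beyond parts.size() get nothing, so parallelism is capped by the split.
  static List<List<Part>> assign(List<Part> parts, int numWorkers) {
    List<List<Part>> byWorker = new ArrayList<>();
    for (int w = 0; w < numWorkers; w++) byWorker.add(new ArrayList<>());
    for (int i = 0; i < parts.size(); i++) byWorker.get(i % numWorkers).add(parts.get(i));
    return byWorker;
  }

  public static void main(String[] args) {
    List<Part> parts = split(1_000, 16); // split once, generously
    for (int workers : new int[] {4, 8, 32}) {
      long busy = assign(parts, workers).stream().filter(l -> !l.isEmpty()).count();
      System.out.println(workers + " workers -> " + busy + " busy");
    }
  }
}
```

Run as is, it reports 4, 8 and 16 busy workers. Going from 4 to 8 workers only redistributes parts that already exist. Past 16 workers, the extra ones sit idle, which is the limit described above.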


> > Correct. split() is applied to a single argument, so there's nothing to
> > execute in parallel here. It executes sequentially, and produces a number
> > of sources that can then be executed in parallel. It's pretty similar to
> > executing a DoFn on a single element.
>
> I was thinking about the following scenario, in which split() would be
> called in parallel. Suppose the translation phase initially invoked the
> UnboundedSource#split() API and got 3 sub-sources. It then starts the
> runtime where the source operator has 3 instances running in parallel, one
> for each sub-source. This part works fine, and there will only be one
> split() invocation during the translation. However, say after a while, the
> application would like to increase the source parallelism from 3 to 4. But,
> as it has already finished the translation, this change will be done
> dynamically during runtime. The runtime will add another source instance.
> Then, the four source instances will all call the split() API in parallel.
> If this API consistently returns 4 sub-sources, then each source operator
> instance can retrieve its own sub-source and proceed from there.
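The scenario above can be sketched as follows. The sketch assumes a hypothetical split function that depends only on the source's size and the requested number of parts. Each instance calls it independently and keeps element i. `ParallelSplitCheck` is illustrative, and so is its `totalSeenBy` array, which models what each instance observed about the source when it called split. That is one way the results can diverge, because Beam's split() makes no promise that separate calls agree.

```java
import java.util.ArrayList;
import java.util.List;

public class ParallelSplitCheck {
  record Range(long start, long end) {}

  // Hypothetical deterministic split: the result depends only on (total, n).
  static List<Range> split(long total, int n) {
    List<Range> out = new ArrayList<>();
    for (int i = 0; i < n; i++) out.add(new Range(total * i / n, total * (i + 1) / n));
    return out;
  }

  // Instance i calls split() on its own and keeps only element i.
  // totalSeenBy[i] models the source size instance i observed at call time.
  static List<Range> pickPerInstance(long[] totalSeenBy) {
    int n = totalSeenBy.length;
    List<Range> picked = new ArrayList<>();
    for (int i = 0; i < n; i++) picked.add(split(totalSeenBy[i], n).get(i));
    return picked;
  }

  // True iff the ranges, in instance order, tile [0, total) with no gaps or overlaps.
  static boolean tiles(List<Range> ranges, long total) {
    long next = 0;
    for (Range r : ranges) {
      if (r.start() != next) return false;
      next = r.end();
    }
    return next == total;
  }

  public static void main(String[] args) {
    // All 4 instances see the same source: the picks cover it exactly.
    System.out.println(tiles(pickPerInstance(new long[] {1000, 1000, 1000, 1000}), 1000));
    // Instance 3 sees a source that has grown to 1200: its pick no longer lines up.
    System.out.println(tiles(pickPerInstance(new long[] {1000, 1000, 1000, 1200}), 1200));
  }
}
```

The first line prints true. The second prints false. Instance 3 computed its range from the larger source and picked [900, 1200), so [750, 900) is never read.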
>

>
>
> Thanks,
> Shen
>
>
> On Mon, Mar 26, 2018 at 4:48 PM, Eugene Kirpichov <[email protected]>
> wrote:
>
>>
>>
>> On Mon, Mar 26, 2018 at 1:09 PM Shen Li <[email protected]> wrote:
>>
>>> Hi Lukasz,
>>>
>>> Thanks for your response.
>>>
>>> > Each call to split may return a different set of sub sources but they
>>> always represent the entire original source.
>>>
>>> Inconsistent sets of sub-sources prevent runners/engines from calling
>>> the split API in a distributed manner during runtime.
>>>
>> Correct. split() is applied to a single argument, so there's nothing to
>> execute in parallel here. It executes sequentially, and produces a number
>> of sources that can then be executed in parallel. It's pretty similar to
>> executing a DoFn on a single element.
>>
>>
>>> Besides, splitAtFraction(double fraction) is only available for
>>> BoundedSources (on BoundedSource.BoundedReader). How do you perform
>>> dynamic splitting for UnboundedSources?
>>>
>> There is no analogous API for unbounded sources.
>>
>>
>>>
>>> Another question: will Source transforms eventually become deprecated
>>> and be replaced by the SplittableParDo?
>>>
>> Yes; this is already the case in the Portability framework.
>>
>>
>>>
>>> Thanks,
>>> Shen
>>>
>>>
>>>
>>> On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik <[email protected]> wrote:
>>>
>>>> Contractually, the sources returned by splitting must, together,
>>>> represent the original source. Each call to split may return a different
>>>> set of sub-sources, but they always represent the entire original source.
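To make that contract concrete, here is a hypothetical checker over offset ranges. It is not a Beam API. Under this model, a split result is valid if its parts, sorted by start, tile the original range exactly. Two different results can both be valid, which matches the contract described above. Taking parts from different results, as independent instances would, can break it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SplitContractCheck {
  record Range(long start, long end) {}

  // Hypothetical model of the contract: the parts, taken together, cover
  // exactly the original [start, end) with no gaps and no overlaps.
  static boolean representsOriginal(Range original, List<Range> parts) {
    List<Range> sorted = new ArrayList<>(parts);
    sorted.sort(Comparator.comparingLong(Range::start));
    long next = original.start();
    for (Range r : sorted) {
      if (r.start() != next || r.end() < r.start()) return false; // gap, overlap, or malformed
      next = r.end();
    }
    return next == original.end();
  }

  public static void main(String[] args) {
    Range original = new Range(0, 90);
    // Two different answers that separate split() calls might give; both are valid.
    List<Range> threeWay = List.of(new Range(0, 30), new Range(30, 60), new Range(60, 90));
    List<Range> fourWay = List.of(new Range(0, 20), new Range(20, 45), new Range(45, 70), new Range(70, 90));
    System.out.println(representsOriginal(original, threeWay)); // true
    System.out.println(representsOriginal(original, fourWay));  // true
    // Mixing parts from the two answers: [20, 45) overlaps [0, 30).
    List<Range> mixed = List.of(new Range(0, 30), new Range(20, 45), new Range(60, 90));
    System.out.println(representsOriginal(original, mixed));    // false
  }
}
```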
>>>>
>>>> Note that Dataflow effectively calls split during translation and then
>>>> later calls APIs on the sources' readers to perform dynamic splitting[1].
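For bounded sources, the dynamic splitting in [1] goes through `BoundedSource.BoundedReader#splitAtFraction(double)`. The sketch below models the idea on a hypothetical offset-range reader rather than on Beam's own classes. The `Math.ceil` rounding and the exact rejection rules are assumptions chosen for illustration. A split succeeds only if the split point has not been read yet. When it does, the reader keeps the part before the split point and hands the rest back as a residual that another worker can process.

```java
public class RangeReaderSketch {
  // Hypothetical reader over non-negative offsets [start, end); not Beam's OffsetRangeTracker.
  private final long start;
  private long end;
  private long nextOffset; // first offset not yet returned by next()

  RangeReaderSketch(long start, long end) {
    this.start = start;
    this.end = end;
    this.nextOffset = start;
  }

  /** Returns the next offset, or -1 once the (possibly shrunk) range is exhausted. */
  long next() {
    if (nextOffset >= end) return -1;
    return nextOffset++;
  }

  /**
   * Tries to split the current range [start, end) at the given fraction.
   * On success, this reader keeps [start, split) and {split, oldEnd} is returned
   * as the residual. Returns null if the split point was already read or is
   * not strictly inside the range.
   */
  long[] splitAtFraction(double fraction) {
    long split = start + (long) Math.ceil(fraction * (end - start));
    if (split < nextOffset || split <= start || split >= end) return null;
    long[] residual = {split, end};
    end = split;
    return residual;
  }

  public static void main(String[] args) {
    RangeReaderSketch reader = new RangeReaderSketch(0, 100);
    for (int i = 0; i < 30; i++) reader.next(); // offsets 0..29 consumed
    long[] residual = reader.splitAtFraction(0.5);
    System.out.println(residual[0] + ".." + residual[1]); // 50..100
    System.out.println(reader.splitAtFraction(0.1) == null); // true: that point was already read
  }
}
```

After the successful split, this reader stops at offset 49. The residual [50, 100) could be wrapped in a new source and handed to another worker. There is no counterpart for UnboundedSource, as Eugene notes.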
>>>>
>>>> Note that this is being replaced by SplittableDoFn. It is worthwhile to
>>>> look at this doc[2] and presentation[3].
>>>>
>>>> 1:
>>>> https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
>>>> 2: https://s.apache.org/splittable-do-fn
>>>> 3:
>>>> https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>>>
>>>>
>>>>
>>>> On Mon, Mar 26, 2018 at 11:33 AM Shen Li <[email protected]> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Does the split API in Bounded/UnboundedSource guarantee that it returns
>>>>> the same result if invoked in different parallel instances in a
>>>>> distributed environment?
>>>>>
>>>>> For example, assume the original source can split into 3 sub-sources.
>>>>> Say the runner creates 3 parallel source operator instances (perhaps
>>>>> running on different servers) and uses each instance to handle 1 of the 3
>>>>> sub-sources. In this case, if each operator instance invokes the split
>>>>> method in a distributed manner, will they get the same split result?
>>>>>
>>>>> My understanding is that the current API does not guarantee that the 3
>>>>> operator instances will receive the same split result. It is possible that
>>>>> 1 of the 3 instances receives 4 sub-sources while the other two receive 3.
>>>>> Or, even if they all get 3 sub-sources, there could be gaps and overlaps
>>>>> in the data streams. If so, shall we add an API to indicate whether a
>>>>> source can split at runtime?
>>>>>
>>>>> One way to avoid this problem is to split the source at translation
>>>>> time and pass the sub-sources directly to operator instances. But this
>>>>> is not ideal. The server that runs the translation might not have access
>>>>> to the source (DB, KV, MQ, etc.). Or the application may want to
>>>>> dynamically change the source's parallel width at runtime. Hence, the
>>>>> runner/engine sometimes has to split the source during runtime in a
>>>>> distributed environment.
>>>>>
>>>>> Thanks,
>>>>> Shen
>>>>>
>>>>>
>>>
>
