Hi Eugene,

Thanks. Does that mean an application cannot dynamically change the parallel
width of an UnboundedSource at runtime?

> Correct. split() is applied to a single argument, so there's nothing to
> execute in parallel here. It executes sequentially, and produces a number
> of sources that can then be executed in parallel. It's pretty similar to
> executing a DoFn on a single element.

I was thinking about the following scenario, in which split() would be
called in parallel. Suppose the translation phase initially invokes the
UnboundedSource#split() API and gets 3 sub-sources. The runtime then starts
with 3 source operator instances running in parallel, one for each
sub-source. This part works fine, and there is only one split() invocation,
during translation. However, suppose that after a while the application
wants to increase the source parallelism from 3 to 4. Because translation
has already finished, this change has to be made dynamically at runtime:
the runtime adds another source instance, and the four source instances
then all call the split() API in parallel. If the API consistently returns
the same 4 sub-sources to every caller, each source operator instance can
retrieve its own sub-source and proceed from there.
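
To make the scenario concrete, here is a minimal sketch of what "split()
consistently returns the same sub-sources" would look like. It does not use
any Beam classes: SplitSketch, Range, and the two-argument split() are
hypothetical names made up for illustration, and the sketch assumes a source
whose data is a fixed, known number of partitions. Under that assumption
split() is a pure function of its arguments, so every instance that calls it
sees the same list and can pick its sub-source by index.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only; none of these names are Beam APIs.
final class SplitSketch {

    /** Hypothetical sub-source: the partitions in [start, end). */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Deterministic split: the result depends only on the two arguments,
     * so callers on different machines compute identical lists.
     */
    static List<Range> split(int numPartitions, int desiredNumSplits) {
        // Never produce more sub-sources than partitions, and at least one.
        int n = Math.max(1, Math.min(desiredNumSplits, numPartitions));
        List<Range> result = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            // Each range ends exactly where the next one starts,
            // so the ranges cover [0, numPartitions) without gaps or overlaps.
            int start = (int) ((long) i * numPartitions / n);
            int end = (int) ((long) (i + 1) * numPartitions / n);
            result.add(new Range(start, end));
        }
        return result;
    }

    public static void main(String[] args) {
        int numPartitions = 10;
        int parallelism = 4; // increased from 3 to 4 at runtime
        // Each loop iteration stands in for one source operator instance
        // calling split() on its own and keeping only its own sub-source.
        for (int instance = 0; instance < parallelism; instance++) {
            Range mine = split(numPartitions, parallelism).get(instance);
            System.out.println("instance " + instance + " reads " + mine);
        }
    }
}
```

A source whose split result depends on external state (for example, a
partition count that changes between calls) would not satisfy this
assumption, which is exactly the case I am worried about.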



Thanks,
Shen


On Mon, Mar 26, 2018 at 4:48 PM, Eugene Kirpichov <kirpic...@google.com>
wrote:

>
>
> On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi Lukasz,
>>
>> Thanks for your response.
>>
>> > Each call to split may return a different set of sub sources but they
>> always represent the entire original source.
>>
>> Inconsistent sets of sub-sources prevent runners/engines from calling the
>> split API in a distributed manner during runtime.
>>
> Correct. split() is applied to a single argument, so there's nothing to
> execute in parallel here. It executes sequentially, and produces a number
> of sources that can then be executed in parallel. It's pretty similar to
> executing a DoFn on a single element.
>
>
>> Besides, the splitAtFraction(double fraction) is only available in
>> BoundedSources. How do you perform dynamic splitting for UnboundedSources?
>>
> There is no analogous API for unbounded sources.
>
>
>>
>> Another question: will Source transforms eventually become deprecated and
>> be replaced by the SplittableParDo?
>>
> Yes; this is already the case in the Portability framework.
>
>
>>
>> Thanks,
>> Shen
>>
>>
>>
>> On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik <lc...@google.com> wrote:
>>
>>> Contractually, the sources returned by splitting must represent the
>>> original source. Each call to split may return a different set of sub
>>> sources but they always represent the entire original source.
>>>
>>> Note that Dataflow does call split effectively during translation and
>>> then later calls APIs on sources to perform dynamic splitting[1].
>>>
>>> Note that this is being replaced with SplittableDoFn. It is worthwhile
>>> to look at this doc[2] and presentation[3].
>>>
>>> 1: https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
>>> 2: https://s.apache.org/splittable-do-fn
>>> 3: https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>>
>>>
>>>
>>> On Mon, Mar 26, 2018 at 11:33 AM Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> Does the split API in Bounded/UnboundedSource guarantee that it returns
>>>> the same result when invoked from different parallel instances in a
>>>> distributed environment?
>>>>
>>>> For example, assume the original source can split into 3 sub-sources.
>>>> Say the runner creates 3 parallel source operator instances (perhaps
>>>> running in different servers) and uses each instance to handle 1 of the 3
>>>> sub-sources. In this case, if each operator instance invokes the split
>>>> method in a distributed manner, will they get the same split result?
>>>>
>>>> My understanding is that the current API does not guarantee that the 3
>>>> operator instances will receive the same split result. It is possible that
>>>> 1 of the 3 instances receives 4 sub-sources and the other two receive 3.
>>>> Or, even if they all get 3 sub-sources, there could be gaps and overlaps in
>>>> the data streams. If so, shall we add an API that indicates whether a
>>>> source can be split at runtime?
>>>>
>>>> One way to avoid this problem is to split the source at translation
>>>> time and pass the sub-sources directly to the operator instances. But
>>>> this is not ideal. The server that runs the translation might not have
>>>> access to the source (DB, KV, MQ, etc.). Or the application may want to
>>>> dynamically change the source parallel width at runtime. Hence, the
>>>> runner/engine sometimes has to split the source at runtime in a
>>>> distributed environment.
>>>>
>>>> Thanks,
>>>> Shen
>>>>
>>>>
>>
