Thank you!

Shen

On Mon, Mar 26, 2018 at 5:34 PM, Eugene Kirpichov <kirpic...@google.com> wrote:

> On Mon, Mar 26, 2018, 2:08 PM Shen Li <cs.she...@gmail.com> wrote:
>
>> Hi Eugene,
>>
>> Thanks. Does it mean the application cannot dynamically change the
>> parallel width of an UnboundedSource during runtime?
>>
> Correct, it's limited by how many parts it was split into initially. So it
> makes sense to initially split into a fairly large number of parts,
> assigning more than one at the same time to each worker, so if you need
> more workers then you have more parts ready.
>
>> > Correct. split() is applied to a single argument, so there's nothing
>> > to execute in parallel here. It executes sequentially, and produces a
>> > number of sources that can then be executed in parallel. It's pretty
>> > similar to executing a DoFn on a single element.
>>
>> I was thinking about the following scenario where the split() will be
>> called in parallel. Suppose initially the translation phase invoked the
>> UnboundedSource#split() API and got 3 sub-sources. It then starts the
>> runtime where the source operator has 3 instances running in parallel, one
>> for each sub-source. This part works fine, and there will only be one
>> split() invocation during the translation. However, say after a while, the
>> application would like to increase the source parallelism from 3 to 4. But,
>> as it has already finished the translation, this change will be done
>> dynamically during runtime. The runtime will add another source instance.
>> Then, the four source instances will all call the split() API in parallel.
>> If this API consistently returns 4 sub-sources, then each source operator
>> instance can retrieve its own sub-source and proceed from there.
>>
>> Thanks,
>> Shen
>>
>> On Mon, Mar 26, 2018 at 4:48 PM, Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> On Mon, Mar 26, 2018 at 1:09 PM Shen Li <cs.she...@gmail.com> wrote:
>>>
>>>> Hi Lukasz,
>>>>
>>>> Thanks for your response.
>>>>
>>>> > Each call to split may return a different set of sub sources but
>>>> > they always represent the entire original source.
>>>>
>>>> Inconsistent sets of sub-sources prevent runners/engines from calling
>>>> the split API in a distributed manner during runtime.
>>>>
>>> Correct. split() is applied to a single argument, so there's nothing to
>>> execute in parallel here. It executes sequentially, and produces a number
>>> of sources that can then be executed in parallel. It's pretty similar to
>>> executing a DoFn on a single element.
>>>
>>>> Besides, splitAtFraction(double fraction) is only available in
>>>> BoundedSources. How do you perform dynamic splitting for UnboundedSources?
>>>>
>>> There is no analogous API for unbounded sources.
>>>
>>>> Another question: will Source transforms eventually become deprecated
>>>> and be replaced by the SplittableParDo?
>>>>
>>> Yes; this is already the case in the Portability framework.
>>>
>>>> Thanks,
>>>> Shen
>>>>
>>>> On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik <lc...@google.com> wrote:
>>>>
>>>>> Contractually, the sources returned by splitting must represent the
>>>>> original source. Each call to split may return a different set of sub
>>>>> sources but they always represent the entire original source.
>>>>>
>>>>> Note that Dataflow does call split effectively during translation and
>>>>> then later calls APIs on sources to perform dynamic splitting[1].
>>>>>
>>>>> Note that this is being replaced with SplittableDoFn. Worthwhile to
>>>>> look at this doc[2] and presentation[3].
>>>>>
>>>>> 1: https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
>>>>> 2: https://s.apache.org/splittable-do-fn
>>>>> 3: https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>>>>
>>>>> On Mon, Mar 26, 2018 at 11:33 AM Shen Li <cs.she...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> Does the split API in Bounded/UnboundedSource guarantee to return the
>>>>>> same result if invoked in different parallel instances in a distributed
>>>>>> environment?
>>>>>>
>>>>>> For example, assume the original source can split into 3 sub-sources.
>>>>>> Say the runner creates 3 parallel source operator instances (perhaps
>>>>>> running on different servers) and uses each instance to handle 1 of the
>>>>>> 3 sub-sources. In this case, if each operator instance invokes the split
>>>>>> method in a distributed manner, will they get the same split result?
>>>>>>
>>>>>> My understanding is that the current API does not guarantee the 3
>>>>>> operator instances will receive the same split result. It is possible
>>>>>> that 1 of the 3 instances receives 4 sub-sources and the other two
>>>>>> receive 3. Or, even if they all get 3 sub-sources, there could be gaps
>>>>>> and overlaps in the data streams. If so, shall we add an API to indicate
>>>>>> whether a source can be split at runtime?
>>>>>>
>>>>>> One solution to avoid this problem is to split the source at
>>>>>> translation time and pass the sub-sources directly to operator
>>>>>> instances. But this is not ideal. The server that runs the translation
>>>>>> might not have access to the source (DB, KV, MQ, etc). Or the
>>>>>> application may want to dynamically change the source parallel width at
>>>>>> runtime. Hence, the runner/engine sometimes has to split the source
>>>>>> during runtime in a distributed environment.
>>>>>>
>>>>>> Thanks,
>>>>>> Shen
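
For readers of this thread, here is a minimal sketch of the approach suggested above: split once, sequentially, into more parts than there are workers, and then change parallelism by redistributing those fixed parts instead of calling split() again on each worker. This is not Beam code. The names split_once and assign_parts are hypothetical, plain strings stand in for sub-sources, and round-robin assignment is only an assumed policy. Handing a part's checkpoint state from one worker to another in a running job is outside the scope of the sketch.

```python
# Hypothetical sketch, not the Beam API: plain strings stand in for the
# sub-sources that a single split() call would return.

def split_once(source_name, num_parts):
    """Stand-in for one sequential split() call made before the job starts."""
    return [f"{source_name}/part-{i}" for i in range(num_parts)]


def assign_parts(parts, num_workers):
    """Deal the fixed list of parts out to workers round-robin."""
    assignment = {worker: [] for worker in range(num_workers)}
    for index, part in enumerate(parts):
        assignment[index % num_workers].append(part)
    return assignment


def covered(assignment):
    """All parts held by any worker, sorted for comparison."""
    return sorted(p for held in assignment.values() for p in held)


# Split once into more parts than the initial worker count.
parts = split_once("queue", 12)

three_workers = assign_parts(parts, 3)  # 4 parts per worker
four_workers = assign_parts(parts, 4)   # 3 parts per worker

# Scaling from 3 to 4 workers only re-deals the same 12 parts, so the
# workers still cover the original source exactly once, with no second split.
print(covered(three_workers) == sorted(parts))
print(covered(four_workers) == sorted(parts))
```

Because the list of parts is computed once and then shared, every worker sees the same parts, which sidesteps the question of whether independent split() calls would agree.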