On Mon, Mar 26, 2018, 2:08 PM Shen Li wrote:

> Hi Eugene,
>
> Thanks. Does it mean the application cannot dynamically change the
> parallel width of an UnboundedSource during runtime?
>

Correct, it's limited by how many parts it was split into initially. So it makes sense to initially split into a fairly large number of parts, assigning more than one at the same time to each worker, so if you need more workers then you have more parts ready.
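The over-splitting advice can be made concrete with a minimal Java sketch. It assumes the sub-sources returned by a single initial split can be identified by index. `SplitAssignment` and `assign` are hypothetical names, not Beam APIs. Parts are dealt out round-robin, so 12 initial parts can serve 3 or 4 workers (or any count up to 12), but never 13. The sketch also ignores the fact that moving a running part to another worker would, in practice, require resuming it from a checkpoint.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: parts are indices 0..numParts-1 and stand in for the
// sub-sources returned by one initial UnboundedSource#split call.
final class SplitAssignment {

    // Assigns part indices to workers round-robin. Every part goes to exactly
    // one worker, and a worker may own several parts at the same time.
    static List<List<Integer>> assign(int numParts, int numWorkers) {
        if (numParts <= 0 || numWorkers <= 0) {
            throw new IllegalArgumentException("numParts and numWorkers must be positive");
        }
        if (numWorkers > numParts) {
            // Without splitting again, the extra workers would have nothing to read.
            throw new IllegalArgumentException(
                "cannot use " + numWorkers + " workers with only " + numParts + " parts");
        }
        List<List<Integer>> byWorker = new ArrayList<>();
        for (int w = 0; w < numWorkers; w++) {
            byWorker.add(new ArrayList<>());
        }
        for (int p = 0; p < numParts; p++) {
            byWorker.get(p % numWorkers).add(p);
        }
        return byWorker;
    }

    public static void main(String[] args) {
        // Split once into 12 parts, then go from 3 to 4 workers by reassigning parts.
        System.out.println(assign(12, 3)); // [[0, 3, 6, 9], [1, 4, 7, 10], [2, 5, 8, 11]]
        System.out.println(assign(12, 4)); // [[0, 4, 8], [1, 5, 9], [2, 6, 10], [3, 7, 11]]
    }
}
```

A part count with many divisors (12 here) keeps the load even for several different worker counts.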
>
> Correct. split() is applied to a single argument, so there's nothing to
> execute in parallel here. It executes sequentially, and produces a number
> of sources that can then be executed in parallel. It's pretty similar to
> executing a DoFn on a single element.
>
> I was thinking about the following scenario where the split() will be
> called in parallel. Suppose initially the translation phase invoked the
> UnboundedSource#split() API and got 3 sub-sources. It then starts the
> runtime where the source operator has 3 instances running in parallel, one
> for each sub-source. This part works fine, and there will only be one
> split() invocation during the translation. However, say after a while, the
> application would like to increase the source parallelism from 3 to 4. But,
> as it has already finished the translation, this change will be done
> dynamically during runtime. The runtime will add another source instance.
> Then, the four source instances will all call the split() API in parallel.
> If this API consistently returns 4 sub-sources, then each source operator
> instance can retrieve its own sub-source and proceed from there.
>
> Thanks,
> Shen
>
> On Mon, Mar 26, 2018 at 4:48 PM, Eugene Kirpichov wrote:
>
>> On Mon, Mar 26, 2018 at 1:09 PM Shen Li wrote:
>>
>>> Hi Lukasz,
>>>
>>> Thanks for your response.
>>>
>>> > Each call to split may return a different set of sub sources but they
>>> > always represent the entire original source.
>>>
>>> Inconsistent sets of sub-sources prevent runners/engines from calling
>>> the split API in a distributed manner during runtime.
>>>
>> Correct. split() is applied to a single argument, so there's nothing to
>> execute in parallel here. It executes sequentially, and produces a number
>> of sources that can then be executed in parallel. It's pretty similar to
>> executing a DoFn on a single element.
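Eugene's point, that split() is an ordinary sequential call whose outputs are then read in parallel, can be sketched in Java as follows. `RangeSource` is a hypothetical stand-in for a bounded source over the offsets [start, end). It is not Beam's `BoundedSource`, and "reading" a part here simply counts its offsets.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a source: the half-open offset range [start, end).
final class RangeSource {
    final long start;
    final long end;

    RangeSource(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // split() is an ordinary function of one argument (this range): it runs
    // sequentially and returns contiguous, non-overlapping, non-empty parts.
    List<RangeSource> split(int desiredNumSplits) {
        List<RangeSource> parts = new ArrayList<>();
        long size = end - start;
        for (int i = 0; i < desiredNumSplits; i++) {
            long s = start + size * i / desiredNumSplits;
            long e = start + size * (i + 1) / desiredNumSplits;
            if (s < e) {
                parts.add(new RangeSource(s, e)); // skip empty parts
            }
        }
        return parts;
    }

    // "Reading" a part just counts the offsets it covers.
    long read() {
        return end - start;
    }
}

final class SplitThenRead {
    // Call split() exactly once, then read the resulting parts in parallel.
    static long readAll(RangeSource source, int desiredNumSplits) {
        List<RangeSource> parts = source.split(desiredNumSplits);
        return parts.parallelStream().mapToLong(RangeSource::read).sum();
    }
}
```

Here the achievable parallelism is fixed by the parts that the one split() call returned, which matches Eugene's remark that an UnboundedSource is limited by its initial split.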
>>
>>> Besides, the splitAtFraction(double fraction) is only available in
>>> BoundedSources. How do you perform dynamic splitting for UnboundedSources?
>>>
>> There is no analogous API for unbounded sources.
>>
>>>
>>> Another question: will Source transforms eventually become deprecated
>>> and be replaced by the SplittableParDo?
>>>
>> Yes; this is already the case in the Portability framework.
>>
>>>
>>> Thanks,
>>> Shen
>>>
>>> On Mon, Mar 26, 2018 at 3:41 PM, Lukasz Cwik wrote:
>>>
>>>> Contractually, the sources returned by splitting must represent the
>>>> original source. Each call to split may return a different set of sub
>>>> sources but they always represent the entire original source.
>>>>
>>>> Note that Dataflow does call split effectively during translation and
>>>> then later calls APIs on sources to perform dynamic splitting[1].
>>>>
>>>> Note that this is being replaced with SplittableDoFn. It's worthwhile to
>>>> look at this doc[2] and presentation[3].
>>>>
>>>> 1: https://github.com/apache/beam/blob/28665490f6ad0cad091f4f936a8f113617fd3f27/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BoundedSource.java#L387
>>>> 2: https://s.apache.org/splittable-do-fn
>>>> 3: https://conferences.oreilly.com/strata/strata-ca/public/schedule/detail/63696
>>>>
>>>> On Mon, Mar 26, 2018 at 11:33 AM Shen Li wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Does the split API in Bounded/UnboundedSource guarantee to return the
>>>>> same result if invoked in different parallel instances in a distributed
>>>>> environment?
>>>>>
>>>>> For example, assume the original source can split into 3 sub-sources.
>>>>> Say the runner creates 3 parallel source operator instances (perhaps
>>>>> running on different servers) and uses each instance to handle 1 of the 3
>>>>> sub-sources. In this case, if each operator instance invokes the split
>>>>> method in a distributed manner, will they get the same split result?
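Lukasz's contract, that any set of sub-sources must represent exactly the original source, and the general idea of dynamic splitting can be illustrated with a small Java sketch over half-open offset ranges. `Range`, `coversExactly`, and `splitAtFraction` are hypothetical helpers, not Beam's `BoundedSource` or `BoundedReader` API. In particular, this `splitAtFraction` only mimics the idea of handing the unread tail of a range to another worker, under the assumption that `current` is the offset the reader will read next.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: a bounded source modeled as the half-open range [start, end).
final class Range {
    final long start;
    final long end;

    Range(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

final class SplitContract {
    // True iff the parts tile the original range exactly: no gaps, no overlaps,
    // and no empty parts.
    static boolean coversExactly(Range original, List<Range> parts) {
        List<Range> sorted = new ArrayList<>(parts);
        sorted.sort(Comparator.comparingLong((Range r) -> r.start));
        long expectedStart = original.start;
        for (Range r : sorted) {
            if (r.start != expectedStart || r.end <= r.start) {
                return false; // gap, overlap, or empty part
            }
            expectedStart = r.end;
        }
        return expectedStart == original.end;
    }

    // Dynamic-split analogue (not Beam's API): split `range` at `fraction` of its
    // length into {primary, residual}. The split point must lie strictly after
    // `current` (the next offset to read) and strictly before the end;
    // otherwise the split is refused and null is returned.
    static Range[] splitAtFraction(Range range, long current, double fraction) {
        long splitPos = range.start + (long) Math.ceil(fraction * (range.end - range.start));
        if (splitPos <= current || splitPos >= range.end) {
            return null;
        }
        return new Range[] {new Range(range.start, splitPos), new Range(splitPos, range.end)};
    }
}
```

Different split results can each pass `coversExactly`. That is why the contract allows different calls to return different sets while still always representing the whole source.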
>>>>>
>>>>> My understanding is that the current API does not guarantee the 3
>>>>> operator instances will receive the same split result. It is possible that
>>>>> one of the 3 instances receives 4 sub-sources and the other two receive 3.
>>>>> Or, even if they all get 3 sub-sources, there could be gaps and overlaps
>>>>> in the data streams. If so, shall we add an API to indicate whether a
>>>>> source can split at runtime?
>>>>>
>>>>> One solution to avoid this problem is to split the source at
>>>>> translation time and directly pass sub-sources to operator instances. But
>>>>> this is not ideal. The server that runs the translation might not have
>>>>> access to the source (DB, KV, MQ, etc.). Or the application may want to
>>>>> dynamically change the source parallel width at runtime. Hence, the
>>>>> runner/engine sometimes has to split the source during runtime in a
>>>>> distributed environment.
>>>>>
>>>>> Thanks,
>>>>> Shen
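The gap-and-overlap problem in the original question can be reproduced with hard-coded data. This sketch assumes two different, individually valid results of splitting the offsets [0, 90) (`SPLIT_A` and `SPLIT_B`, both hypothetical). It then models three operator instances that each called split() on their own and kept only the part at their own index. `IndependentSplits` and `coverage` are also hypothetical names.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch: each operator instance calls split() independently and
// keeps only the part at its own index.
final class IndependentSplits {

    // Each part is {start, end}, a half-open offset range over [0, 90).
    static final List<long[]> SPLIT_A =
        Arrays.asList(new long[] {0, 30}, new long[] {30, 60}, new long[] {60, 90});
    // Another valid split of the same range, e.g. computed later from different
    // size estimates. Nothing in the contract forbids this.
    static final List<long[]> SPLIT_B =
        Arrays.asList(new long[] {0, 20}, new long[] {20, 50}, new long[] {50, 90});

    // Counts how many instances read each offset in [0, size).
    static int[] coverage(long size, List<long[]> chosenParts) {
        int[] reads = new int[(int) size];
        for (long[] part : chosenParts) {
            for (long off = part[0]; off < part[1]; off++) {
                reads[(int) off]++;
            }
        }
        return reads;
    }

    public static void main(String[] args) {
        // Instances 0 and 2 happened to get SPLIT_A; instance 1 got SPLIT_B.
        List<long[]> chosen = Arrays.asList(SPLIT_A.get(0), SPLIT_B.get(1), SPLIT_A.get(2));
        int[] reads = coverage(90, chosen);
        long missed = Arrays.stream(reads).filter(n -> n == 0).count();
        long duplicated = Arrays.stream(reads).filter(n -> n > 1).count();
        System.out.println("missed=" + missed + " duplicated=" + duplicated);
    }
}
```

Running `main` prints `missed=10 duplicated=10`: offsets 50 to 59 are never read and offsets 20 to 29 are read twice, even though each split result on its own is correct. Taking every instance's part from a single split result (as in the translation-time approach) reads each offset exactly once.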
