JIRA: https://issues.apache.org/jira/browse/BEAM-8816
On Thu, Nov 21, 2019 at 10:44 AM Thomas Weise <t...@apache.org> wrote:

> Hi Luke,
>
> Thanks for the background, and it is exciting to see the progress on the SDF side. It will help with this use case and many other challenges. I imagine the Python user code would be able to determine that it is bogged down with high-latency record processing (based on the duration it actually took to process previous records) and opt to send the remaining work back to the runner (a sketch of this SDF pattern follows the thread).
>
> Until the Flink runner supports reassignment of work, I'm planning to implement the simple bundle distribution approach referred to before. We will test it in our environment and contribute it back if the results are good.
>
> Thomas
>
> On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik <lc...@google.com> wrote:
>
>> Dataflow has run into this issue as well. Dataflow has "work items" that are converted into bundles that are executed on the SDK. Each work item is greedily assigned to the SDK worker with the fewest work items currently assigned (see the least-loaded sketch after the thread). As you surmised, we use SDF splitting in batch pipelines to balance work. We would like to use splitting of SDFs in streaming pipelines as well, but Dataflow can't handle it as of right now.
>>
>> As part of a few PRs, I have added basic SDF expansion to the shared runner lib[1] and slowly exposed the runner-side hooks[2, 3] for SDK-initiated checkpointing and bundle finalization. There are still a few pieces left:
>> * exposing an API so the bundle can be split during execution
>> * adding the limited-depth splitting logic that would provide a basic form of dynamic work rebalancing for all runners that decide to use it
>>
>> 1: https://github.com/apache/beam/pull/10045
>> 2: https://github.com/apache/beam/pull/10065
>> 3: https://github.com/apache/beam/pull/10074
>>
>> On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> We found a problem with uneven utilization of SDK workers causing excessive latency with Streaming/Python/Flink. Remember that with Python we need to execute multiple worker processes on a machine instead of relying on threads in a single worker, which requires the runner to decide which worker to hand a bundle to for processing.
>>>
>>> The Flink runner has knobs to influence the number of records per bundle and the maximum duration of a bundle (see the configuration sketch after the thread). But since the runner does not understand the cost of an individual record, the duration of bundles can fluctuate significantly due to skew in the processing time of individual records. And unless the bundle size is 1, multiple expensive records can be allocated to a single bundle before the cutoff time is reached. We notice this with a pipeline that executes models, but there are other use cases where the cost of individual records varies significantly.
>>>
>>> Additionally, the Flink runner establishes the association between the subtask managing an executable stage and the SDK worker during initialization, and it lasts for the duration of the job. In other words, bundles for the same executable stage will always be sent to the same SDK worker. When the execution-time skew is tied to specific keys (stateful processing), this further aggravates the issue.
>>>
>>> I started experimenting with the ability to schedule bundles on any available worker.
>>> Initially I'm trying a very basic approach: starting processing of a bundle only on a free environment, i.e. one that does not currently process any other bundle (see the pool sketch after the thread). This effectively removes the pipelining between subtask and SDK worker. The potential wait for an available environment is acceptable in this case, as the per-bundle overhead is very small compared to the per-record cost.
>>>
>>> However, even if this suffices for the use case I'm looking at, this is an area that will probably need more work going forward. Rather than the runner guessing how to schedule bundles, I think the best long-term solution would be SDF, where the user code can decide that something is taking too long and defer the remaining work (which the runner can then redistribute).
>>>
>>> Curious if anyone else has run into this issue yet and what other ideas there may be?
>>>
>>> Thanks,
>>> Thomas
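
For concreteness, a rough sketch of the SDF pattern described above: user code notices that a batch is taking too long and defers the unprocessed remainder back to the runner, which is then free to reschedule it. This is only a sketch against the Python SDK's SDF surface as I understand it (OffsetRange, OffsetRestrictionTracker, DoFn.RestrictionParam, defer_remainder); that surface was still evolving at the time of this thread, and expensive_model_call is a hypothetical stand-in for the per-record work.

import time
import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

def expensive_model_call(record):
    time.sleep(0.1)  # stand-in for variable-cost model inference
    return record

class BatchRestrictionProvider(RestrictionProvider):
    # One restriction per incoming batch: the range of record indices.
    def initial_restriction(self, batch):
        return OffsetRange(0, len(batch))

    def create_tracker(self, restriction):
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, batch, restriction):
        return restriction.size()

class ProcessRecordsFn(beam.DoFn):
    LATENCY_BUDGET_SECS = 1.0  # illustrative per-bundle latency budget

    def process(self, batch, tracker=beam.DoFn.RestrictionParam(BatchRestrictionProvider())):
        start = time.time()
        position = tracker.current_restriction().start
        while tracker.try_claim(position):
            yield expensive_model_call(batch[position])
            position += 1
            if time.time() - start > self.LATENCY_BUDGET_SECS:
                # Too slow: hand the unclaimed remainder back to the runner,
                # which can later resume it, potentially on another worker.
                tracker.defer_remainder()
                return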
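
Luke's description of Dataflow's work-item assignment amounts to a least-loaded policy. Dataflow's actual scheduler is not public; the following toy sketch only illustrates the policy (a heap keyed by current load) and ignores completions, which a real scheduler would use to decrement a worker's load.

import heapq

class LeastLoadedAssigner:
    """Toy greedy assignment: each work item goes to the worker with the fewest items."""

    def __init__(self, worker_ids):
        # Heap of (current_load, worker_id); the least-loaded worker is on top.
        self._heap = [(0, w) for w in worker_ids]
        heapq.heapify(self._heap)

    def assign(self):
        load, worker = heapq.heappop(self._heap)
        heapq.heappush(self._heap, (load + 1, worker))
        return worker  # the caller sends the next work item to this worker

assigner = LeastLoadedAssigner(['sdk-worker-0', 'sdk-worker-1', 'sdk-worker-2'])
print([assigner.assign() for _ in range(5)])
# ['sdk-worker-0', 'sdk-worker-1', 'sdk-worker-2', 'sdk-worker-0', 'sdk-worker-1']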
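
The bundle knobs mentioned for the Flink runner are pipeline options; to the best of my recollection they are spelled max_bundle_size and max_bundle_time_millis (check FlinkPipelineOptions for your Beam version). A minimal Python pipeline passing them through the portable runner might look like this; the job endpoint and environment type are assumptions about the deployment.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',   # assumed Flink job server address
    '--environment_type=EXTERNAL',     # assumed externally managed SDK workers
    '--max_bundle_size=10',            # at most 10 records per bundle
    '--max_bundle_time_millis=1000',   # cut the bundle after 1s regardless
])

with beam.Pipeline(options=options) as p:
    (p
     | beam.Create(list(range(100)))
     | beam.Map(lambda x: x * x))      # stand-in for expensive per-record work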
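
Finally, the "free environment" scheduling experiment can be modeled with a small pool abstraction: a subtask borrows an environment and blocks while none is idle, so at most one bundle runs per environment. The real change lives in the Java Flink runner; this Python model, with made-up names like EnvironmentPool, only illustrates the scheduling behavior.

import queue
import threading

class EnvironmentPool:
    """Hands out SDK environments so each processes at most one bundle at a time."""

    def __init__(self, env_ids):
        self._idle = queue.Queue()
        for env in env_ids:
            self._idle.put(env)

    def process_bundle(self, bundle, fn):
        env = self._idle.get()       # blocks until some environment is free
        try:
            return fn(env, bundle)   # exactly one bundle per environment
        finally:
            self._idle.put(env)      # return the environment to the pool

pool = EnvironmentPool(['env-0', 'env-1'])

def run(bundle):
    pool.process_bundle(bundle, lambda env, b: print('%s processed bundle %s' % (env, b)))

threads = [threading.Thread(target=run, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()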