Hi Luke,

Thanks for the background; it is exciting to see the progress on the SDF
side. It will help with this use case and many other challenges. I imagine
the Python user code would be able to determine that it is bogged down with
high-latency record processing (based on how long previous records actually
took to process) and opt to hand the remaining work back to the runner.
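
For illustration, here is a rough, untested sketch of what that user code
might look like with the Python SDF primitives (RestrictionProvider,
OffsetRestrictionTracker and the tracker's defer_remainder() hook for
SDK-initiated checkpointing, assuming those stay roughly as they are today);
run_model() and the latency budget are made-up placeholders:

import time

import apache_beam as beam
from apache_beam.io.restriction_trackers import (
    OffsetRange, OffsetRestrictionTracker)
from apache_beam.transforms.core import RestrictionProvider

# Hypothetical per-invocation latency budget before we hand work back.
LATENCY_BUDGET_SECS = 10


class _BatchRestrictionProvider(RestrictionProvider):
  """Treats the incoming element as a batch (list) of records."""

  def initial_restriction(self, element):
    return OffsetRange(0, len(element))

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, element, restriction):
    return restriction.size()


class ExpensiveModelFn(beam.DoFn):
  def process(
      self,
      element,
      tracker=beam.DoFn.RestrictionParam(_BatchRestrictionProvider())):
    start = time.time()
    restriction = tracker.current_restriction()
    for pos in range(restriction.start, restriction.stop):
      if not tracker.try_claim(pos):
        return
      yield run_model(element[pos])  # run_model is a made-up model call
      if time.time() - start > LATENCY_BUDGET_SECS:
        # Bogged down: hand the unclaimed remainder back to the runner so
        # it can be redistributed (requires runner support).
        tracker.defer_remainder()
        return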

Until the Flink runner supports reassignment of work, I'm planning to
implement the simple bundle distribution approach I described earlier
(quoted below). We will test it in our environment and contribute it back
if the results are good.
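
The core of the idea is simply to hand a bundle only to an SDK environment
that is currently idle, and otherwise wait. The actual change would live in
the Java Flink runner; the sketch below only states the policy in Python,
with made-up names:

import threading


class FreeEnvironmentScheduler(object):
  """Hands out a bundle only to an environment that is not busy."""

  def __init__(self, environment_ids):
    self._cond = threading.Condition()
    self._idle = set(environment_ids)

  def acquire(self):
    # Block until some environment is free. Waiting here is acceptable as
    # long as per-bundle overhead is small compared to per-record cost.
    with self._cond:
      while not self._idle:
        self._cond.wait()
      return self._idle.pop()

  def release(self, environment_id):
    # Called once the bundle finishes so another subtask can use it.
    with self._cond:
      self._idle.add(environment_id)
      self._cond.notify()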

Thomas



On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik <lc...@google.com> wrote:

> Dataflow has run into this issue as well. Dataflow has "work items" that
> are converted into bundles executed by the SDK. Each work item is greedily
> assigned to the SDK worker with the fewest work items. As you surmised, we
> use SDF splitting in batch pipelines to balance work. We would like to use
> SDF splitting in streaming pipelines as well, but Dataflow can't handle
> that yet.
>
> As part of a few PRs, I have added basic SDF expansion to the shared
> runner lib and slowly exposed the runner-side hooks[2, 3] for SDK-initiated
> checkpointing and bundle finalization. There are still a few pieces left:
> * exposing an API so that a bundle can be split during execution
> * adding the limited-depth splitting logic that would provide a basic form
> of dynamic work rebalancing for all runners that decide to use it
>
> 1: https://github.com/apache/beam/pull/10045
> 2: https://github.com/apache/beam/pull/10065
> 3: https://github.com/apache/beam/pull/10074
>
> On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise <t...@apache.org> wrote:
>
>> We found a problem with uneven utilization of SDK workers causing
>> excessive latency with Streaming/Python/Flink. Remember that with Python,
>> we need to execute multiple worker processes on a machine instead of
>> relying on threads in a single worker, which requires the runner to decide
>> which worker to hand a bundle to for processing.
>>
>> The Flink runner has knobs to influence the number of records per bundle
>> and the maximum duration of a bundle. But since the runner does not know
>> the cost of an individual record, bundle durations can fluctuate
>> significantly due to skew in the processing time of individual records.
>> And unless the bundle size is 1, multiple expensive records could be
>> allocated to a single bundle before the cutoff time is reached. We noticed
>> this with a pipeline that executes models, but there are other use cases
>> where the cost of individual records can vary significantly.
>>
>> Additionally, the Flink runner establishes the association between the
>> subtask managing an executable stage and the SDK worker during
>> initialization, and it lasts for the duration of the job. In other words,
>> bundles for the same executable stage will always be sent to the same SDK
>> worker. When the execution-time skew is tied to specific keys (stateful
>> processing), this further aggravates the issue.
>>
>> I started experimenting with the ability to schedule bundles on any
>> available worker. Initially I'm trying a very basic approach: starting
>> processing of a bundle only on a free environment (one that is not
>> processing any other bundle). This effectively removes the pipelining
>> between subtask and SDK worker. Potentially waiting for an available
>> environment is acceptable in this case, as the per-bundle overhead is very
>> small compared to the per-record cost.
>>
>> However, even if this suffices for the use case I'm looking at, this is
>> an area that will probably need more work going forward. Rather than the
>> runner guessing how to schedule bundles, I think the best long-term
>> solution would be SDF, where the user code can decide that something is
>> taking too long and defer the remaining work (so the runner can
>> redistribute it).
>>
>> Curious whether anyone else has run into this issue yet and what other
>> ideas there may be?
>>
>> Thanks,
>> Thomas
>>
>>
