JIRA: https://issues.apache.org/jira/browse/BEAM-8816


On Thu, Nov 21, 2019 at 10:44 AM Thomas Weise <t...@apache.org> wrote:

> Hi Luke,
>
> Thanks for the background; it is exciting to see the progress on the SDF
> side. It will help with this use case and many other challenges. I imagine
> the Python user code would be able to determine that it is bogged down
> with high-latency record processing (based on how long previous records
> actually took to process) and opt to send the remaining work back to the
> runner.
>
> Until the Flink runner supports reassignment of work, I'm planning to
> implement the simple bundle distribution approach referred to before. We
> will test it in our environment and contribute it back if the results are
> good.
>
> Thomas
>
>
>
> On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik <lc...@google.com> wrote:
>
>> Dataflow has run into this issue as well. Dataflow has "work items" that
>> are converted into bundles that are executed on the SDK. Each work item is
>> greedily assigned to the SDK worker with the fewest work items currently
>> assigned. As you surmised, we use SDF splitting in batch pipelines to
>> balance work. We would like to use SDF splitting in streaming pipelines as
>> well, but Dataflow can't handle it right now.
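>>
>> Roughly, the assignment looks like the sketch below (an illustration with
>> made-up names, not the actual Dataflow code):
>>
>> class SdkWorker(object):
>>     def __init__(self, name):
>>         self.name = name
>>         self.assigned_items = []
>>
>> def assign_work_item(work_item, workers):
>>     # Greedy choice: the SDK worker with the fewest work items currently
>>     # assigned receives the new work item.
>>     target = min(workers, key=lambda w: len(w.assigned_items))
>>     target.assigned_items.append(work_item)
>>     return target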
>>
>> As part of a few PRs, I have added basic SDF expansion to the shared
>> runner lib[1] and gradually exposed the runner-side hooks[2, 3] for
>> SDK-initiated checkpointing and bundle finalization. There are still a few
>> pieces left:
>> * exposing an API so the bundle can be split during execution
>> * adding the limited-depth splitting logic that would give all runners
>> that opt in a basic form of dynamic work rebalancing (rough sketch after
>> the links below)
>>
>> 1: https://github.com/apache/beam/pull/10045
>> 2: https://github.com/apache/beam/pull/10065
>> 3: https://github.com/apache/beam/pull/10074
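>>
>> The limited-depth splitting mentioned above could look roughly like the
>> sketch below: split a straggler bundle's remaining work in half, but only
>> up to a fixed depth (made-up names, not an actual implementation):
>>
>> MAX_SPLIT_DEPTH = 3  # a bundle is subdivided into at most 2**3 pieces
>>
>> def maybe_split(bundle, depth):
>>     # Halve the remaining work of a straggler bundle, but stop at a fixed
>>     # depth so work is not fragmented indefinitely. The caller increments
>>     # depth each time it re-splits one of the returned pieces.
>>     if depth >= MAX_SPLIT_DEPTH or bundle.remaining_elements() < 2:
>>         return [bundle]
>>     primary, residual = bundle.try_split(fraction_of_remainder=0.5)
>>     return [primary, residual]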
>>
>> On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise <t...@apache.org> wrote:
>>
>>> We found a problem with uneven utilization of SDK workers causing
>>> excessive latency with Streaming/Python/Flink. Remember that with Python,
>>> we need to execute multiple worker processes on a machine instead of
>>> relying on threads in a single worker, which requires the runner to decide
>>> which worker should be given a bundle for processing.
>>>
>>> The Flink runner has knobs to influence the number of records per bundle
>>> and the maximum duration of a bundle. But since the runner does not
>>> understand the cost of an individual record, bundle durations can
>>> fluctuate significantly due to skew in the processing time of individual
>>> records. And unless the bundle size is 1, multiple expensive records could
>>> be allocated to a single bundle before the cutoff time is reached. We
>>> notice this with a pipeline that executes models, but there are other use
>>> cases where the cost of individual records can vary significantly.
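>>>
>>> For reference, these are the kind of knobs I mean, passed as pipeline
>>> options to the portable Flink runner (option names from memory and only
>>> for illustration; FlinkPipelineOptions is authoritative):
>>>
>>> from apache_beam.options.pipeline_options import PipelineOptions
>>>
>>> options = PipelineOptions([
>>>     '--runner=PortableRunner',
>>>     '--job_endpoint=localhost:8099',
>>>     '--max_bundle_size=100',        # at most 100 records per bundle
>>>     '--max_bundle_time_mills=500',  # cut a bundle off after 500 ms
>>> ])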
>>>
>>> Additionally, the Flink runner establishes the association between the
>>> subtask managing an executable stage and the SDK worker during
>>> initialization, and it lasts for the duration of the job. In other words,
>>> bundles for the same executable stage will always be sent to the same SDK
>>> worker. When the execution-time skew is tied to specific keys (stateful
>>> processing), this further aggravates the issue.
>>>
>>> I started experimenting with the ability to schedule bundles on any
>>> available worker. Initially I'm trying a very basic approach: starting
>>> processing of a bundle only on a free environment (one that is not
>>> processing any other bundle). This effectively removes the pipelining
>>> between subtask and SDK worker. Potentially having to wait for an
>>> available environment is acceptable in this case, as the per-bundle
>>> overhead is very small compared to the per-record cost.
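>>>
>>> Conceptually, the experiment boils down to something like the sketch
>>> below (names are made up; this is not the actual runner code):
>>>
>>> import queue
>>>
>>> class EnvironmentPool(object):
>>>     """Hands out SDK worker environments one bundle at a time."""
>>>
>>>     def __init__(self, environments):
>>>         self._idle = queue.Queue()
>>>         for env in environments:
>>>             self._idle.put(env)
>>>
>>>     def acquire(self):
>>>         # Block until some environment is free (not processing a bundle).
>>>         return self._idle.get()
>>>
>>>     def release(self, env):
>>>         self._idle.put(env)
>>>
>>> def process_bundle(pool, bundle):
>>>     env = pool.acquire()   # wait for a free SDK worker environment
>>>     try:
>>>         return env.process(bundle)  # env.process is a stand-in
>>>     finally:
>>>         pool.release(env)  # make it available to any subtask again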
>>>
>>> However, even if this suffices for the use case I'm looking at, this is
>>> an area that will probably need more work going forward. Rather than the
>>> runner guessing how to schedule bundles, I think the best long-term
>>> solution would be SDF, where the user code can decide that something is
>>> taking too long and defer the remaining work (and the runner can
>>> redistribute it).
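>>>
>>> To sketch what that could look like with the Python SDF API (assuming a
>>> tracker method such as defer_remainder() to return unprocessed work to
>>> the runner; the exact API may end up different):
>>>
>>> import time
>>>
>>> import apache_beam as beam
>>> from apache_beam.io.restriction_trackers import OffsetRange
>>> from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
>>> from apache_beam.transforms.core import RestrictionProvider
>>>
>>> def expensive_model_call(record):
>>>   return record  # stand-in for the real (slow) model invocation
>>>
>>> class RecordsRestrictionProvider(RestrictionProvider):
>>>   # The element is assumed to be a list of records; the restriction is
>>>   # the range of indices that still need to be processed.
>>>   def initial_restriction(self, element):
>>>     return OffsetRange(0, len(element))
>>>
>>>   def create_tracker(self, restriction):
>>>     return OffsetRestrictionTracker(restriction)
>>>
>>>   def restriction_size(self, element, restriction):
>>>     return restriction.size()
>>>
>>> class ExpensiveDoFn(beam.DoFn):
>>>   BUDGET_SECONDS = 10  # made-up latency budget for a single bundle
>>>
>>>   def process(
>>>       self,
>>>       element,
>>>       tracker=beam.DoFn.RestrictionParam(RecordsRestrictionProvider())):
>>>     start = time.time()
>>>     restriction = tracker.current_restriction()
>>>     for i in range(restriction.start, restriction.stop):
>>>       if not tracker.try_claim(i):
>>>         return
>>>       yield expensive_model_call(element[i])
>>>       if time.time() - start > self.BUDGET_SECONDS:
>>>         # Taking too long: hand the unprocessed remainder back to the
>>>         # runner so it can be redistributed to another worker.
>>>         tracker.defer_remainder()
>>>         return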
>>>
>>> Curious if anyone else has run into this issue yet and what other ideas
>>> there may be?
>>>
>>> Thanks,
>>> Thomas
>>>
>>>
