Load-balancing the worker selection for bundle execution sounds like a good solution to the uneven work distribution across workers. Some comments:

(1) I could imagine that in the case of long-running bundle execution (e.g. model execution), this could stall upstream operators because their busy downstream operators hold all available workers, which would also hurt pipeline throughput/latency.

Instead of balancing across _all_ the workers available on a particular node (aka TaskManager), it could make sense to just increase the share of SDK workers for a particular executable stage. At the moment, each stage receives just a single worker. Instead, it could receive a higher share of workers, which could either be exclusive or overlap with the share of another executable stage. Essentially, this is an extension of what you are proposing to ensure stages make progress.
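
A rough sketch of what I mean (hypothetical names, not actual runner
code): each executable stage gets a configurable share of the node's
SDK workers instead of a single one, and shares may overlap.

    // Hypothetical sketch: hand each executable stage a share of the
    // TaskManager's SDK workers; shares of different stages may overlap.
    import java.util.ArrayList;
    import java.util.List;

    class SdkWorker {}

    class StageWorkerShares {
      private final List<SdkWorker> allWorkers;  // all SDK workers on this TaskManager

      StageWorkerShares(List<SdkWorker> allWorkers) {
        this.allWorkers = allWorkers;
      }

      // Returns the share of workers for a stage; depending on shareSize
      // and the number of stages, shares can be exclusive or overlapping.
      List<SdkWorker> shareFor(int stageIndex, int shareSize) {
        List<SdkWorker> share = new ArrayList<>();
        for (int i = 0; i < shareSize; i++) {
          share.add(allWorkers.get((stageIndex * shareSize + i) % allWorkers.size()));
        }
        return share;
      }
    }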

(2) Another concern is that load balancing across multiple worker instances would render state caching useless. We would need to make the Runner aware of it so that it can turn off state caching. With the approach of multiple workers per stage in (1), it would also be possible to keep state caching if we divided the key range across the workers.
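
To make the key-range idea concrete, a minimal sketch (again with
hypothetical names) of a deterministic key-to-worker mapping that
keeps each key's cached state on a single worker:

    // Sketch only: route each encoded key to a fixed worker within the
    // stage's share, so that worker's state cache for the key stays valid.
    import java.util.Arrays;
    import java.util.List;

    class SdkWorker {}

    class KeyAwareWorkerSelector {
      private final List<SdkWorker> stageWorkers;  // the stage's share of workers

      KeyAwareWorkerSelector(List<SdkWorker> stageWorkers) {
        this.stageWorkers = stageWorkers;
      }

      // The same encoded key always maps to the same worker.
      SdkWorker workerFor(byte[] encodedKey) {
        int index = Math.floorMod(Arrays.hashCode(encodedKey), stageWorkers.size());
        return stageWorkers.get(index);
      }
    }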

Cheers,
Max

On 23.11.19 18:42, Thomas Weise wrote:
JIRA: https://issues.apache.org/jira/browse/BEAM-8816


On Thu, Nov 21, 2019 at 10:44 AM Thomas Weise <t...@apache.org> wrote:

    Hi Luke,

    Thanks for the background, and it is exciting to see the progress
    on the SDF side. It will help with this use case and many other
    challenges. I imagine the Python user code would be able to
    determine that it is bogged down with high-latency record
    processing (based on the duration it actually took to process
    previous records) and opt to send remaining work back to the runner.

    Until the Flink runner supports reassignment of work, I'm planning
    to implement the simple bundle distribution approach referred to
    before. We will test it in our environment and contribute it back if
    the results are good.

    Thomas



    On Wed, Nov 20, 2019 at 11:34 AM Luke Cwik <lc...@google.com> wrote:

        Dataflow has run into this issue as well. Dataflow has "work
        items" that are converted into bundles that are executed on
        the SDK. Each work item is greedily assigned to the SDK worker
        with the fewest work items already assigned. As you surmised,
        we use SDF splitting in batch pipelines to balance work. We
        would like to use SDF splitting in streaming pipelines as
        well, but Dataflow can't handle it right now.
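
        A rough sketch of that greedy policy (made-up names, not the
        actual Dataflow implementation):

            // Illustration only: assign the next work item to the SDK
            // worker with the fewest outstanding work items.
            import java.util.Comparator;
            import java.util.List;

            class SdkWorkerState {
              int outstandingWorkItems;
            }

            class GreedyAssigner {
              static SdkWorkerState pickWorker(List<SdkWorkerState> workers) {
                return workers.stream()
                    .min(Comparator.comparingInt(
                        (SdkWorkerState w) -> w.outstandingWorkItems))
                    .orElseThrow(IllegalStateException::new);
              }
            }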

        As part of a few PRs, I have added basic SDF expansion to the
        shared runner lib and slowly exposed the runner-side
        hooks[2, 3] for SDK-initiated checkpointing and bundle
        finalization. There are still a few pieces left:
        * exposing an API so the bundle can be split during execution
        * adding the limited-depth splitting logic that would add a
        basic form of dynamic work rebalancing for all runners that
        decide to use it

        1: https://github.com/apache/beam/pull/10045
        2: https://github.com/apache/beam/pull/10065
        3: https://github.com/apache/beam/pull/10074

        On Wed, Nov 20, 2019 at 10:49 AM Thomas Weise <t...@apache.org> wrote:

            We found a problem with uneven utilization of SDK workers
            causing excessive latency with Streaming/Python/Flink.
            Remember that with Python, we need to execute multiple
            worker processes on a machine instead of relying on threads
            in a single worker, which requires the runner to decide
            which worker to give a bundle to for processing.

            The Flink runner has knobs to influence the number of
            records per bundle and the maximum duration for a bundle.
            But since the runner does not understand the cost of an
            individual record, it is possible that the duration of
            bundles fluctuates significantly due to the skew in
            processing time of individual records. And unless the bundle
            size is 1, multiple expensive records could be allocated to
            a single bundle before the cutoff time is reached. We notice
            this with a pipeline that executes models, but there are
            other use cases where the cost of individual records can
            vary significantly.
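
            For reference, those knobs are set via FlinkPipelineOptions
            roughly like this (method names from memory, please
            double-check against the Beam release you use):

                // Bundle knobs on the Java FlinkPipelineOptions
                // (names from memory; verify against the actual API).
                import org.apache.beam.runners.flink.FlinkPipelineOptions;
                import org.apache.beam.sdk.options.PipelineOptionsFactory;

                class BundleKnobs {
                  public static void main(String[] args) {
                    FlinkPipelineOptions options =
                        PipelineOptionsFactory.as(FlinkPipelineOptions.class);
                    // Cut a bundle after at most this many elements ...
                    options.setMaxBundleSize(1000L);
                    // ... or after at most this many milliseconds.
                    options.setMaxBundleTimeMills(1000L);
                  }
                }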

            Additionally, the Flink runner establishes the association
            between the subtask managing an executable stage and the
            SDK worker during initialization, and that association
            lasts for the duration of the job. In other words, bundles
            for the same executable stage will always be sent to the
            same SDK worker. When the execution-time skew is tied to
            specific keys (stateful processing), this further
            aggravates the issue.

            I started experimenting with the ability to schedule
            bundles on any available worker. Initially I'm trying a
            very basic approach: starting processing of a bundle only
            on a free environment (one that is not processing any
            other bundle). This effectively removes the pipelining
            between subtask and SDK worker. Potentially waiting for an
            available environment is acceptable in this case, as the
            per-bundle overhead is very small compared to the
            per-record cost.
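
            The gist of that experiment, as a very rough sketch
            (hypothetical names, not the actual patch):

                // Hypothetical sketch: a bundle is only started on an
                // environment with no bundle in flight; callers block
                // until one becomes free.
                import java.util.concurrent.BlockingQueue;
                import java.util.concurrent.LinkedBlockingQueue;

                class Environment {}

                class IdleEnvironmentPool {
                  private final BlockingQueue<Environment> idle =
                      new LinkedBlockingQueue<>();

                  void register(Environment env) { idle.add(env); }

                  // Blocks until some environment is free.
                  Environment acquire() throws InterruptedException {
                    return idle.take();
                  }

                  // Hand the environment back once its bundle completes.
                  void release(Environment env) { idle.add(env); }
                }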

            However, even if this suffices for the use case I'm
            looking at, this is an area that will probably need more
            work going forward. Rather than the runner guessing how to
            schedule bundles, I think the best long-term solution
            would be SDF, where the user code can decide that
            something takes too long and defer the remaining work (and
            the runner can redistribute it).

            Curious if anyone else has run into this issue yet and what
            other ideas there may be?

            Thanks,
            Thomas
