We found a problem with uneven utilization of SDK workers that causes
excessive latency in streaming Python pipelines on Flink. Remember that with
Python, we need to run multiple worker processes on a machine instead of
relying on threads in a single worker, which requires the runner to decide
which worker should receive each bundle for processing.

The Flink runner has knobs to influence the number of records per bundle
and the maximum duration for a bundle. But since the runner does not
understand the cost of an individual record, it is possible that the
duration of bundles fluctuates significantly due to the skew in processing
time of individual records. And unless the bundle size is 1, multiple
expensive records could be allocated to a single bundle before the cutoff
time is reached. We noticed this with a pipeline that executes models, but
there are other use cases where the cost of individual records can vary
significantly.
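
To illustrate the effect, here is a minimal sketch in plain Python. It is
not Flink runner or Beam code: the function names and the per-record costs
are made up, and it models only the record-count limit, not the time
cutoff. It shows how count-based bundling can put two expensive records
into one bundle while the next bundle is nearly free.

```python
def make_bundles(record_costs_ms, max_bundle_size):
    """Group records into bundles by count only, ignoring their cost."""
    return [record_costs_ms[i:i + max_bundle_size]
            for i in range(0, len(record_costs_ms), max_bundle_size)]


def bundle_durations_ms(record_costs_ms, max_bundle_size):
    """Duration of a bundle is the sum of the costs of its records."""
    return [sum(bundle)
            for bundle in make_bundles(record_costs_ms, max_bundle_size)]


# Hypothetical per-record costs in milliseconds: mostly cheap, two expensive.
costs_ms = [10, 10, 5000, 5000, 10, 10, 10, 10]

skewed = bundle_durations_ms(costs_ms, max_bundle_size=4)
single = bundle_durations_ms(costs_ms, max_bundle_size=1)

print(skewed)       # [10020, 40]
print(max(single))  # 5000
```

With a bundle size of 1 the longest bundle is bounded by the most expensive
record; with larger bundles the expensive records can pile up.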

Additionally, the Flink runner establishes the association between the
subtask managing an executable stage and the SDK worker during
initialization, and that association lasts for the duration of the job. In
other words, bundles for the same executable stage will always be sent to
the same SDK worker. When the execution time skew is tied to specific keys
(stateful processing), this further aggravates the issue.

I started experimenting with the ability to schedule bundles on any
available worker. Initially I'm trying a very basic approach: start
processing a bundle only on a free environment (one that is not currently
processing any other bundle). This effectively removes the pipelining
between subtask and SDK worker. Having to wait for an available environment
is acceptable in this case, as the per-bundle overhead is very small
compared to the per-record cost.
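
As a rough comparison of the two policies, here is a small simulation in
plain Python. It is a sketch under simplifying assumptions, not the runner
implementation: all names are hypothetical, all bundles are assumed to be
queued at time 0, a worker handles one bundle at a time, and pinning is
modeled as subtask index modulo the number of workers.

```python
import heapq


def makespan_pinned(bundles, num_workers):
    """Every bundle of a subtask goes to the worker that subtask is pinned to."""
    busy_until = [0] * num_workers
    for subtask, cost in bundles:
        worker = subtask % num_workers  # fixed for the whole job
        busy_until[worker] += cost
    return max(busy_until)


def makespan_any_free(bundles, num_workers):
    """Every bundle starts on whichever worker becomes free first."""
    free_at = [0] * num_workers  # min-heap of times at which workers free up
    heapq.heapify(free_at)
    for _subtask, cost in bundles:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + cost)
    return max(free_at)


# Hypothetical (subtask, bundle cost in seconds) pairs: subtasks 0 and 2
# produce expensive bundles and happen to be pinned to the same worker.
bundles = [(0, 10), (1, 1), (2, 10), (3, 1)]

print(makespan_pinned(bundles, num_workers=2))    # 20
print(makespan_any_free(bundles, num_workers=2))  # 11
```

In this toy example the pinned policy leaves one worker mostly idle while
the other works through both expensive bundles.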

However, even if this suffices for the use case I'm looking at, this is an
area that will probably need more work going forward. Rather than the
runner guessing how to schedule bundles, I think that the best long-term
solution would be splittable DoFn (SDF), where the user code can decide
that something takes too long and defer the remaining work (and the runner
can redistribute it).

Curious if anyone else has run into this issue yet and what other ideas
there may be?

Thanks,
Thomas
