Hi Beam devs,

I've recently been exploring how to optimize IO bound steps in my Python
Beam pipelines, and have come up with a solution that I think might make
sense to upstream into Beam's Python SDK.

It appears that Beam runners (at least the Cloud Dataflow runner) typically
use only a single thread per Python process. The number of threads per
worker can be adjusted with flags, but only for the entire pipeline. This
behavior makes sense *in general* under the worst-case assumption that
user code in Python is CPU bound and holds the GIL.

However, multiple threads can be quite helpful in many cases, e.g.,
1. CPU bound tasks that release the GIL. This is typically the case when
using libraries for numerical computing, such as NumPy and pandas.
2. IO bound tasks that can be run asynchronously, e.g., reading/writing
files or making RPCs. This is the use-case where not using threads is most
problematic. For example, in a recent Dataflow pipeline that read/wrote
lots of relatively small files (~1-10 MB) to cloud storage with the default
number of threads per worker, I found I was only using ~20% of available CPU.
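To make the IO bound case concrete, here is a minimal, self-contained sketch
(plain Python, no Beam) of the speedup threads give when tasks spend their
time waiting rather than computing; `fake_io_task` is a stand-in I made up
for a cloud-storage read or an RPC:

```python
import concurrent.futures
import time


def fake_io_task(i):
    # Stand-in (made up for illustration) for an IO bound call such as
    # a cloud-storage read or an RPC: sleeping releases the GIL, just
    # like waiting on the network does.
    time.sleep(0.1)
    return i * 2


def run_serial(n):
    # One task after another: n * 0.1 s of wall time.
    return [fake_io_task(i) for i in range(n)]


def run_threaded(n, num_threads=8):
    # All tasks wait concurrently: ~0.1 s of wall time for n <= num_threads.
    with concurrent.futures.ThreadPoolExecutor(num_threads) as pool:
        return list(pool.map(fake_io_task, range(n)))
```

With a single thread per worker, a Beam stage doing this kind of work is
stuck in the `run_serial` regime no matter how many cores are idle.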

Because the optimal number of threads for Python code can be quite
heterogeneous, I would like to be able to indicate that particular steps of
my Beam pipelines should be executed using more threads. This would be
particularly valuable for writing libraries of custom IO transforms, which
should still conservatively assume that *other* steps in user provided
pipelines may be CPU bound.

The solution I've come up with is to use beam.BatchElements followed by a
ParDo that executes tasks in separate threads (via
concurrent.futures.ThreadPoolExecutor). I've used this to build high-level
wrappers like beam.Map, beam.MapTuple, etc. that execute with multiple
threads. This seems to work pretty well for my use-cases. I can keep these
in my own library, of course, but perhaps they would make sense to upstream
into Beam's Python SDK itself?

One alternative would be supporting this sort of concurrency control inside
Beam runners. In principle, I imagine runners could tune thread-pool size
for each stage automatically, e.g., based on CPU usage. To be honest, I'm a
little surprised this doesn't happen already, but I'm sure there are good
reasons why not.

Let me know what you think!

Cheers,
Stephan
