Hi Beam devs, I've recently been exploring how to optimize IO-bound steps in my Python Beam pipelines, and have come up with a solution that I think might make sense to upstream into Beam's Python SDK.
It appears that Beam runners (at least the Cloud Dataflow runner) typically use only a single thread per Python process. The number of threads per worker can be adjusted with flags, but only for the entire pipeline. This behavior makes sense *in general* under the worst-case assumption that user code in Python is CPU bound and holds the GIL. However, multiple threads can be quite helpful in many cases:

1. CPU-bound tasks that release the GIL. This is typically the case when using libraries for numerical computing, such as NumPy and pandas.

2. IO-bound tasks that can be run asynchronously, e.g., reading/writing files or making RPCs. This is the use case where not using threads is most problematic: in a recent Dataflow pipeline reading/writing lots of relatively small files (~1-10 MB) to cloud storage with the default number of threads per worker, I found that I was only using ~20% of available CPU.

Because the optimal number of threads for Python code can be quite heterogeneous, I would like to be able to indicate that particular steps of my Beam pipelines should be executed with more threads. This would be particularly valuable for writing libraries of custom IO transforms, which should still conservatively assume that *other* steps in user-provided pipelines may be CPU bound.

The solution I've come up with is to combine beam.BatchElements with a ParDo that executes tasks in separate threads (via concurrent.futures.ThreadPoolExecutor). I've used this to make high-level wrappers like beam.Map, beam.MapTuple, etc. that execute with multiple threads. This seems to work pretty well for my use cases. I can put these in my own library, of course, but perhaps they would make sense upstream in Beam's Python SDK itself?

One alternative would be supporting this sort of concurrency control inside Beam runners. In principle, I imagine runners could tune thread-pool size for each stage automatically, e.g., based on CPU usage.
To be honest, I'm a little surprised this doesn't happen already, but I'm sure there are good reasons why not. Let me know what you think! Cheers, Stephan