On Mon, Jan 27, 2025 at 1:00 PM Joey Tran <[email protected]> wrote:
>
> I heard mention that there is a flatten unzipping optimization implemented by
> some runners. I didn't see that in the python optimizations in
> translations.py[1]. Just curious what this optimization is?
It's done to increase the possibility of fusion. Suppose one has

... --> DoFnA \
               --> Flatten --> DoFnC --> ...
... --> DoFnB /
When determining the physical execution plan, one can re-write this as

... --> DoFnA --> DoFnC \
                         --> Flatten --> ...
... --> DoFnB --> DoFnC /
which permits fusion into stages (DoFnA+DoFnC) and (DoFnB+DoFnC).
One can apply this rewrite repeatedly, up to the point where the
consumer of the flatten already requires a materialization that
permits multiple inputs (e.g. a shuffle/grouping operation).
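To make the rewrite concrete, here is a toy sketch in Python. The
graph representation, labels, and unzip_flatten helper are all
hypothetical illustrations, not Beam's actual translations.py code:

```python
def unzip_flatten(graph, flatten, consumer):
    """Rewrite  inputs --> Flatten --> consumer  into one copy of
    `consumer` per Flatten input, with the Flatten pushed downstream.

    `graph` maps each transform label to the list of labels it
    consumes from (a toy stand-in for a real pipeline proto).
    """
    new_graph = dict(graph)
    inputs = new_graph.pop(flatten)
    # Duplicate the consumer onto each input branch of the Flatten.
    copies = []
    for i, inp in enumerate(inputs):
        copy = f"{consumer}/{i}"
        new_graph[copy] = [inp]
        copies.append(copy)
    # The Flatten now merges the outputs of the consumer copies.
    new_graph[flatten] = copies
    # Anything that consumed `consumer` now consumes the pushed-down
    # Flatten instead.
    new_graph = {
        t: [flatten if src == consumer else src for src in srcs]
        for t, srcs in new_graph.items()
    }
    del new_graph[consumer]
    return new_graph


# The example from above: DoFnA and DoFnB feed a Flatten whose output
# is consumed by DoFnC.
graph = {
    "Flatten": ["DoFnA", "DoFnB"],
    "DoFnC": ["Flatten"],
    "Sink": ["DoFnC"],
}
print(unzip_flatten(graph, "Flatten", "DoFnC"))
# {'DoFnC/0': ['DoFnA'], 'DoFnC/1': ['DoFnB'],
#  'Flatten': ['DoFnC/0', 'DoFnC/1'], 'Sink': ['Flatten']}
```

After the rewrite, DoFnA+DoFnC/0 and DoFnB+DoFnC/1 are simple linear
chains, so a runner can fuse each into a single stage.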
> I think I get the general gist in that you don't necessarily need to combine
> the input pcollections to a flatten and instead you can just apply
> non-aggregating consuming transforms to all input pcollections, but when is a
> good time to do that? Do runners that implement this optimization always
> apply this to all flattens?
Pretty much whenever they can, though there are limitations (e.g. if
DoFnC is stateful, duplicating it would split its state across the
copies). Whether this makes sense ultimately depends on the internal
implementation of the runner.
> Cheers,
> Joey
>
> [1]
> https://github.com/apache/beam/blob/72102b5985b3a13c4a4c3949bf23d129c3999827/sdks/python/apache_beam/runners/portability/fn_api_runner/translations.py