Oh, that's interesting; I didn't know it even had that optimization. I wonder if implementing `ElementByteSizeObservableIterable` in TagIterable would be the solution, then? You're right that this does seem very brittle, though.
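To make the byte-size observer idea discussed in this thread concrete, here is a minimal, self-contained Java sketch. It does not use Beam. `ObservableStrings`, `ByteCounter`, `eagerEstimate` and `lazyEstimate` are hypothetical stand-ins, not Beam's `ElementByteSizeObservableIterable`. "Size" is approximated by string length. The eager path has to read every element just to estimate the size, and the consumer then reads them all again. The observer path reads nothing up front and learns the total during the consumer's single pass.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;

public class LazySizeObservationSketch {

  /** Accumulates the sizes reported by an observable iterable. */
  static final class ByteCounter implements LongConsumer {
    long total = 0;

    @Override
    public void accept(long bytes) {
      total += bytes;
    }
  }

  /**
   * Hypothetical stand-in for a lazily paged iterable (not Beam's class). Observers
   * registered before iteration are told each element's size as the consumer reads it.
   */
  static final class ObservableStrings implements Iterable<String> {
    private final List<String> backing; // pretend these are paged in from shuffle
    private final List<LongConsumer> observers = new ArrayList<>();
    int elementsPulled = 0; // total element reads across all iterations

    ObservableStrings(List<String> backing) {
      this.backing = backing;
    }

    void addObserver(LongConsumer observer) {
      observers.add(observer);
    }

    @Override
    public Iterator<String> iterator() {
      Iterator<String> inner = backing.iterator();
      List<LongConsumer> attached = new ArrayList<>(observers);
      return new Iterator<String>() {
        @Override
        public boolean hasNext() {
          return inner.hasNext();
        }

        @Override
        public String next() {
          String value = inner.next();
          elementsPulled++;
          // Assumption: "size" is the string length, not a real encoded size.
          for (LongConsumer observer : attached) {
            observer.accept(value.length());
          }
          return value;
        }
      };
    }
  }

  /** Eager estimate: must read every element before the consumer even starts. */
  static long eagerEstimate(Iterable<String> values) {
    long total = 0;
    for (String v : values) {
      total += v.length();
    }
    return total;
  }

  /** Lazy estimate: registers an observer and reads nothing now. */
  static void lazyEstimate(ObservableStrings values, ByteCounter counter) {
    values.addObserver(counter);
  }

  public static void main(String[] args) {
    List<String> data = List.of("alpha", "beta", "gamma");

    ObservableStrings eager = new ObservableStrings(data);
    long eagerBytes = eagerEstimate(eager);
    for (String ignored : eager) {} // the fused consumer then reads everything again
    System.out.println("eager: bytes=" + eagerBytes + ", reads=" + eager.elementsPulled);

    ObservableStrings lazy = new ObservableStrings(data);
    ByteCounter counter = new ByteCounter();
    lazyEstimate(lazy, counter);
    for (String ignored : lazy) {} // a single pass both consumes and measures
    System.out.println("lazy: bytes=" + counter.total + ", reads=" + lazy.elementsPulled);
  }
}
```

Running `main` prints `eager: bytes=14, reads=6` and `lazy: bytes=14, reads=3`. Both paths report the same size, but only the observer path avoids the extra read. This is also why an implementation that walks the iterable while "estimating" silently loses the benefit.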
It seems like GroupByKey does the "right" thing: WindowReiterable extends that magic interface [1].

[1] https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java#L156

On Tue, Mar 22, 2022 at 2:41 PM Luke Cwik wrote:
> The same issue exists for the iterable that is output by a GroupByKey.
>
> We avoid loading all the data by using a byte size observer [1] that is
> registered [2] and supports lazy iteration. The interaction pattern is
> brittle in that an incorrect implementation will cause the entire iterable
> to be walked when estimating, and we will lose the lazy iteration benefit.
>
> 1:
> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java#L29
> 2:
> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java#L192
>
> On Wed, Mar 16, 2022 at 4:09 PM Steve Niemitz wrote:
>> The thread a couple of days ago about CoGroupByKey possibly being broken in
>> Beam 2.36.0 [1] had something interesting in it that got me thinking. The
>> CoGbkResultCoder doesn't override getEncodedElementByteSize (nor does it
>> know how to actually compute the size in any efficient way), so sampling a
>> CoGbkResult element size requires iterating over the entire result.
>> This can be incredibly expensive, given that CoGbkResult objects can have
>> an arbitrarily large number of elements (even more than could fit into
>> memory), which must be paged in from shuffle. They all need to be
>> encoded, even if they never would have been otherwise (given that the
>> consumer of the CoGbkResult will fuse to the operation itself).
>>
>> Making CoGbkResultCoder.getEncodedElementByteSize simply return 0
>> provided a significant performance boost in a few of the jobs I tested it
>> with, and in fact fixed some jobs that had been failing due to OOM issues.
>> The downside is obviously that this estimate will now be incorrect in the
>> Dataflow UI, but given that things like GBK are also incorrect, do we
>> particularly care?
>>
>> Curious what the community thinks here; IMO the benefit far outweighs the
>> negative.
>>
>>
>> [1] https://lists.apache.org/thread/5y56kbgm3q0m1byzf7186rrkomrcfldm
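The trade-off Steve describes can be modeled without Beam. This is a minimal, hypothetical Java sketch. `SketchCoder.estimateEncodedSize` stands in for a coder's byte-size estimate, and `CountingIterable` stands in for a result paged in from shuffle. The 4-bytes-per-int encoding is an illustrative assumption. By default the estimate encodes the whole value and counts the bytes, so it reads every element. `ZeroEstimateCoder` mimics the proposed "just return 0" change: it reads nothing, but it reports a size that no longer reflects the data.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.List;

public class SizeEstimateSketch {

  /** Hypothetical coder; the default size estimate encodes the value and counts the bytes. */
  interface SketchCoder<T> {
    void encode(T value, DataOutputStream out) throws IOException;

    default long estimateEncodedSize(T value) {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (DataOutputStream out = new DataOutputStream(bytes)) {
        encode(value, out);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      return bytes.size();
    }
  }

  /** Iterable that counts element reads, standing in for a result paged in from shuffle. */
  static final class CountingIterable implements Iterable<Integer> {
    private final List<Integer> backing;
    int reads = 0;

    CountingIterable(List<Integer> backing) {
      this.backing = backing;
    }

    @Override
    public Iterator<Integer> iterator() {
      Iterator<Integer> inner = backing.iterator();
      return new Iterator<Integer>() {
        @Override
        public boolean hasNext() {
          return inner.hasNext();
        }

        @Override
        public Integer next() {
          Integer value = inner.next();
          reads++;
          return value;
        }
      };
    }
  }

  /** Writes every element as a 4-byte int and keeps the default (eager) estimate. */
  static class EagerCoder implements SketchCoder<CountingIterable> {
    @Override
    public void encode(CountingIterable value, DataOutputStream out) throws IOException {
      for (int v : value) {
        out.writeInt(v);
      }
    }
  }

  /** Same encoding, but the estimate is short-circuited to 0, like the proposed change. */
  static final class ZeroEstimateCoder extends EagerCoder {
    @Override
    public long estimateEncodedSize(CountingIterable value) {
      return 0; // cheap but wrong: the reported size no longer reflects the data
    }
  }

  public static void main(String[] args) {
    CountingIterable a = new CountingIterable(List.of(1, 2, 3));
    long eager = new EagerCoder().estimateEncodedSize(a);
    System.out.println("eager: estimate=" + eager + ", reads=" + a.reads);

    CountingIterable b = new CountingIterable(List.of(1, 2, 3));
    long zero = new ZeroEstimateCoder().estimateEncodedSize(b);
    System.out.println("zero: estimate=" + zero + ", reads=" + b.reads);
  }
}
```

This prints `eager: estimate=12, reads=3` and `zero: estimate=0, reads=0`. With three elements the extra read is trivial. With a CoGbkResult that does not fit in memory, that extra read is the cost being weighed against an accurate size in the UI.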
