Actually, I'm confused: in that example I linked, isn't it missing the part that hooks the element observer up to the returned iterator? ElementByteSizeObservableIterable does this in its implementation of iterator() [1], but WindowReiterable overrides iterator().
[1] https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservableIterable.java#L54

On Tue, Mar 22, 2022 at 3:07 PM Steve Niemitz <[email protected]> wrote:

> Oh, that's interesting, I didn't know it even had that optimization. I
> wonder if implementing `ElementByteSizeObservableIterable` in TagIterable
> would be the solution then? You're right, this does seem very brittle
> though.
>
> It seems like GroupByKey does the "right" thing; WindowReiterable extends
> that magic interface [1].
>
> [1]
> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java#L156
>
> On Tue, Mar 22, 2022 at 2:41 PM Luke Cwik <[email protected]> wrote:
>
>> The same issue exists for the iterable that is output by a GroupByKey.
>>
>> We avoid loading all the data by using a byte size observer [1] that is
>> registered by the coder [2] and supports lazy iteration. The interaction
>> pattern is brittle in that an incorrect implementation will cause the
>> entire iterable to be walked when estimating, and we will lose the lazy
>> iteration benefit.
>>
>> 1:
>> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java#L29
>> 2:
>> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java#L192
>>
>> On Wed, Mar 16, 2022 at 4:09 PM Steve Niemitz <[email protected]> wrote:
>>
>>> The thread a couple of days ago about CoGroupByKey being possibly broken
>>> in Beam 2.36.0 [1] had an interesting thing in it that had me thinking.
>>> The CoGbkResultCoder doesn't override getEncodedElementByteSize (nor
>>> does it know how to compute the size in any efficient way), so sampling
>>> a CoGbkResult element's size requires iterating over the entire result.
>>> This can be incredibly expensive, given that CoGbkResult objects can
>>> contain an arbitrarily large number of elements (even more than would
>>> fit into memory) and will require paging them in from shuffle. These all
>>> need to be encoded, even if they never would have been otherwise (given
>>> that the consumer of the CoGbkResult will fuse to the operation itself).
>>>
>>> Making CoGbkResultCoder.getEncodedElementByteSize simply return 0
>>> provided a significant performance boost in a few of the jobs I tested
>>> it with, and in fact fixed some jobs that had been failing due to OOM
>>> issues. The downside is obviously that this estimate will now be
>>> incorrect in the Dataflow UI, but given that things like GBK are also
>>> incorrect, do we particularly care?
>>>
>>> I'm curious what the community thinks here; IMO the benefit far
>>> outweighs the negative.
>>>
>>> [1] https://lists.apache.org/thread/5y56kbgm3q0m1byzf7186rrkomrcfldm
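[Editor's note] The lazy-observation pattern the thread describes can be sketched in a self-contained way. Everything below uses hypothetical names (ByteSizeObservable, LazyLongs, estimateByteSize), not Beam's actual classes; it is only meant to show the interaction Luke describes: when the iterable supports observation, the size estimator registers an observer and defers counting until the consumer iterates, and when it does not, estimation must eagerly walk (and encode) the entire iterable.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.LongConsumer;

// Sketch only: hypothetical names, not Beam's real API.
public class LazySizeEstimationSketch {

  /** An iterable that reports per-element encoded sizes as it is consumed. */
  interface ByteSizeObservable<T> extends Iterable<T> {
    void addObserver(LongConsumer observer);
  }

  /** Stand-in for shuffle-backed data; reports 8 bytes per element lazily. */
  static class LazyLongs implements ByteSizeObservable<Long> {
    private final long[] values;
    private final List<LongConsumer> observers = new ArrayList<>();

    LazyLongs(long... values) { this.values = values; }

    @Override public void addObserver(LongConsumer o) { observers.add(o); }

    @Override public Iterator<Long> iterator() {
      return new Iterator<Long>() {
        int i = 0;
        @Override public boolean hasNext() { return i < values.length; }
        @Override public Long next() {
          // Lazy observation: fires only as elements are actually consumed.
          for (LongConsumer o : observers) o.accept(8);
          return values[i++];
        }
      };
    }
  }

  static int elementsEncodedEagerly = 0;

  /** Coder-style estimate: lazy when observable, a full eager walk otherwise. */
  @SuppressWarnings("unchecked")
  static void estimateByteSize(Iterable<Long> value, LongConsumer observer) {
    if (value instanceof ByteSizeObservable) {
      // The "right" path: register the observer and return immediately.
      ((ByteSizeObservable<Long>) value).addObserver(observer);
      return;
    }
    // The brittle fallback the thread warns about: walk and "encode"
    // every element up front just to estimate the size.
    long total = 0;
    for (long ignored : value) {
      elementsEncodedEagerly++;
      total += 8;
    }
    observer.accept(total);
  }

  public static void main(String[] args) {
    long[] observed = {0};
    LazyLongs lazy = new LazyLongs(1, 2, 3, 4);
    estimateByteSize(lazy, bytes -> observed[0] += bytes);
    // Nothing has been walked yet; the observer fires only during consumption.
    System.out.println("eager encodes before iteration: " + elementsEncodedEagerly);
    for (long ignored : lazy) { }
    System.out.println("lazily observed bytes: " + observed[0]);
  }
}
```

In this sketch, an iterable that misses the observable hookup (e.g. by overriding iterator() without re-registering observers, as questioned at the top of the thread) would silently fall into the eager path and lose the lazy-iteration benefit.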
