Actually, I'm confused: in that example I linked, isn't it missing the part
that hooks the element byte size observable up to the returned iterator?
ElementByteSizeObservableIterable does this in its implementation of
iterator() [1], but WindowReiterable overrides iterator().


[1]
https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObservableIterable.java#L54
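For reference, the pattern I mean can be sketched roughly like this. This is a hypothetical, stripped-down re-creation, not Beam's actual classes; like the real code it uses java.util.Observable/Observer (deprecated since Java 9, but that's what the SDK uses):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Observable;
import java.util.Observer;

// Hypothetical, simplified re-creation of the pattern used by Beam's
// ElementByteSizeObservableIterable -- illustrative names only.
abstract class ObservableSizeIterable<T> implements Iterable<T> {
  private final List<Observer> observers = new ArrayList<>();

  public void addObserver(Observer o) {
    observers.add(o);
  }

  // The hook-up step in question: every iterator created here gets the
  // registered observers attached.  A subclass that overrides iterator()
  // directly skips this loop, and the observers never see any elements.
  @Override
  public Iterator<T> iterator() {
    SizeObservingIterator<T> it = createIterator();
    for (Observer o : observers) {
      it.addObserver(o);
    }
    return it;
  }

  protected abstract SizeObservingIterator<T> createIterator();
}

abstract class SizeObservingIterator<T> extends Observable implements Iterator<T> {
  // Subclasses call this as elements are produced, so byte sizes are
  // observed lazily -- no extra pass over the data is needed.
  protected void notifyValueReturned(long byteSize) {
    setChanged();
    notifyObservers(byteSize);
  }
}

// Tiny concrete example: wraps a List<String> and reports each
// element's length as its "byte size".
class StringListIterable extends ObservableSizeIterable<String> {
  private final List<String> data;

  StringListIterable(List<String> data) {
    this.data = data;
  }

  @Override
  protected SizeObservingIterator<String> createIterator() {
    final Iterator<String> inner = data.iterator();
    return new SizeObservingIterator<String>() {
      @Override
      public boolean hasNext() {
        return inner.hasNext();
      }

      @Override
      public String next() {
        String v = inner.next();
        notifyValueReturned(v.length()); // observed per element, lazily
        return v;
      }
    };
  }
}
```

If a subclass overrides iterator() itself instead of createIterator(), the attach loop never runs, which is exactly the situation I'm asking about.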

On Tue, Mar 22, 2022 at 3:07 PM Steve Niemitz <[email protected]> wrote:

> Oh, that's interesting, I didn't know it even had that optimization.  I
> wonder if implementing `ElementByteSizeObservableIterable` in TagIterable
> would be the solution then?  You're right, this does seem very brittle
> though.
>
> It seems like GroupByKey does the "right" thing, WindowReiterable extends
> that magic interface [1]
>
> [1]
> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/util/BatchGroupAlsoByWindowViaIteratorsFn.java#L156
>
> On Tue, Mar 22, 2022 at 2:41 PM Luke Cwik <[email protected]> wrote:
>
>> The same issue exists for the iterable that is output by a GroupByKey.
>>
>> We avoid loading all the data by using a byte size observer [1] that is
>> registered [2] and supports lazy iteration. The interaction pattern is
>> brittle in that an incorrect implementation will cause the entire iterable
>> to be walked during estimation, and we will lose the lazy iteration benefit.
>>
>> 1:
>> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/common/ElementByteSizeObserver.java#L29
>> 2:
>> https://github.com/apache/beam/blob/9a36490de8129359106baf37e1d0b071e19e303a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/IterableLikeCoder.java#L192
>>
>>
>>
>> On Wed, Mar 16, 2022 at 4:09 PM Steve Niemitz <[email protected]>
>> wrote:
>>
>>> The thread a couple of days ago about CoGroupByKey possibly being broken
>>> in beam 2.36.0 [1] had an interesting detail that got me thinking.  The
>>> CoGbkResultCoder doesn't override getEncodedElementByteSize (nor does it
>>> know how to compute the size efficiently), so sampling a CoGbkResult
>>> element's size requires iterating over the entire result.
>>> This can be incredibly expensive, given that a CoGbkResult can contain
>>> an arbitrarily large number of elements (even more than could fit into
>>> memory) and will require paging them in from shuffle.  All of these need
>>> to be encoded, even if they never would have been otherwise (given that
>>> the consumer of the CoGbkResult will fuse to the operation itself).
>>>
>>> Making CoGbkResultCoder.getEncodedElementByteSize simply return 0
>>> provided a significant performance boost in a few of the jobs I tested it
>>> with, and in fact fixed some jobs that had been failing with OOM issues.
>>> The downside, obviously, is that this estimate will now be incorrect in
>>> the Dataflow UI, but given that things like GBK are also incorrect, do we
>>> particularly care?
>>>
>>> Curious what the community thinks here; imo the benefit far outweighs
>>> the negative.
>>>
>>>
>>> [1] https://lists.apache.org/thread/5y56kbgm3q0m1byzf7186rrkomrcfldm
>>>
>>
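To make the tradeoff being discussed concrete, here is a minimal, hypothetical model of the size-estimation path. This is not Beam's actual Coder API (the names are illustrative only): the default estimate fully encodes the value, while the proposed override skips the walk entirely.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

// Hypothetical, minimal model of coder size estimation -- not Beam's
// real Coder API.  The default estimate fully encodes the value, which
// for a shuffle-backed iterable means paging in and encoding every
// element just to produce a byte count.
abstract class MiniCoder<T> {
  abstract void encode(T value, ByteArrayOutputStream out);

  // Default: encode the whole value just to count its bytes.
  long getEncodedElementByteSize(T value) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    encode(value, out);
    return out.size();
  }
}

class BigResultCoder extends MiniCoder<List<byte[]>> {
  @Override
  void encode(List<byte[]> value, ByteArrayOutputStream out) {
    for (byte[] chunk : value) {
      out.write(chunk, 0, chunk.length); // every element materialized
    }
  }

  // Modeling the proposed change: skip the walk and report a constant.
  // The byte counts in the UI become wrong, but the full pass (and the
  // associated memory pressure) is avoided.
  @Override
  long getEncodedElementByteSize(List<byte[]> value) {
    return 0;
  }
}
```

The constant-zero override trades accurate per-element byte counts for never having to materialize the grouped values during sampling.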
