Re-adding +datapls-portability-t...@google.com <datapls-portability-t...@google.com> +datapls-unified-wor...@google.com <datapls-unified-wor...@google.com>
On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:

> That is correct, Kenn. An important point is that SomeOtherCoder would be
> given a seekable stream (instead of the forward-only stream it gets right
> now), so it can either decode all the data or lazily decode parts as it
> needs to, as the iterable coder does when used to support large iterables
> coming out of a GroupByKey.
>
> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <k...@apache.org> wrote:
>
>> Interesting! Having large iterables within rows would be great for the
>> interactions between SQL and the core SDK's schema/Row support, and we
>> weren't sure how that could work, exactly.
>>
>> My (very basic) understanding is that LengthPrefixedCoder(SomeOtherCoder)
>> has an encoding that is a length followed by the encoding of
>> SomeOtherCoder.
>>
>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has
>> an encoding with a length followed by some number of bytes; if it ends
>> with a special token (ignoring escaping issues), then you have to gather
>> bytes from more messages in order to assemble a stream to send to
>> SomeOtherCoder? Have I got what you mean? So this is a different, yet
>> compatible, approach to sending over a special token that has to be
>> looked up separately via the state read API?
>>
>> Kenn
>>
>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>>
>>> There is a discussion happening on PR 7127[1] where Robert is working
>>> on providing the first implementation for supporting large iterables
>>> resulting from a GroupByKey. This is in line with the original proposal
>>> for remote references over the Fn Data & State API[2].
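[Editorial note: the length-prefixed framing Kenn describes above can be sketched as follows. This is a minimal illustration, not Beam's actual LengthPrefixCoder (which uses a varint length); a fixed 4-byte big-endian length is used here only to keep the sketch short.]

```python
import io
import struct


def encode_length_prefixed(payload: bytes) -> bytes:
    """Frame a value as a fixed 4-byte big-endian length followed by the
    payload bytes (the real coder would use a varint length instead)."""
    return struct.pack(">I", len(payload)) + payload


def decode_length_prefixed(stream: io.BytesIO) -> bytes:
    """Read one framed value: first the length, then exactly that many bytes."""
    (n,) = struct.unpack(">I", stream.read(4))
    return stream.read(n)


# Because the length comes first, a forward-only reader always knows where
# one value ends and the next begins, and can skip values it does not need.
buf = io.BytesIO(encode_length_prefixed(b"hello") + encode_length_prefixed(b"world"))
first = decode_length_prefixed(buf)
second = decode_length_prefixed(buf)
```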
>>>
>>> I had thought about this issue more since the original write-up was
>>> done over a year ago and believe we can simplify the implementation by
>>> migrating the length prefix coder to be able to embed a remote
>>> reference token at the end of the stream if the data is too large.
>>> This allows any coder which supports lazy decoding to return a view
>>> over a seekable stream instead of decoding all the data (regardless of
>>> whether all the data was sent or there is a state token representing
>>> the remote reference).
>>>
>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>> large iterable use case but also opens up the ability for types which
>>> don't need to be fully decoded to provide lazy views. Imagine our Beam
>>> rows using a format where only the rows that are read are decoded
>>> while everything else is left in its encoded form.
>>>
>>> I also originally thought that this could help solve an issue where
>>> large values[3] need to be chunked across multiple protobuf messages
>>> over the Data API, which complicates the decoding implementation on
>>> the reading side, since each SDK needs to provide an implementation
>>> that blocks and waits for the next chunk to come across for the same
>>> logical stream[4]. But there are issues with this, because the runner
>>> may make a bad coder choice such as iterable<length_prefix<blob>>
>>> (instead of length_prefix<iterable<blob>>), which can lead to > 2 GB
>>> of state keys if there are very many values.
>>>
>>> Robert, would implementing the length prefix coder backed by state,
>>> plus adding a lazy decoding method to the iterable coder, be
>>> significantly more complicated than what you are proposing right now?
>>>
>>> What do others think about coders supporting a "lazy" decode mode?
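[Editorial note: the variant Lukasz proposes above, where the encoded stream may end with a special token pointing at state, can be sketched as below. Everything here is hypothetical: the marker bytes, the `limit` threshold, the token format, and the `state_read` callback all stand in for runner/SDK details, the marker escaping problem is ignored as in the thread, and the remote suffix is fetched eagerly, whereas the whole point of the proposal is that a real implementation could defer that state read until a consumer seeks past the inline bytes.]

```python
import io

# Hypothetical sentinel marking "the rest of this value lives behind a
# state token"; a real encoding would need escaping so payload bytes
# cannot collide with it (the thread explicitly sets that issue aside).
REMOTE_TOKEN_MARKER = b"\xff\x00REMOTE"


def encode_maybe_remote(payload: bytes, limit: int, store: dict) -> bytes:
    """Inline the payload if it is small; otherwise inline a prefix and end
    the stream with the marker plus a token. `store` stands in for
    runner-side state addressable via the state read API."""
    if len(payload) <= limit:
        return payload
    token = b"token-%d" % len(store)
    store[token] = payload[limit:]
    return payload[:limit] + REMOTE_TOKEN_MARKER + token


def decode_as_seekable(encoded: bytes, state_read) -> io.BytesIO:
    """Return a seekable stream over the full value, stitching in the
    remote suffix when the marker is present, so the element coder can
    decode eagerly or lazily from one logical stream."""
    idx = encoded.find(REMOTE_TOKEN_MARKER)
    if idx == -1:
        return io.BytesIO(encoded)
    token = encoded[idx + len(REMOTE_TOKEN_MARKER):]
    return io.BytesIO(encoded[:idx] + state_read(token))


store = {}
enc = encode_maybe_remote(b"a" * 100, limit=10, store=store)
stream = decode_as_seekable(enc, state_read=store.__getitem__)
```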
>>>
>>> 1: https://github.com/apache/beam/pull/7127
>>> 2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>> 3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>> 4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>>