Interesting! Having large iterables within rows would be great for the
interactions between SQL and the core SDK's schema/Row support, and we
weren't sure how that could work, exactly.

My (very basic) understanding is that LengthPrefixedCoder(SomeOtherCoder)
produces an encoding that is a length followed by the encoding of
SomeOtherCoder.
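
Roughly, the picture in my head is something like this (a Python-ish sketch
with made-up helper names, not the real SDK classes; I'm using a fixed
4-byte length where the real coder uses a varint, just to keep it short):

  import struct

  def encode_length_prefixed(value, inner_coder):
      payload = inner_coder.encode(value)               # SomeOtherCoder's bytes
      return struct.pack('>I', len(payload)) + payload  # length, then payload

  def decode_length_prefixed(buf, inner_coder):
      (length,) = struct.unpack_from('>I', buf, 0)
      return inner_coder.decode(buf[4:4 + length])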

So the new proposal is that LengthPrefixedCoder(SomeOtherCoder) has an
encoding consisting of a length followed by some number of bytes, and if
those bytes end with a special token (ignoring escaping issues) then you
have to gather bytes from additional messages in order to assemble the
stream handed to SomeOtherCoder? Have I got what you mean? If so, this is a
different, yet compatible, approach to the original one where a special
token has to be looked up separately via the state read API?
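
Concretely, the decode path I'm picturing would be something like this
(again a sketch with made-up names, hand-waving the token escaping and the
exact shape of the state read API):

  import struct

  REMOTE_MARKER = b'\xff'  # stand-in for the special end-of-payload token
  TOKEN_LEN = 1 + 8        # marker byte plus a hypothetical 8-byte reference id

  def decode_possibly_remote(buf, inner_coder, state_reader):
      (length,) = struct.unpack_from('>I', buf, 0)
      payload = buf[4:4 + length]
      if len(payload) >= TOKEN_LEN and payload.endswith(REMOTE_MARKER):
          # The tail is a reference, not data: gather the remaining bytes via
          # the state read API and splice them onto the inline prefix.
          inline, ref = payload[:-TOKEN_LEN], payload[-TOKEN_LEN:-1]
          payload = inline + state_reader.read_all(ref)  # hypothetical API
      return inner_coder.decode(payload)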

Kenn

On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:

> There is a discussion happening on PR 7127[1] where Robert is working on
> providing the first implementation of support for large iterables
> resulting from a GroupByKey. This is in line with the original proposal
> for remote references over the Fn Data & State API[2].
>
> I had thought about this issue more since the original write-up was done
> over a year ago, and I believe we can simplify the implementation by
> migrating the length prefix coder to be able to embed a remote reference
> token at the end of the stream if the data is too large. This allows any
> coder which supports lazy decoding to return a view over a seekable stream
> instead of decoding all the data (regardless of whether all the data was
> sent or a state token represents the remote reference).
>
> Allowing any arbitrary coder to support lazy decoding helps solve the
> large iterable use case but also opens up the ability for types which don't
> need to be fully decoded to provide lazy views. Imagine our Beam rows using
> a format where only rows that are read are decoded while everything else is
> left in its encoded form.
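>
> One way to picture that kind of lazy view (a toy sketch with made-up
> names, not the actual Row/Schema API): a wrapper that hangs on to the
> encoded bytes and only runs a field's coder when that field is accessed.
>
>   class LazyRow:
>       """Toy lazy view: fields stay encoded until first access."""
>
>       def __init__(self, encoded_fields, field_coders):
>           self._encoded = encoded_fields  # field name -> bytes
>           self._coders = field_coders     # field name -> coder with decode()
>           self._decoded = {}
>
>       def __getitem__(self, name):
>           if name not in self._decoded:
>               # Decode on first access; untouched fields stay encoded.
>               decoded = self._coders[name].decode(self._encoded[name])
>               self._decoded[name] = decoded
>           return self._decoded[name]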
>
> I also originally thought that this could help solve the issue where large
> values[3] need to be chunked across multiple protobuf messages over the
> Data API, which complicates the reading-side decoding implementation since
> each SDK needs to provide an implementation that blocks and waits for the
> next chunk to arrive for the same logical stream[4]. But there are issues
> with this, because the runner may make a bad coder choice such as
> iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>),
> which can lead to > 2gb of state keys if there are very many values.
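>
> To illustrate the difference in nesting (a sketch in terms of the Python
> SDK's coder classes, purely to show the structure):
>
>   from apache_beam.coders import coders
>
>   # iterable<length_prefix<blob>>: every element carries its own length
>   # prefix, so a state-backed length prefix could leave each large element
>   # with its own state key.
>   per_element = coders.IterableCoder(
>       coders.LengthPrefixCoder(coders.BytesCoder()))
>
>   # length_prefix<iterable<blob>>: a single prefix wraps the whole
>   # iterable, so there is at most one remote reference regardless of the
>   # number of values.
>   whole_iterable = coders.LengthPrefixCoder(
>       coders.IterableCoder(coders.BytesCoder()))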
>
> Robert, would implementing the state-backed length prefix coder plus
> adding a lazy decoding method to the iterable coder be significantly more
> complicated than what you are proposing right now?
>
> What do others think about coders supporting a "lazy" decode mode?
>
> 1: https://github.com/apache/beam/pull/7127
> 2:
> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
> 3:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
> 4:
> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>
