On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <[email protected]> wrote:
>
> Re-adding [email protected]
> [email protected]
>
> On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <[email protected]> wrote:
>>
>> Thanks for bringing this to the list. More below.
>>
>> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>> FWIW I deliberately limited the thread to not mix public and private
>>> lists, so people intending private replies do not accidentally send to
>>> dev@beam.
>>>
>>> I've left them on this time, to avoid contradicting your action, but I
>>> recommend removing them.
>>>
>>> Kenn
>>>
>>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>> Re-adding [email protected]
>>>> [email protected]
>>>>
>>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>> That is correct, Kenn. An important point is that SomeOtherCoder would
>>>>> be given a seekable stream (instead of the forward-only stream it gets
>>>>> right now), so it can either decode all the data or lazily decode
>>>>> parts as needed, as in the case of an iterable coder used to support
>>>>> large iterables coming out of a GroupByKey.
>>>>>
>>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>
>>>>>> Interesting! Having large iterables within rows would be great for
>>>>>> the interactions between SQL and the core SDK's schema/Row support,
>>>>>> and we weren't sure how that could work, exactly.
>>>>>>
>>>>>> My (very basic) understanding would be that
>>>>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length
>>>>>> followed by the encoding of SomeOtherCoder.
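[Editor's sketch: the layout Kenn describes, a length followed by the inner coder's bytes, can be illustrated as below. This is a hypothetical, simplified stand-in, not Beam's actual LengthPrefixCoder, which uses a var-int length rather than the fixed 4-byte int used here.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Simplified sketch of a length-prefixed encoding: a length, then the inner
// coder's bytes verbatim. (Beam's real LengthPrefixCoder uses a var-int.)
public class LengthPrefixSketch {

  // Wrap the inner coder's encoding with a length prefix.
  public static byte[] encode(byte[] innerEncoding) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bos);
      out.writeInt(innerEncoding.length); // the length prefix
      out.write(innerEncoding);           // SomeOtherCoder's bytes, verbatim
      out.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Read the length, then hand back exactly that many bytes
  // (which would go to SomeOtherCoder for decoding).
  public static byte[] decode(byte[] encoded) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte[] value = new byte[in.readInt()];
      in.readFully(value);
      return value;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    byte[] inner = "hello".getBytes(StandardCharsets.UTF_8);
    byte[] wire = encode(inner);
    System.out.println(wire.length); // 9: 4-byte prefix + 5 payload bytes
    System.out.println(new String(decode(wire), StandardCharsets.UTF_8)); // hello
  }
}
```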
>>>>>>
>>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
>>>>>> has an encoding where it has a length followed by some number of
>>>>>> bytes, and if it ends with a special token (ignoring escaping issues)
>>>>>> then you have to gather bytes from more messages in order to assemble
>>>>>> a stream to send to SomeOtherCoder? Have I got what you mean? So this
>>>>>> is a different, yet compatible, approach to sending over a special
>>>>>> token that has to be looked up separately via the state read API?
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>> There is a discussion happening on PR 7127[1], where Robert is
>>>>>>> working on providing the first implementation for supporting large
>>>>>>> iterables resulting from a GroupByKey. This is in line with the
>>>>>>> original proposal for remote references over the Fn Data & State
>>>>>>> API[2].
>>>>>>>
>>>>>>> I have thought about this issue more since the original write-up was
>>>>>>> done over a year ago, and I believe that we can simplify the
>>>>>>> implementation by migrating the length-prefix coder to be able to
>>>>>>> embed a remote reference token at the end of the stream if the data
>>>>>>> is too large. This allows any coder that supports lazy decoding to
>>>>>>> return a view over a seekable stream instead of decoding all the
>>>>>>> data (regardless of whether all the data was sent or there is a
>>>>>>> state token representing the remote reference).
>>>>>>>
>>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve
>>>>>>> the large-iterable use case, but it also opens up the ability for
>>>>>>> types that don't need to be fully decoded to provide lazy views.
>>>>>>> Imagine our Beam rows using a format where only the fields that are
>>>>>>> read are decoded while everything else is left in its encoded form.
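[Editor's sketch: the "remote reference token at the end of the stream" idea can be modeled roughly as below. All names here are hypothetical, the framing/escaping questions raised in the thread are ignored, and the Fn State API read is stood in for by a plain Map lookup.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

// Toy model: a value either arrives fully inline, or the stream carries a
// token that must be resolved through a state read to get the remaining data.
public class RemoteRefSketch {
  private static final byte INLINE = 0;
  private static final byte REMOTE = 1;

  private static byte[] frame(byte marker, byte[] payload) {
    try {
      ByteArrayOutputStream bos = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(bos);
      out.writeByte(marker);      // inline data, or a remote reference?
      out.writeInt(payload.length);
      out.write(payload);
      out.flush();
      return bos.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static byte[] encodeInline(byte[] value) {
    return frame(INLINE, value);
  }

  public static byte[] encodeRemote(String token) {
    return frame(REMOTE, token.getBytes(StandardCharsets.UTF_8));
  }

  // Assemble the full value: either the inline bytes, or the bytes fetched
  // from "state" using the embedded token.
  public static byte[] materialize(byte[] encoded, Map<String, byte[]> state) {
    try {
      DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded));
      byte marker = in.readByte();
      byte[] payload = new byte[in.readInt()];
      in.readFully(payload);
      if (marker == INLINE) {
        return payload;
      }
      String token = new String(payload, StandardCharsets.UTF_8);
      return state.get(token); // stand-in for a Fn State API read
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    Map<String, byte[]> state = new HashMap<>();
    state.put("token-1", "a very large value".getBytes(StandardCharsets.UTF_8));
    System.out.println(new String(
        materialize(encodeInline("small".getBytes(StandardCharsets.UTF_8)), state),
        StandardCharsets.UTF_8)); // small
    System.out.println(new String(
        materialize(encodeRemote("token-1"), state),
        StandardCharsets.UTF_8)); // a very large value
  }
}
```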
>>>>>>>
>>>>>>> I also originally thought that this could help solve an issue where
>>>>>>> large values[3] need to be chunked across multiple protobuf messages
>>>>>>> over the Data API, which complicates the reading-side decoding
>>>>>>> implementation, since each SDK needs to provide an implementation
>>>>>>> that blocks and waits for the next chunk to come across for the same
>>>>>>> logical stream[4]. But there are issues with this, because the
>>>>>>> runner may make a bad coder choice such as
>>>>>>> iterable<length_prefix<blob>> (instead of
>>>>>>> length_prefix<iterable<blob>>), which can lead to > 2 GB of state
>>>>>>> keys if there are many, many values.
>>
>> Yes. I think this would need to be a separate coder from the length-prefix
>> coder.
>>
>>>>>>>
>>>>>>> Robert, would implementing the length-prefix coder backed by state,
>>>>>>> plus adding a lazy decoding method to the iterable coder, be
>>>>>>> significantly more complicated than what you are proposing right
>>>>>>> now?
>>
>> Yes, chopping things up at arbitrary byte boundaries (rather than element
>> boundaries) tends to be significantly more subtle and complex (based on my
>> experience with the data plane API). It would also require new public APIs
>> for Coders.
>
> After some further thought, I don't think we need a different API for
> coders; they just get a different implementation of the InputStream when
> decoding. So the logic would be:
>
>     public T decode(InputStream is) {
>       if (is instanceof SeekableInputStream) {
>         return view((SeekableInputStream) is);
>       }
>       return decodeInternal(is);
>     }
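[Editor's sketch: a self-contained expansion of the dispatch Luke outlines above. SeekableInputStream and view() are hypothetical names from the thread, not existing Beam APIs; the coder decodes eagerly from a forward-only stream, but takes the lazy path when handed a seekable one.]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

public class LazyDecodeSketch {

  // Hypothetical marker type: an input stream that also supports seeking.
  static class SeekableInputStream extends ByteArrayInputStream {
    SeekableInputStream(byte[] buf) { super(buf); }
    void seek(int position) { this.pos = position; } // pos is protected in the superclass
  }

  abstract static class LazyCoder<T> {
    public T decode(InputStream is) {
      if (is instanceof SeekableInputStream) {
        return view((SeekableInputStream) is); // lazy: may seek, defer work
      }
      return decodeInternal(is); // eager: forward-only stream
    }
    protected abstract T view(SeekableInputStream is);
    protected abstract T decodeInternal(InputStream is);
  }

  // Toy coder that records which path was taken.
  static class Utf8Coder extends LazyCoder<String> {
    boolean lastDecodeWasLazy;

    protected String view(SeekableInputStream is) {
      lastDecodeWasLazy = true;
      is.seek(0); // a real lazy view would seek on demand instead of reading it all
      return readAll(is);
    }

    protected String decodeInternal(InputStream is) {
      lastDecodeWasLazy = false;
      return readAll(is);
    }

    private static String readAll(InputStream is) {
      try {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        int b;
        while ((b = is.read()) != -1) {
          bos.write(b);
        }
        return new String(bos.toByteArray(), StandardCharsets.UTF_8);
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
    }
  }

  public static void main(String[] args) {
    byte[] data = "payload".getBytes(StandardCharsets.UTF_8);
    Utf8Coder coder = new Utf8Coder();
    System.out.println(coder.decode(new ByteArrayInputStream(data))); // payload
    System.out.println(coder.lastDecodeWasLazy);                      // false
    System.out.println(coder.decode(new SeekableInputStream(data)));  // payload
    System.out.println(coder.lastDecodeWasLazy);                      // true
  }
}
```

This also makes Robert's objection below concrete: nothing stops a coder from passing the same SeekableInputStream to its component coders, which would silently take the wrong path.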
SeekableInputStream is a new API. If we went this route of re-using decode,
it'd be an easy bug to accidentally pass a SeekableInputStream to component
coders, which wouldn't do the right thing. (Perhaps all coders would have to
be modified?) And encoding is less obvious (e.g. a subclass of OutputStream
that takes a callback for the rest of the bytes? As chosen by the caller or
the callee?).

>> This is why I went with the more restricted (but still by far the most
>> common, and quite straightforward) case of supporting arbitrarily large
>> iterables (which can still occur at any level of nesting, e.g. inside
>> rows), leaving the general case as future work.
>>
>>>>>>>
>>>>>>> What do others think about coders supporting a "lazy" decode mode?
>>>>>>>
>>>>>>> 1: https://github.com/apache/beam/pull/7127
>>>>>>> 2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>>>> 3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>>>>> 4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>
> --
> You received this message because you are subscribed to the Google Groups
> "DataPLS Unified Worker" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to [email protected].
> To post to this group, send email to [email protected].
> To view this discussion on the web visit
> https://groups.google.com/a/google.com/d/msgid/datapls-unified-worker/CAF9t7_4WamYNMEc9j1aTvaKMk%3DYLp-CA41AX3UQYH8OnSbz0kw%40mail.gmail.com.
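[Editor's sketch: Robert's distinction between chunking at byte boundaries versus element boundaries can be illustrated with a small, hypothetical packer, not Beam's data plane implementation. Packing only at element boundaries means no element is ever split across two messages, so the reader never has to block mid-element; the price is that a single oversized element still needs the byte-level chunking the thread discusses.]

```java
import java.util.ArrayList;
import java.util.List;

// Greedily pack encoded elements into messages of at most maxBytes each,
// never splitting a single element across two messages.
public class ElementBoundaryChunking {

  public static List<List<byte[]>> chunk(List<byte[]> elements, int maxBytes) {
    List<List<byte[]>> messages = new ArrayList<>();
    List<byte[]> current = new ArrayList<>();
    int currentSize = 0;
    for (byte[] element : elements) {
      if (!current.isEmpty() && currentSize + element.length > maxBytes) {
        messages.add(current); // flush: the next element would not fit
        current = new ArrayList<>();
        currentSize = 0;
      }
      // An element larger than maxBytes still gets a message of its own;
      // that oversized case is where byte-level chunking becomes unavoidable.
      current.add(element);
      currentSize += element.length;
    }
    if (!current.isEmpty()) {
      messages.add(current);
    }
    return messages;
  }

  public static void main(String[] args) {
    List<byte[]> elements = new ArrayList<>();
    elements.add(new byte[3]);
    elements.add(new byte[3]);
    elements.add(new byte[3]);
    // With a 6-byte limit, three 3-byte elements span two messages.
    System.out.println(chunk(elements, 6).size()); // 2
  }
}
```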
