On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <[email protected]> wrote:
>
> Re-adding [email protected] 
> [email protected]
>
> On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <[email protected]> wrote:
>>
>> Thanks for bringing this to the list. More below.
>>
>> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <[email protected]> wrote:
>>>
>>> FWIW I deliberately limited the thread to not mix public and private lists, 
>>> so people intending private replies do not accidentally send to dev@beam.
>>>
>>> I've left them on this time, to avoid contradicting your action, but I 
>>> recommend removing them.
>>>
>>> Kenn
>>>
>>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <[email protected]> wrote:
>>>>
>>>> Re-adding [email protected] 
>>>> [email protected]
>>>>
>>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <[email protected]> wrote:
>>>>>
>>>>> That is correct, Kenn. An important point is that SomeOtherCoder
>>>>> would be given a seekable stream (instead of the forward-only stream it
>>>>> gets right now), so it can either decode all the data or lazily decode
>>>>> parts as it needs to, as the iterable coder does when used to
>>>>> support large iterables coming out of a GroupByKey.
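>>>>>
>>>>> For concreteness, a rough sketch of what such a seekable stream could
>>>>> look like (the names are purely illustrative; nothing like this exists
>>>>> in the SDK today):
>>>>>
>>>>> import java.io.IOException;
>>>>> import java.io.InputStream;
>>>>>
>>>>> /** Hypothetical seekable stream handed to coders that can decode lazily. */
>>>>> public abstract class SeekableInputStream extends InputStream {
>>>>>   /** Total number of encoded bytes backing this stream, if known. */
>>>>>   public abstract long size() throws IOException;
>>>>>
>>>>>   /** Repositions the stream so the next read starts at the given offset. */
>>>>>   public abstract void seek(long offset) throws IOException;
>>>>> }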
>>>>>
>>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>
>>>>>> Interesting! Having large iterables within rows would be great for the 
>>>>>> interactions between SQL and the core SDK's schema/Row support, and we 
>>>>>> weren't sure how that could work, exactly.
>>>>>>
>>>>>> My (very basic) understanding would be that 
>>>>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length 
>>>>>> followed by the encoding of SomeOtherCoder.
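>>>>>>
>>>>>> Roughly, a sketch of that encoding (assuming Beam's VarInt helper for
>>>>>> the length prefix):
>>>>>>
>>>>>> public void encode(T value, OutputStream os) throws IOException {
>>>>>>   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>>>>>>   someOtherCoder.encode(value, bytes);  // the inner encoding
>>>>>>   VarInt.encode(bytes.size(), os);      // the length prefix
>>>>>>   bytes.writeTo(os);                    // then SomeOtherCoder's bytes
>>>>>> }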
>>>>>>
>>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder)
>>>>>> has an encoding with a length followed by some number of bytes, and if
>>>>>> it ends with a special token (ignoring escaping issues) then you have
>>>>>> to gather bytes from more messages in order to assemble a stream to
>>>>>> send to SomeOtherCoder? Have I got what you mean? So this is a
>>>>>> different, yet compatible, approach to sending over a special token
>>>>>> that has to be looked up separately via the state read API?
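>>>>>>
>>>>>> If so, the reading side would presumably look something like this
>>>>>> (helpers like readBytes, stateRead, and inlinePrefix are invented for
>>>>>> illustration):
>>>>>>
>>>>>> private T decodePossiblyRemote(InputStream dataStream) throws IOException {
>>>>>>   int length = VarInt.decodeInt(dataStream);
>>>>>>   byte[] chunk = readBytes(dataStream, length);  // the inline bytes
>>>>>>   if (endsWithRemoteReferenceToken(chunk)) {
>>>>>>     // The inline bytes are only a prefix; fetch the remainder over the
>>>>>>     // state read API and stitch both into one stream for SomeOtherCoder.
>>>>>>     InputStream rest = stateRead(extractToken(chunk));
>>>>>>     return someOtherCoder.decode(concat(inlinePrefix(chunk), rest));
>>>>>>   }
>>>>>>   return someOtherCoder.decode(new ByteArrayInputStream(chunk));
>>>>>> }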
>>>>>>
>>>>>> Kenn
>>>>>>
>>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <[email protected]> wrote:
>>>>>>>
>>>>>>> There is a discussion happening on PR 7127[1], where Robert is working
>>>>>>> on providing the first implementation for supporting large iterables
>>>>>>> resulting from a GroupByKey. This is in line with the original proposal
>>>>>>> for remote references over the Fn Data & State API[2].
>>>>>>>
>>>>>>> I have thought about this issue more since the original write-up was
>>>>>>> done over a year ago, and I believe that we can simplify the
>>>>>>> implementation by migrating the length-prefix coder to be able to embed
>>>>>>> a remote reference token at the end of the stream if the data is too
>>>>>>> large. This allows any coder which supports lazy decoding to return a
>>>>>>> view over a seekable stream instead of decoding all the data
>>>>>>> (regardless of whether all the data was sent or there is a state token
>>>>>>> representing the remote reference).
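>>>>>>>
>>>>>>> A rough sketch of the encode side of that idea (INLINE_LIMIT,
>>>>>>> stateWrite, and writeInlinePrefixAndToken are invented names, just to
>>>>>>> illustrate the shape):
>>>>>>>
>>>>>>> public void encode(T value, OutputStream os) throws IOException {
>>>>>>>   ByteArrayOutputStream bytes = new ByteArrayOutputStream();
>>>>>>>   innerCoder.encode(value, bytes);
>>>>>>>   if (bytes.size() <= INLINE_LIMIT) {
>>>>>>>     VarInt.encode(bytes.size(), os);  // unchanged small-value path
>>>>>>>     bytes.writeTo(os);
>>>>>>>   } else {
>>>>>>>     // Too large: persist the payload via the state API and embed a
>>>>>>>     // remote reference token at the end of the stream instead.
>>>>>>>     ByteString token = stateWrite(bytes.toByteArray());
>>>>>>>     writeInlinePrefixAndToken(bytes.toByteArray(), token, os);
>>>>>>>   }
>>>>>>> }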
>>>>>>>
>>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the
>>>>>>> large iterable use case, but it also opens up the ability for types
>>>>>>> which don't need to be fully decoded to provide lazy views. Imagine our
>>>>>>> Beam rows using a format where only the rows that are read are decoded
>>>>>>> while everything else is left in its encoded form.
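>>>>>>>
>>>>>>> For instance, one shape such a lazy view could take, decoding a field
>>>>>>> only when it is first read (everything here is illustrative):
>>>>>>>
>>>>>>> /** Hypothetical lazy row: fields stay encoded until they are accessed. */
>>>>>>> class LazyRow {
>>>>>>>   private SeekableInputStream encoded;  // the row's encoded bytes
>>>>>>>   private long[] fieldOffsets;  // assumes the format records field offsets
>>>>>>>   private Coder<?>[] fieldCoders;
>>>>>>>
>>>>>>>   Object getValue(int field) throws IOException {
>>>>>>>     encoded.seek(fieldOffsets[field]);  // jump straight to the field
>>>>>>>     return fieldCoders[field].decode(encoded);  // decode only what is read
>>>>>>>   }
>>>>>>> }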
>>>>>>>
>>>>>>> I also originally thought that this could help solve an issue where
>>>>>>> large values[3] need to be chunked across multiple protobuf messages
>>>>>>> over the Data API, which complicates the reading-side decoding
>>>>>>> implementation, since each SDK needs to provide an implementation that
>>>>>>> blocks and waits for the next chunk to come across for the same logical
>>>>>>> stream[4]. But there are issues with this, because the runner may make
>>>>>>> a bad coder choice such as iterable<length_prefix<blob>> (instead of
>>>>>>> length_prefix<iterable<blob>>), which can lead to more than 2 GB of
>>>>>>> state keys if there are very many values.
>>
>>
>> Yes. I think this would need to be a separate coder from the length-prefix
>> coder.
>>
>>
>>>>>>>
>>>>>>> Robert, would implementing the length-prefix coder being backed by
>>>>>>> state, plus adding a lazy decoding method to the iterable coder, be
>>>>>>> significantly more complicated than what you are proposing right now?
>>
>>
>> Yes, chopping things up at arbitrary byte boundaries (rather than element 
>> boundaries) tends to be significantly more subtle and complex (based on my 
>> experience with the data plane API). It would also require new public APIs 
>> for Coders.
>
>
> After some further thought, I don't think we need to have a different API
> for coders; it's just that they get a different implementation of the
> InputStream when decoding. So the logic would be:
>
> public T decode(InputStream is) {
>   // A runner that supports lazy decoding hands the coder a seekable
>   // stream; return a lazy view over it instead of eagerly decoding.
>   if (is instanceof SeekableInputStream) {
>     return view((SeekableInputStream) is);
>   }
>   // Otherwise fall back to the existing forward-only decoding path.
>   return decodeInternal(is);
> }

SeekableInputStream is a new API. If we went this route of re-using
decode, it'd be an easy bug to accidentally pass a SeekableInputStream
to component coders, which wouldn't do the right thing. (Perhaps all
coders would have to be modified?) And encoding is less obvious (e.g.
a subclass of OutputStream that takes a callback for the rest of the
bytes? As chosen by the caller or the callee?).

>> This is why I went with the more restricted (but still by far the most
>> common, and quite straightforward) case of supporting arbitrarily large
>> iterables (which can still occur at any level of nesting, e.g. inside
>> rows), leaving the general case as future work.
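>>
>> Sketched roughly (the helper names are illustrative, not what the PR
>> actually does): the iterable's encoding carries an inline prefix of
>> elements, optionally followed by a continuation token whose remaining
>> elements are fetched lazily through the state read API.
>>
>> public Iterable<T> decode(InputStream is) throws IOException {
>>   List<T> prefix = decodeInlineElements(is, elementCoder);
>>   if (!hasContinuationToken(is)) {
>>     return prefix;  // everything fit inline
>>   }
>>   ByteString token = readToken(is);
>>   // Remaining elements are pulled on demand via the state read API.
>>   return Iterables.concat(
>>       prefix, new StateFetchingIterable<>(stateClient, token, elementCoder));
>> }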
>>
>>
>>>>>>>
>>>>>>> What do others think about coders supporting a "lazy" decode mode?
>>>>>>>
>>>>>>> 1: https://github.com/apache/beam/pull/7127
>>>>>>> 2: 
>>>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>>>>>>> 3: 
>>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>>>>>>> 4: 
>>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
