On Thu, Nov 29, 2018 at 7:08 PM Lukasz Cwik <lc...@google.com> wrote:
>
> On Thu, Nov 29, 2018 at 7:13 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Thu, Nov 29, 2018 at 2:18 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > I don't believe we would need to change any other coders since 
>> > SeekableInputStream wouldn't change how a regular InputStream would work 
>> > so coders that don't care about the implementation would still use it as a 
>> > forward only input stream. Coders that care about seeking would use the 
>> > new functionality.
>>
>> An API could be developed that makes this work, but the proposal of
>>
>> class SmartCoder<T> {
>>   public T decode(InputStream is) {
>>     if (is instanceof SeekableInputStream) {
>>       return view((SeekableInputStream) is);
>>     }
>>     return decodeInternal(is);
>>   }
>> }
>>
>> would break if passed to (just as an example) the unmodified KV coder
>>
>> class KvCoder<K, V> {
>>   public Kv<K, V> decode(InputStream is) {
>>     return Kv.of(keyCoder.decode(is), valueCoder.decode(is));
>>   }
>> }
>>
>> when invoked with an InputStream that happens to be a
>> SeekableInputStream if either keyCoder or valueCoder were instances of
>> SmartCoder unless SmartCoder.view() did something really clever about
>> advancing the provided stream the right amount without actually
>> consuming it. This is particularly expensive/tricky for iterables
>> where it's most useful.
>
>
> Thanks for walking through this with me Robert.
>
> The issue is that the view needs to advance the stream if it wants to decode 
> the components separately. This works naturally for the iterable coder, since 
> all decoding is done in order and so advances the stream automatically, and 
> for any component coder that also supports being a view. Any coder that does 
> not advance the stream in order has to have an index as part of its encoding. 
> Using the KV coder as the example, the two strategies would be as follows:
>
> The decode method is the same for both strategies:
> public KV<K, V> decode(InputStream is) {
>   if (is instanceof SeekableInputStream) {
>     return new KVView<>((SeekableInputStream) is, keyCoder, valueCoder);
>   }
>   return KV.of(keyCoder.decode(is), valueCoder.decode(is));
> }
>
> forward only view decoding:
> class KVView<K, V> extends KV<K, V> {
>   K getKey() {
>     if (!keyDecoded) {
>       key = keyCoder.decode(is);
>       keyDecoded = true;  // decode at most once
>     }
>     return key;
>   }
>
>   V getValue() {
>     // ensures the input stream has advanced to the value position
>     getKey();
>
>     if (!valueDecoded) {
>       value = valueCoder.decode(is);
>       valueDecoded = true;  // decode at most once
>     }
>     return value;
>   }
> }
>
> index based decoding:
> class KVView<K, V> extends KV<K, V> {
>   KVView(SeekableInputStream is, Coder<K> keyCoder, Coder<V> valueCoder) {
>     valueOffset = readBigEndianInt(is);
>     // ...
>   }
>   K getKey() {
>     if (!keyDecoded) {
>       is.seek(4);  // skip the 4-byte big-endian int index
>       key = keyCoder.decode(is);
>       keyDecoded = true;  // decode at most once
>     }
>     return key;
>   }
>
>   V getValue() {
>     if (!valueDecoded) {
>       is.seek(valueOffset);
>       value = valueCoder.decode(is);
>       valueDecoded = true;  // decode at most once
>     }
>     return value;
>   }
> }
>
> I believe for the KV case and the iterable case we will find that our coders 
> are typically KV<LengthPrefix<Key>, LengthPrefix<Value>> and 
> Iterable<LengthPrefix<Value>>, which would mean that a smart coder could 
> inspect the component coder and, if it's a length prefix coder, ask it to seek 
> to the end of its value within the input stream. That way a smart coder 
> could learn the length of its components.

I understand how KV coder can be made smart. My concern is the
difficulty of having dumb coders with smart coder components. E.g.
imagine a row coder

class DumbRowCoder {
  Row decode(InputStream is) {
    List<Object> parts = ...
    for (Coder c : componentCoders) {
      // Smart coders *must* advance the input stream in case the
      // subsequent coder is also dumb.
      // Efficient seek will require more than continuation tokens
      // over the FnAPI.
      // Important ones like iterable are likely to be lazily written,
      // and so won't know their length when they start encoding, but
      // iterating it to discover the length defeats much of the goal
      // of being lazy.
      parts.add(c.decode(is));
    }
  }
}
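
To make the constraint concrete, here is a minimal, self-contained sketch of a dumb composite decoder with one lazy component. Everything in it is hypothetical and only for illustration: SeekableBytes stands in for SeekableInputStream, SketchCoder for the Coder interface, and the wire format (a 4-byte big-endian length followed by UTF-8 bytes) is an assumption, not the actual length prefix encoding. The lazy coder can skip its payload only because the length is written up front.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for SeekableInputStream: in-memory, absolute seek.
class SeekableBytes extends InputStream {
  private final byte[] data;
  private int pos;
  SeekableBytes(byte[] data) { this.data = data; }
  @Override public int read() { return pos < data.length ? (data[pos++] & 0xff) : -1; }
  int position() { return pos; }
  void seek(int newPos) { pos = newPos; }
}

// Assumed wire format: 4-byte big-endian length, then that many UTF-8 bytes.
final class Wire {
  static int readInt(InputStream is) {
    return ByteBuffer.wrap(readBytes(is, 4)).getInt();
  }
  static byte[] readBytes(InputStream is, int n) {
    try {
      byte[] out = new byte[n];
      int read = 0;
      while (read < n) {
        int r = is.read(out, read, n - read);
        if (r < 0) throw new IOException("unexpected end of stream");
        read += r;
      }
      return out;
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
  static byte[] encode(String... values) {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String v : values) {
      byte[] b = v.getBytes(StandardCharsets.UTF_8);
      out.write(ByteBuffer.allocate(4).putInt(b.length).array(), 0, 4);
      out.write(b, 0, b.length);
    }
    return out.toByteArray();
  }
}

// Hypothetical stand-in for a coder; only decoding matters here.
interface SketchCoder { Object decode(InputStream is); }

// Dumb component: always reads and decodes its bytes immediately.
class EagerStringCoder implements SketchCoder {
  @Override public Object decode(InputStream is) {
    return new String(Wire.readBytes(is, Wire.readInt(is)), StandardCharsets.UTF_8);
  }
}

// Lazy view: remembers where its bytes live and decodes them on demand.
class LazyString {
  private final SeekableBytes is;
  private final int offset;
  private final int length;
  LazyString(SeekableBytes is, int offset, int length) {
    this.is = is; this.offset = offset; this.length = length;
  }
  String get() {
    int saved = is.position();
    is.seek(offset);
    String s = new String(Wire.readBytes(is, length), StandardCharsets.UTF_8);
    is.seek(saved);  // leave the shared stream where the caller had it
    return s;
  }
}

// Smart component: returns a view, but still advances the stream.
class LazyStringCoder implements SketchCoder {
  @Override public Object decode(InputStream is) {
    if (!(is instanceof SeekableBytes)) return new EagerStringCoder().decode(is);
    SeekableBytes s = (SeekableBytes) is;
    int length = Wire.readInt(s);
    LazyString view = new LazyString(s, s.position(), length);
    s.seek(s.position() + length);  // skip the payload for the next coder
    return view;
  }
}

// Dumb composite: hands the same stream to each component in order.
class DumbRowSketch {
  static List<Object> decode(List<SketchCoder> components, InputStream is) {
    List<Object> parts = new ArrayList<>();
    for (SketchCoder c : components) parts.add(c.decode(is));
    return parts;
  }
}
```

If the seek that skips the payload were removed from LazyStringCoder, the eager coder that follows would start reading in the middle of the lazy component's bytes. The skip is cheap here only because the length precedes the payload, which is exactly what a lazily written iterable would not have.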

>> > For the encoding portion, the state backed length prefix coder would send 
>> > the small snippet of data that it received plus the state key without 
>> > invoking the component coder to encode the value. The downstream receiving 
>> > party would need to lookup the remote reference to get all the data.
>>
>> I'm trying to follow what you're saying here. Are you restricting to
>> the case of only encoding something that was formerly decoded with a
>> state backed length prefix coder (and keeps the unencoded bytes
>> around)?
>
> Yes.
>
>> It'd be good to support writing novel values lazily as well.
>> Also, this brings up the issue of how to manage the lifetime of remote
>> references if they can be extended in this way.
>
> I don't know of any data processing system that currently handles this and am 
> unsure if this is a problem in practice for many people. If someone gets a 
> giant string iterable and naively transforms it, for example by concatenating 
> all the strings, their worker will crash. The only solution I have seen is that 
> people don't naively transform the value but instead pass around a view over 
> the value that applies the transform as needed.

I was thinking primarily of the case where the runner has such
arbitrarily long iterables from a GBK and wants to encode them and
send them to the SDK.

> Extending remote references to also be able to be created by SDKs is an 
> interesting idea that I had explored a tiny bit in the past but then dropped 
> it due to time constraints and for the fact that we had much more immediate 
> things we wanted to get implemented.

I think we're still in that boat :).

>> > All other coders would not be lazy and would have to encode the entire 
>> > lazy view; this could be optimized by copying the 
>> > SeekableInputStream to the OutputStream. Note that the length prefix coder 
>> > is never used with IOs and hence those IOs could be given a type like 
>> > Iterable<Foo> which is lazy, but the encoding for that wouldn't be lazy
>>
>> Yes, that's how it is now.
>>
>> > and would output all the data from the SeekableInputStream.
>> >
>> >
>> > On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <rober...@google.com> 
>> > wrote:
>> >>
>> >> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >
>> >> > Re-adding +datapls-portability-t...@google.com 
>> >> > +datapls-unified-wor...@google.com
>> >> >
>> >> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <rober...@google.com> 
>> >> > wrote:
>> >> >>
>> >> >> Thanks for bringing this to the list. More below.
>> >> >>
>> >> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <k...@apache.org> 
>> >> >> wrote:
>> >> >>>
>> >> >>> FWIW I deliberately limited the thread to not mix public and private 
>> >> >>> lists, so people intending private replies do not accidentally send 
>> >> >>> to dev@beam.
>> >> >>>
>> >> >>> I've left them on this time, to avoid contradicting your action, but 
>> >> >>> I recommend removing them.
>> >> >>>
>> >> >>> Kenn
>> >> >>>
>> >> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>
>> >> >>>> Re-adding +datapls-portability-t...@google.com 
>> >> >>>> +datapls-unified-wor...@google.com
>> >> >>>>
>> >> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> 
>> >> >>>> wrote:
>> >> >>>>>
>> >> >>>>> That is correct Kenn. An important point would be that 
>> >> >>>>> SomeOtherCoder would be given a seekable stream (instead of the 
>> >> >>>>> forward only stream it gets right now) so it can either decode all 
>> >> >>>>> the data or lazily decode parts as it needs to as in the case of an 
>> >> >>>>> iterable coder when used to support large iterables coming out of a 
>> >> >>>>> GroupByKey.
>> >> >>>>>
>> >> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <k...@apache.org> 
>> >> >>>>> wrote:
>> >> >>>>>>
>> >> >>>>>> Interesting! Having large iterables within rows would be great for 
>> >> >>>>>> the interactions between SQL and the core SDK's schema/Row 
>> >> >>>>>> support, and we weren't sure how that could work, exactly.
>> >> >>>>>>
>> >> >>>>>> My (very basic) understanding would be that 
>> >> >>>>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a 
>> >> >>>>>> length followed by the encoding of SomeOtherCoder.
>> >> >>>>>>
>> >> >>>>>> So the new proposal would be that 
>> >> >>>>>> LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a 
>> >> >>>>>> length followed by some number of bytes and if it ends with a 
>> >> >>>>>> special token (ignoring escaping issues) then you have to gather 
>> >> >>>>>> bytes from more messages in order to assemble a stream to send to 
>> >> >>>>>> SomeOtherCoder? Have I got what you mean? So this is a different, 
>> >> >>>>>> yet compatible, approach to sending over a special token that has 
>> >> >>>>>> to be looked up separately via the state read API?
>> >> >>>>>>
>> >> >>>>>> Kenn
>> >> >>>>>>
>> >> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> 
>> >> >>>>>> wrote:
>> >> >>>>>>>
>> >> >>>>>>> There is a discussion happening on a PR 7127[1] where Robert is 
>> >> >>>>>>> working on providing the first implementation for supporting 
>> >> >>>>>>> large iterables resulting from a GroupByKey. This is in line with 
>> >> >>>>>>> the original proposal for remote references over the Fn Data & 
>> >> >>>>>>> State API[2].
>> >> >>>>>>>
>> >> >>>>>>> I had thought about this issue more since the original write up 
>> >> >>>>>>> was done over a year ago and believe that we can simplify the 
>> >> >>>>>>> implementation by migrating the length prefix coder to be able to 
>> >> >>>>>>> embed a remote reference token at the end of the stream if the 
>> >> >>>>>>> data is too large. This allows any coder which supports lazy 
>> >> >>>>>>> decoding to return a view over a seekable stream instead of 
>> >> >>>>>>> decoding all the data (regardless whether all the data was sent 
>> >> >>>>>>> or there is a state token representing the remote reference).
>> >> >>>>>>>
>> >> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve 
>> >> >>>>>>> the large iterable use case but also opens up the ability for 
>> >> >>>>>>> types which don't need to be fully decoded to provide lazy views. 
>> >> >>>>>>> Imagine our Beam rows using a format where only rows that are 
>> >> >>>>>>> read are decoded while everything else is left in its encoded 
>> >> >>>>>>> form.
>> >> >>>>>>>
>> >> >>>>>>> I originally thought that this could also help solve an 
>> >> >>>>>>> issue where large values[3] need to be chunked across multiple 
>> >> >>>>>>> protobuf messages over the Data API which complicates the reading 
>> >> >>>>>>> side decoding implementation since each SDK needs to provide an 
>> >> >>>>>>> implementation that blocks and waits for the next chunk to come 
>> >> >>>>>>> across for the same logical stream[4]. But there are issues with 
>> >> >>>>>>> this because the runner may make a bad coder choice such as 
>> >> >>>>>>> iterable<length_prefix<blob>> (instead of 
>> >> >>>>>>> length_prefix<iterable<blob>>) which can lead to > 2gb of state 
>> >> >>>>>>> keys if there are many many values.
>> >> >>
>> >> >>
>> >> >> Yes. I think this would need to be a separate coder than the length 
>> >> >> prefix coder.
>> >> >>
>> >> >>
>> >> >>>>>>>
>> >> >>>>>>> Robert, would implementing the length prefix coder being backed 
>> >> >>>>>>> by state + adding a lazy decoding method to the iterable coder be 
>> >> >>>>>>> significantly more complicated than what you are proposing right 
>> >> >>>>>>> now?
>> >> >>
>> >> >>
>> >> >> Yes, chopping things up at arbitrary byte boundaries (rather than 
>> >> >> element boundaries) tends to be significantly more subtle and complex 
>> >> >> (based on my experience with the data plane API). It would also 
>> >> >> require new public APIs for Coders.
>> >> >
>> >> >
>> >> > After some further thought, I don't think we need to have a different 
>> >> > API for coders, it's just that they get a different implementation for 
>> >> > the inputstream when decoding. So the logic would be:
>> >> > public T decode(InputStream is) {
>> >> >   if (is instanceof SeekableInputStream) {
>> >> >     return view((SeekableInputStream) is);
>> >> >   }
>> >> >   return decodeInternal(is);
>> >> > }
>> >>
>> >> SeekableInputStream is a new API. If we went this route of re-using
>> >> decode, it'd be an easy bug to accidentally pass a SeekableInputStream
>> >> to component coders which wouldn't do the right thing. (Perhaps all
>> >> coders would have to be modified?) And encoding is less obvious (e.g.
>> >> a subclass of OutputStream that takes a callback for the rest of the
>> >> bytes? As chosen by the caller or the callee?).
>> >>
>> >> >> This is why I went with the more restricted (but still by far most 
>> >> >> common, and quite straightforward) case of supporting arbitrarily 
>> >> >> large iterables (which can still occur at any level of nesting, e.g. 
>> >> >> inside rows), leaving the general case as future work.
>> >> >>
>> >> >>
>> >> >>>>>>>
>> >> >>>>>>> What do others think about coders supporting a "lazy" decode mode?
>> >> >>>>>>>
>> >> >>>>>>> 1: https://github.com/apache/beam/pull/7127
>> >> >>>>>>> 2: 
>> >> >>>>>>> https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>> >> >>>>>>> 3: 
>> >> >>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>> >> >>>>>>> 4: 
>> >> >>>>>>> https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
>> >> >
