On Thu, Nov 29, 2018 at 7:08 PM Lukasz Cwik <lc...@google.com> wrote:
>
> On Thu, Nov 29, 2018 at 7:13 AM Robert Bradshaw <rober...@google.com> wrote:
>>
>> On Thu, Nov 29, 2018 at 2:18 AM Lukasz Cwik <lc...@google.com> wrote:
>> >
>> > I don't believe we would need to change any other coders since SeekableInputStream wouldn't change how a regular InputStream works, so coders that don't care about the implementation would still use it as a forward-only input stream. Coders that care about seeking would use the new functionality.
>>
>> An API could be developed that makes this work, but the proposal of
>>
>> class SmartCoder<T> {
>>   public T decode(InputStream is) {
>>     if (is instanceof SeekableInputStream) {
>>       return view((SeekableInputStream) is);
>>     }
>>     return decodeInternal(is);
>>   }
>> }
>>
>> would break if passed to (just as an example) the unmodified KV coder
>>
>> class KvCoder<K, V> {
>>   public KV<K, V> decode(InputStream is) {
>>     return KV.of(keyCoder.decode(is), valueCoder.decode(is));
>>   }
>> }
>>
>> when invoked with an InputStream that happens to be a SeekableInputStream if either keyCoder or valueCoder were instances of SmartCoder, unless SmartCoder.view() did something really clever about advancing the provided stream the right amount without actually consuming it. This is particularly expensive/tricky for iterables, where it's most useful.
>
> Thanks for walking through this with me Robert.
>
> The issue is that the view needs to advance the stream if it wants to decode the components separately. This works naturally for the iterable coder, since all decoding is done in order, which advances the stream automatically, and for any component coder that also supports being a view. Any coder that isn't advancing the stream in order has to have an index as part of its encoding. Using the KV coder as the example, the two strategies would be as follows.
>
> The decode method is the same for both strategies:
>
> public KV<K, V> decode(InputStream is) {
>   if (is instanceof SeekableInputStream) {
>     return new KVView<>((SeekableInputStream) is, keyCoder, valueCoder);
>   }
>   return KV.of(keyCoder.decode(is), valueCoder.decode(is));
> }
>
> Forward-only view decoding:
>
> class KVView<K, V> extends KV<K, V> {
>   K getKey() {
>     if (!keyDecoded) {
>       key = keyCoder.decode(is);
>       keyDecoded = true;
>     }
>     return key;
>   }
>
>   V getValue() {
>     // ensures the input stream has advanced to the value position
>     getKey();
>
>     if (!valueDecoded) {
>       value = valueCoder.decode(is);
>       valueDecoded = true;
>     }
>     return value;
>   }
> }
>
> Index-based decoding:
>
> class KVView<K, V> extends KV<K, V> {
>   KVView(SeekableInputStream is, Coder<K> keyCoder, Coder<V> valueCoder) {
>     valueOffset = readBigEndianInt(is);
>     // ...
>   }
>
>   K getKey() {
>     if (!keyDecoded) {
>       is.seek(4);  // skip the 4 bytes of the big-endian int offset
>       key = keyCoder.decode(is);
>       keyDecoded = true;
>     }
>     return key;
>   }
>
>   V getValue() {
>     if (!valueDecoded) {
>       is.seek(valueOffset);
>       value = valueCoder.decode(is);
>       valueDecoded = true;
>     }
>     return value;
>   }
> }
>
> I believe that for the KV case and the iterable case we will find that our coders are typically KV<LengthPrefix<Key>, LengthPrefix<Value>> and Iterable<LengthPrefix<Value>>, which would mean that a smart coder could inspect the component coder and, if it's a length prefix coder, ask it to seek to the end of its value within the input stream, which means a smart coder could determine the length of its components.
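To make that last point concrete, here is a minimal sketch of how such skipping could work, assuming the component encoding is a 4-byte big-endian length followed by exactly that many payload bytes (LengthPrefixSkipper and skipValue are hypothetical names, not existing Beam APIs):

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: skip past a length-prefixed component without
// decoding it, leaving the stream positioned at the next component.
class LengthPrefixSkipper {
  static void skipValue(InputStream is) throws IOException {
    int length = new DataInputStream(is).readInt();  // read the length prefix
    long remaining = length;
    while (remaining > 0) {                          // advance past the payload
      long skipped = is.skip(remaining);
      if (skipped <= 0) {
        // For this sketch, treat a non-advancing skip as a truncated stream.
        throw new IOException("Unexpected end of stream while skipping a length-prefixed value");
      }
      remaining -= skipped;
    }
  }
}

Because it relies only on InputStream.skip, a smart enclosing coder could call this even on a forward-only stream, once per component it does not want to decode yet, and the stream would stay aligned for whatever dumb coder comes next.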
I understand how KV coder can be made smart. My concern is the difficulty of having dumb coders with smart coder components. E.g., imagine a row coder:

class DumbRowCoder {
  Row decode(InputStream is) {
    List<Object> parts = ...
    for (Coder c : componentCoders) {
      // Smart coders *must* advance the input stream in case the
      // subsequent coder is also dumb.
      // Efficient seek will require more than continuation tokens over the FnAPI.
      // Important ones like iterable are likely to be lazily written, and so won't know
      // their length when they start encoding, but iterating it to discover the length
      // defeats much of the goal of being lazy.
      parts.add(c.decode(is));
    }
    // ... build and return the Row from parts
  }
}

>> > For the encoding portion, the state backed length prefix coder would send the small snippet of data that it received plus the state key without invoking the component coder to encode the value. The downstream receiving party would need to look up the remote reference to get all the data.
>>
>> I'm trying to follow what you're saying here. Are you restricting to the case of only encoding something that was formerly decoded with a state backed length prefix coder (and keeps the unencoded bytes around)?
>
> Yes.
>
>> It'd be good to support writing novel values lazily as well. Also, this brings up the issue of how to manage the lifetime of remote references if they can be extended in this way.
>
> I don't know of any data processing system that currently handles this and am unsure if this is a problem in practice for many people. If someone gets a giant string iterable and naively transforms it, for example by concatenating all the strings, their worker will crash. The only solution I have seen is that people don't naively transform the value but instead pass around a view over the value that applies the transform as needed.

I was thinking primarily of the case where the runner has such arbitrarily long iterables from a GBK and wants to encode them and send them to the SDK.

> Extending remote references to also be able to be created by SDKs is an interesting idea that I had explored a tiny bit in the past but then dropped it due to time constraints and for the fact that we had much more immediate things we wanted to get implemented.

I think we're still in that boat :).

>> > All other coders would not be lazy and would have to encode the entire lazy view; this could be optimized by copying the SeekableInputStream to the OutputStream. Note that the length prefix coder is never used with IOs and hence those IOs could be given a type like Iterable<Foo> which is lazy, but the encoding for that wouldn't be lazy
>>
>> Yes, that's how it is now.
>>
>> > and would output all the data from the SeekableInputStream.
>> >
>> > On Wed, Nov 28, 2018 at 3:08 PM Robert Bradshaw <rober...@google.com> wrote:
>> >>
>> >> On Wed, Nov 28, 2018 at 11:57 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >
>> >> > Re-adding +datapls-portability-t...@google.com +datapls-unified-wor...@google.com
>> >> >
>> >> > On Wed, Nov 28, 2018 at 2:23 PM Robert Bradshaw <rober...@google.com> wrote:
>> >> >>
>> >> >> Thanks for bringing this to the list. More below.
>> >> >>
>> >> >> On Wed, Nov 28, 2018 at 11:10 PM Kenneth Knowles <k...@apache.org> wrote:
>> >> >>>
>> >> >>> FWIW I deliberately limited the thread to not mix public and private lists, so people intending private replies do not accidentally send to dev@beam.
>> >> >>>
>> >> >>> I've left them on this time, to avoid contradicting your action, but I recommend removing them.
>> >> >>>
>> >> >>> Kenn
>> >> >>>
>> >> >>> On Wed, Nov 28, 2018 at 12:59 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>
>> >> >>>> Re-adding +datapls-portability-t...@google.com +datapls-unified-wor...@google.com
>> >> >>>>
>> >> >>>> On Wed, Nov 28, 2018 at 12:58 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>>
>> >> >>>>> That is correct Kenn. An important point would be that SomeOtherCoder would be given a seekable stream (instead of the forward only stream it gets right now) so it can either decode all the data or lazily decode parts as it needs to, as in the case of an iterable coder when used to support large iterables coming out of a GroupByKey.
>> >> >>>>>
>> >> >>>>> On Wed, Nov 28, 2018 at 12:52 PM Kenneth Knowles <k...@apache.org> wrote:
>> >> >>>>>>
>> >> >>>>>> Interesting! Having large iterables within rows would be great for the interactions between SQL and the core SDK's schema/Row support, and we weren't sure how that could work, exactly.
>> >> >>>>>>
>> >> >>>>>> My (very basic) understanding would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding that is a length followed by the encoding of SomeOtherCoder.
>> >> >>>>>>
>> >> >>>>>> So the new proposal would be that LengthPrefixedCoder(SomeOtherCoder) has an encoding where it has a length followed by some number of bytes, and if it ends with a special token (ignoring escaping issues) then you have to gather bytes from more messages in order to assemble a stream to send to SomeOtherCoder? Have I got what you mean? So this is a different, yet compatible, approach to sending over a special token that has to be looked up separately via the state read API?
>> >> >>>>>>
>> >> >>>>>> Kenn
>> >> >>>>>>
>> >> >>>>>> On Wed, Nov 28, 2018 at 12:01 PM Lukasz Cwik <lc...@google.com> wrote:
>> >> >>>>>>>
>> >> >>>>>>> There is a discussion happening on PR 7127[1] where Robert is working on providing the first implementation for supporting large iterables resulting from a GroupByKey. This is in line with the original proposal for remote references over the Fn Data & State API[2].
>> >> >>>>>>>
>> >> >>>>>>> I had thought about this issue more since the original write-up was done over a year ago and believe that we can simplify the implementation by migrating the length prefix coder to be able to embed a remote reference token at the end of the stream if the data is too large. This allows any coder which supports lazy decoding to return a view over a seekable stream instead of decoding all the data (regardless of whether all the data was sent or there is a state token representing the remote reference).
>> >> >>>>>>>
>> >> >>>>>>> Allowing any arbitrary coder to support lazy decoding helps solve the large iterable use case but also opens up the ability for types which don't need to be fully decoded to provide lazy views.
>> >> >>>>>>> Imagine our Beam rows using a format where only rows that are read are decoded while everything else is left in its encoded form.
>> >> >>>>>>>
>> >> >>>>>>> I also originally thought that this could help solve an issue where large values[3] need to be chunked across multiple protobuf messages over the Data API, which complicates the reading-side decoding implementation since each SDK needs to provide an implementation that blocks and waits for the next chunk to come across for the same logical stream[4]. But there are issues with this because the runner may make a bad coder choice such as iterable<length_prefix<blob>> (instead of length_prefix<iterable<blob>>) which can lead to > 2gb of state keys if there are many many values.
>> >> >>
>> >> >> Yes. I think this would need to be a separate coder from the length prefix coder.
>> >> >>
>> >> >>>>>>>
>> >> >>>>>>> Robert, would implementing the length prefix coder being backed by state + adding a lazy decoding method to the iterable coder be significantly more complicated than what you are proposing right now?
>> >> >>
>> >> >> Yes, chopping things up at arbitrary byte boundaries (rather than element boundaries) tends to be significantly more subtle and complex (based on my experience with the data plane API). It would also require new public APIs for Coders.
>> >> >
>> >> > After some further thought, I don't think we need to have a different API for coders, it's just that they get a different implementation of the InputStream when decoding. So the logic would be:
>> >> >
>> >> > public T decode(InputStream is) {
>> >> >   if (is instanceof SeekableInputStream) {
>> >> >     return view((SeekableInputStream) is);
>> >> >   }
>> >> >   return decodeInternal(is);
>> >> > }
>> >>
>> >> SeekableInputStream is a new API. If we went this route of re-using decode, it'd be an easy bug to accidentally pass a SeekableInputStream to component coders which wouldn't do the right thing. (Perhaps all coders would have to be modified?) And encoding is less obvious (e.g. a subclass of OutputStream that takes a callback for the rest of the bytes? As chosen by the caller or the callee?).
>> >>
>> >> >> This is why I went with the more restricted (but still by far most common, and quite straightforward) case of supporting arbitrarily large iterables (which can still occur at any level of nesting, e.g. inside rows), leaving the general case as future work.
>> >> >>
>> >> >>>>>>>
>> >> >>>>>>> What do others think about coders supporting a "lazy" decode mode in coders?
>> >> >>>>>>>
>> >> >>>>>>> 1: https://github.com/apache/beam/pull/7127
>> >> >>>>>>> 2: https://docs.google.com/document/d/1BOozW0bzBuz4oHJEuZNDOHdzaV5Y56ix58Ozrqm2jFg/edit#heading=h.y6e78jyiwn50
>> >> >>>>>>> 3: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.akxviyj4m0f0
>> >> >>>>>>> 4: https://docs.google.com/document/d/1IGduUqmhWDi_69l9nG8kw73HZ5WI5wOps9Tshl5wpQA/edit#heading=h.u78ozd9rrlsf
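As a concrete illustration of the proposal under discussion (the length prefix coder embedding a remote reference token at the end of the inline data when the value is too large), here is a rough sketch of what the decode side might look like. None of these names are existing Beam APIs, the marker/token framing is invented for illustration, escaping of the marker is ignored (as noted above), and a plain forward-only stream is returned for simplicity rather than the seekable view the proposal calls for:

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.util.Arrays;

// Hypothetical sketch: decoding a state-backed length prefix yields a stream
// over the inline snippet, followed by the remainder of the value fetched
// over the State API when the snippet ends with a remote reference token.
class StateBackedLengthPrefixDecoder {
  interface StateFetcher {
    InputStream open(byte[] token) throws IOException;  // State API read, exposed as a stream
  }

  private static final int TOKEN_LENGTH = 16;                       // assumed fixed-size token
  private static final byte[] MARKER = {(byte) 0xDE, (byte) 0xAD};  // assumed sentinel bytes
  private final StateFetcher fetcher;

  StateBackedLengthPrefixDecoder(StateFetcher fetcher) {
    this.fetcher = fetcher;
  }

  // Returns the stream the component coder should decode the value from.
  InputStream openValueStream(InputStream is) throws IOException {
    DataInputStream in = new DataInputStream(is);
    int length = in.readInt();           // length of the inline portion
    byte[] inline = new byte[length];
    in.readFully(inline);

    int suffix = MARKER.length + TOKEN_LENGTH;
    if (length >= suffix
        && Arrays.equals(
            Arrays.copyOfRange(inline, length - suffix, length - TOKEN_LENGTH), MARKER)) {
      byte[] snippet = Arrays.copyOf(inline, length - suffix);
      byte[] token = Arrays.copyOfRange(inline, length - TOKEN_LENGTH, length);
      // Inline snippet first, then the rest of the value from the State API.
      // (A real implementation would defer the state read until the snippet
      // is exhausted; it is opened eagerly here for brevity.)
      return new SequenceInputStream(new ByteArrayInputStream(snippet), fetcher.open(token));
    }
    // All of the data was sent inline.
    return new ByteArrayInputStream(inline);
  }
}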
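The encode side Luke describes above (writing back the locally held snippet plus the state key without invoking the component coder) might, under the same caveats and with the same invented framing, look roughly like this; RemoteRefBacked and ComponentEncoder are assumptions, not existing Beam interfaces:

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;

// Hypothetical sketch, mirroring the decoder sketch: a value that was decoded
// as a remote reference keeps its inline snippet and state token, so
// re-encoding writes those bytes back verbatim instead of asking the
// component coder to materialize and encode the whole value.
interface RemoteRefBacked {
  byte[] localSnippet();  // bytes that were shipped inline over the Data API
  byte[] stateToken();    // remote reference resolvable via the State API (assumed 16 bytes)
}

class StateBackedLengthPrefixEncoder<T> {
  interface ComponentEncoder<T> {
    void encode(T value, OutputStream os) throws IOException;
  }

  private static final byte[] MARKER = {(byte) 0xDE, (byte) 0xAD};  // same assumed sentinel
  private final ComponentEncoder<T> componentEncoder;

  StateBackedLengthPrefixEncoder(ComponentEncoder<T> componentEncoder) {
    this.componentEncoder = componentEncoder;
  }

  void encode(T value, OutputStream os) throws IOException {
    DataOutputStream out = new DataOutputStream(os);
    if (value instanceof RemoteRefBacked) {
      RemoteRefBacked lazy = (RemoteRefBacked) value;
      byte[] snippet = lazy.localSnippet();
      byte[] token = lazy.stateToken();
      // Inline portion = snippet + marker + token; the receiver fetches the rest lazily.
      out.writeInt(snippet.length + MARKER.length + token.length);
      out.write(snippet);
      out.write(MARKER);
      out.write(token);
    } else {
      // Novel values are still fully encoded and length prefixed, as today.
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      componentEncoder.encode(value, buffer);
      out.writeInt(buffer.size());
      buffer.writeTo(out);
    }
  }
}

As the thread notes, this only covers values that were originally decoded from a remote reference; writing novel values lazily, and managing the lifetime of remote references created by SDKs, would still need a separate design.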