On Tue, May 21, 2019 at 9:33 PM Lukasz Cwik <lc...@google.com> wrote:
> I don't think the runner needs to know the size of the iterable ahead of
> time for encoding; the binary format could be something like:
>   first page | token for first page | token for second page
> The only difference between the current encoding and the new encoding is
> whether one or two tokens follow the first page.
>
> You are correct in saying that the runner does need a way to reference
> the first page after the fact, though. This could be very cheap or very
> expensive and really is a runner implementation detail. For example, for
> Dataflow this would be cheap.

Even on Dataflow, the UW would still have to remember the pointer to the
head of the list just in case it needed it later, and this also puts more
pressure on the set of items (even just pointers) kept in its state
service. It may be more of a burden on other runners.

> I can only think of cases where a user decides to store the iterable in
> memory as part of their DoFn with the intent to output it during
> finishBundle. The most prominent one that comes to mind is partial GBK
> implementations, but those should already be memory-pressure aware (LRU,
> or flush when the cache is full).

And in this sense it's no different than other potentially large elements,
iterable or not.

If a benchmark could be put together that demonstrates savings, that could
be convincing, but I have trouble seeing how an SDK would take advantage
of this. (What would trigger the discard of the first page? What
complexity would that incur vs. the marginal benefit, if any?) Mostly this
feels like a solution in search of a problem. Is there a problem that
you're trying to solve with this?
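For concreteness, the one-token vs. two-token difference can be sketched
in a few lines of Python. This is a toy model only, not Beam's actual
coders or Fn API messages; the message shape, the token format,
PAGE_LIMIT, and the FakeStateService stand-in are all invented:

    # Toy model of the two layouts under discussion; NOT Beam's actual
    # coder or Fn API code. Everything here is invented for illustration.

    PAGE_LIMIT = 3  # elements per page; tiny so the example pages quickly

    class FakeStateService:
        """Stand-in for the runner-side state behind the state channel."""

        def __init__(self):
            self._entries = {}
            self._next_id = 0

        def store(self, payload):
            token = "token%d" % self._next_id
            self._next_id += 1
            self._entries[token] = payload
            return token

    def encode_current(elements, state):
        # Current protocol: first page on the data channel, followed by a
        # single continuation token only if the iterable turned out to be
        # large. Small iterables (the common case) carry no token at all.
        first, rest = elements[:PAGE_LIMIT], elements[PAGE_LIMIT:]
        msg = {"first_page": first}
        if rest:
            msg["continuation_token"] = state.store(rest)
        return msg

    def encode_proposed(elements, state):
        # Proposed tweak: additionally emit a token for the *start* of
        # the iterable, so the SDK may discard the first page and
        # re-fetch it later. The runner must now retain a reference to
        # the first page in its state service, up front.
        msg = encode_current(elements, state)
        if "continuation_token" in msg:
            msg["start_token"] = state.store(msg["first_page"])
        return msg

    state = FakeStateService()
    print(encode_current(list(range(10)), state))
    # {'first_page': [0, 1, 2], 'continuation_token': 'token0'}
    print(encode_proposed(list(range(10)), state))
    # {'first_page': [0, 1, 2], 'continuation_token': 'token1',
    #  'start_token': 'token2'}

Note that encode_proposed has to register the first page with the state
service up front, which is exactly the runner-side bookkeeping cost
discussed above.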
> On Tue, May 21, 2019 at 12:42 AM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> The primary con I see with this is that the runner must know ahead of
>> time, when it starts encoding the iterable, whether or not to treat it
>> as a large one (to also cache the part it's encoding to the data
>> channel, or at least some kind of pointer to it). With the current
>> protocol it can be lazy and decide, once it's seen that the iterable is
>> decently large, to defer the remainder. This is particularly pertinent
>> as large iterables are used everywhere an iterable *may* be large, but
>> in practice most of the time they are not, which is the case we want to
>> optimize for.
>>
>> This change also somewhat complicates the protocol, and given that our
>> buffer sizes are on the order of single digits or maybe tens of MB (the
>> point at which the communication overhead is sufficiently amortized to
>> be negligible relative to the data transmission costs), there seems to
>> be little memory benefit for the SDK in having the option to forget
>> this first page and ask for it later. (It also feels a bit odd because
>> nowhere else do we simultaneously provide a chunk of data together with
>> a "self-pointer" to get the data again, and if it's needed here it'd be
>> needed for more than just iterables.)
>>
>> I think support for "large item iterables" (or I would just say "large
>> items," whether or not they're in an iterable) will require more
>> invasive changes to the protocol (per the referenced discussion), and
>> this is not likely a step towards that.
>>
>> On Tue, May 21, 2019 at 2:09 AM Ruoyun Huang <ruo...@google.com> wrote:
>> >
>> > Hi, Folks,
>> >
>> > We propose a tweak to the existing fnapi Large Iterable (result from
>> > GBK) protocol and would like to see what everyone thinks.
>> >
>> > To clarify a few terms used:
>> >
>> > [Large iterable] A list of elements that is too expensive to hold
>> > entirely in memory, though storing any single element is relatively
>> > cheap.
>> >
>> > [Large-item iterable] Same as above, except that even a single
>> > element is too expensive to keep in memory.
>> >
>> > [Page] A subset of elements from an iterable, up to a size limit.
>> >
>> > [Subsequent pages] All pages except the first one.
>> >
>> > Status quo: Currently, large iterables are implemented using a lazy
>> > decoding mechanism (related community discussion: [1]) and are only
>> > implemented/available in Python. To summarize how it works: the
>> > first page is transmitted over the data channel, and all subsequent
>> > pages are transmitted over the state channel. For each subsequent
>> > page, the runner stores a token and a mapping from that token to the
>> > corresponding iterator. Meanwhile, the SDK side always holds the
>> > first page in memory until it is completely done with the iterable.
>> > No token is created or sent for the starting position of the
>> > iterable.
>> >
>> > Proposed tweak: Create a token for the starting position of EACH
>> > iterable and send it to the SDK. If the SDK needs to re-iterate the
>> > large iterable, it may request all the pages (including the first
>> > page) via the state channel.
>> >
>> > Benefit: The SDK side no longer holds the first page in memory, for
>> > better memory efficiency. In addition, it makes it easier to handle
>> > large-item iterables in the future.
>> >
>> > Cons: [Any other downside?] More data communication (one extra page)
>> > is needed.
>> >
>> > Collateral impact: None. The only difference this proposal makes is
>> > to add *one* token (i.e., the starting position of the large
>> > iterable) to the existing protocol. This token can simply be
>> > redundant, with the existing behavior remaining unchanged. Once this
>> > information is in place, SDKs can make their own choice whether or
>> > not to store the first page in memory.
>> >
>> > Why now? Large iterables are available only in Python, but they are
>> > coming to other SDKs soon. It would probably be easier to add this
>> > extra information early, requiring less modification everywhere
>> > later on.
>> >
>> > [1]:
>> > https://lists.apache.org/thread.html/70cac361b659516933c505b513d43986c25c13da59eabfd28457f1f2@%3Cdev.beam.apache.org%3E
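For illustration, here is a small self-contained sketch of what the
proposed start token would let an SDK do. This is hypothetical code, not
the Beam Python SDK's actual implementation; the (page, next_token) state
layout and all names here are invented:

    # Sketch of an SDK-side iterable given a start token; NOT the Beam
    # Python SDK's actual implementation. All names are invented.

    class DictStateReader:
        """Minimal stand-in for reads over the state channel."""

        def __init__(self, pages):
            self._pages = pages  # token -> (page, next_token or None)

        def get(self, token):
            return self._pages[token]

    class RefetchableIterable:
        """Fetches pages on demand and retains none of them, so
        re-iteration (including of the first page) goes back through the
        state channel instead of SDK memory."""

        def __init__(self, start_token, state_reader):
            self._start_token = start_token
            self._state = state_reader

        def __iter__(self):
            token = self._start_token
            while token is not None:
                page, token = self._state.get(token)
                for element in page:
                    yield element
                # The page is dropped here; nothing stays pinned.

    pages = {
        "start": ([1, 2, 3], "t1"),
        "t1": ([4, 5], None),
    }
    iterable = RefetchableIterable("start", DictStateReader(pages))
    print(list(iterable))  # [1, 2, 3, 4, 5]
    print(list(iterable))  # re-iterating refetches every page, first included

Without a start token, the first page can only ever come from SDK memory,
which is why the SDK currently pins it until the iterable is fully
consumed.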