I left some comments on the doc. I think the general idea is sound, but one
thing that worries me is the introduction of a parallel set of IOs that
mirrors the existing FileIOs. I would suggest one of the following:

(1) Incorporate this functionality into the generic FileIO infrastructure.

(2) Let it be parameterized by an arbitrary IO, which I'm not sure is
possible, especially for the read side. Better yet would be the capability
of supporting arbitrary sources: an optional "as-sharded-source" operation
that returns a PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where each
iterable is promised to be in key order.

(3) Support a single SMB aka "PreGrouping" source/sink pair that's always
used together (and whose underlying format is not necessarily public).
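Concretely, the "as-sharded-source" capability might look something like the
sketch below (all names here are illustrative placeholders, not an existing
Beam API):

  import org.apache.beam.sdk.transforms.PTransform;
  import org.apache.beam.sdk.values.KV;
  import org.apache.beam.sdk.values.PBegin;
  import org.apache.beam.sdk.values.PCollection;

  // Hypothetical: a source that can expose its data pre-sharded, with
  // the pairs within each shard promised to be in key order.
  public interface ShardedSource<K, V> {
    PTransform<PBegin, PCollection<KV<Integer, Iterable<KV<K, V>>>>>
        asShardedSource();
  }

An SMB-style read could then merge-join the per-shard iterables from any
sources implementing this, without depending on the underlying file format.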
On Sat, Jul 13, 2019 at 3:19 PM Neville Li <neville....@gmail.com> wrote:
>
> 4 people have commented, but mostly on clarifying details and not much on
> the overall design.
>
> It'd be great to get a thumbs up/down on the design, specifically
> metadata, bucket & shard strategy, etc., since those affect the backwards
> compatibility of the output files.
> Some breaking changes, e.g. a dynamic # of shards, are out of scope for V1
> unless someone feels strongly about it. The current scope should cover all
> our use cases and leave room for optimization.
>
> Once green-lighted we can start adopting internally, ironing out rough
> edges while iterating on the PRs in parallel.
>
> Most of the implementation is self-contained in the extensions:smb module,
> except for making a few core classes/methods public for reuse. So despite
> the amount of work it's still fairly low risk to the code base. There are
> some proposed optimizations & refactorings involving core (see appendix),
> but IMO they're better left for follow-up PRs.
>
> On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <k...@apache.org> wrote:
>>
>> I've seen some discussion on the doc. I cannot tell whether the questions
>> are resolved or what the status of review is. Would you mind looping this
>> thread with a quick summary? This is such a major piece of work that I
>> don't want it to sit with everyone thinking they are waiting on someone
>> else, or any such thing. (Not saying this is happening, just pinging to
>> be sure.)
>>
>> Kenn
>>
>> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <neville....@gmail.com> wrote:
>>>
>>> Updated the doc a bit with more future work (appendix). IMO most of it
>>> is non-breaking and better done in separate PRs later, since some items
>>> involve pretty big refactoring and are outside the scope of the MVP.
>>>
>>> For now we'd really like to get feedback on some fundamental design
>>> decisions and find a way to move forward.
>>>
>>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <neville....@gmail.com>
>>> wrote:
>>>>
>>>> Thanks. I responded to comments in the doc. More inline.
>>>>
>>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath
>>>> <chamik...@google.com> wrote:
>>>>>
>>>>> Thanks, added a few comments.
>>>>>
>>>>> If I understood correctly, you basically assign elements with keys to
>>>>> different buckets, which are written to unique files, and merge files
>>>>> for the same key while reading?
>>>>>
>>>>> Some of my concerns are:
>>>>>
>>>>> (1) Seems like you rely on an in-memory sorting of buckets. Will this
>>>>> end up limiting the size of a PCollection you can process?
>>>>
>>>> The sorter transform we're using supports spilling and external sort.
>>>> We can break up large key groups further by sharding, similar to
>>>> fan-out in some GBK transforms.
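>>>>
>>>> For reference, a minimal sketch of how the sorter extension is applied
>>>> (assuming the SortValues / BufferedExternalSorter classes from the
>>>> extensions:sorter module; it orders each group by the encoded bytes of
>>>> the secondary key, buffering in memory and falling back to an external
>>>> sort when the buffer fills up):
>>>>
>>>>   import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
>>>>   import org.apache.beam.sdk.extensions.sorter.SortValues;
>>>>   import org.apache.beam.sdk.transforms.GroupByKey;
>>>>   import org.apache.beam.sdk.values.KV;
>>>>   import org.apache.beam.sdk.values.PCollection;
>>>>
>>>>   // bucket id -> (encoded join key, encoded record)
>>>>   PCollection<KV<Integer, KV<byte[], byte[]>>> bucketed = ...;
>>>>
>>>>   // Group by bucket id, then sort each bucket's values by join key.
>>>>   // BufferedExternalSorter spills to disk rather than holding the
>>>>   // whole group in memory.
>>>>   PCollection<KV<Integer, Iterable<KV<byte[], byte[]>>>> sorted =
>>>>       bucketed
>>>>           .apply(GroupByKey.create())
>>>>           .apply(SortValues.create(BufferedExternalSorter.options()));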
>>>>>
>>>>> (2) Seems like you rely on Reshuffle.viaRandomKey(), which is
>>>>> actually implemented using a shuffle (which you try to replace with
>>>>> this proposal).
>>>>
>>>> That's for distributing task metadata, so that each DoFn thread picks
>>>> up a random bucket and sort-merges its key-values. It's not shuffling
>>>> actual data.
>>>>
>>>>>
>>>>> (3) I think (at least some of the) shuffle implementations are
>>>>> implemented in ways similar to this (writing to files and merging).
>>>>> So I'm wondering if the performance benefits you see are for a very
>>>>> specific case and may limit the functionality in other ways.
>>>>
>>>> This is for the common pattern of a few core data-producer pipelines
>>>> and many downstream consumer pipelines. It's not intended to replace
>>>> shuffle/join within a single pipeline. On the producer side, by
>>>> pre-grouping/sorting data and writing to bucket/shard output files,
>>>> the consumer can sort/merge matching ones without a CoGBK. Essentially
>>>> we're paying the shuffle cost upfront to avoid paying it repeatedly in
>>>> each consumer pipeline that wants to join the data.
>>>>
>>>>>
>>>>> Thanks,
>>>>> Cham
>>>>>
>>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <neville....@gmail.com>
>>>>> wrote:
>>>>>>
>>>>>> Ping again. Any chance someone takes a look to get this thing going?
>>>>>> It's just a design doc and a basic metadata/IO impl. We're not
>>>>>> talking about actual source/sink code yet (already done, but saved
>>>>>> for future PRs).
>>>>>>
>>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <al...@google.com>
>>>>>> wrote:
>>>>>>>
>>>>>>> Thank you Claire, this looks promising. Explicitly adding a few
>>>>>>> folks that might have feedback: +Ismaël Mejía +Robert Bradshaw
>>>>>>> +Lukasz Cwik +Chamikara Jayalath
>>>>>>>
>>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty
>>>>>>> <claire.d.mcgi...@gmail.com> wrote:
>>>>>>>>
>>>>>>>> Hey dev@!
>>>>>>>>
>>>>>>>> A few other Spotify data engineers and I have put together a
>>>>>>>> design doc for SMB Join support in Beam, and we have a working
>>>>>>>> Java implementation we've started to put up for PR ([0], [1],
>>>>>>>> [2]). There's more detailed information in the document, but the
>>>>>>>> tl;dr is that SMB is a strategy to optimize joins for file-based
>>>>>>>> sources by modifying the initial write operation to write records
>>>>>>>> in sorted buckets based on the desired join key. This means that
>>>>>>>> subsequent joins of datasets written in this way are only
>>>>>>>> sequential file reads, with no shuffling involved. We've seen some
>>>>>>>> pretty substantial performance speedups with our implementation
>>>>>>>> and would love to get it checked in to Beam's Java SDK.
>>>>>>>>
>>>>>>>> We'd appreciate any suggestions or feedback on our proposal--the
>>>>>>>> design doc should be public to comment on.
>>>>>>>>
>>>>>>>> Thanks!
>>>>>>>> Claire / Neville
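>>>>>>>>
>>>>>>>> P.S. The exact interfaces are spelled out in the doc and PRs, but
>>>>>>>> roughly the write/read pattern looks like the sketch below. All
>>>>>>>> names in it (SmbWrite, SmbJoin, withJoinKey, Event, User) are
>>>>>>>> illustrative placeholders, not the proposed API:
>>>>>>>>
>>>>>>>>   // Producer (paid once): hash each record into one of N buckets
>>>>>>>>   // by its join key and sort every bucket file by that key.
>>>>>>>>   events.apply(
>>>>>>>>       SmbWrite.<String, Event>to("gs://data/events")
>>>>>>>>           .withJoinKey(Event::getUserId)  // hypothetical
>>>>>>>>           .withNumBuckets(128));
>>>>>>>>
>>>>>>>>   // Consumer (every downstream pipeline): because both datasets
>>>>>>>>   // share the same bucketing and sort order, matching buckets are
>>>>>>>>   // merge-joined via sequential file reads, with no CoGBK or
>>>>>>>>   // shuffle.
>>>>>>>>   PCollection<KV<String, KV<Event, User>>> joined =
>>>>>>>>       pipeline.apply(
>>>>>>>>           SmbJoin.of("gs://data/events", "gs://data/users"));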