I agree that generalizing the existing FileIO may not be the right path forward, and I'd only make its innards public with great care. (Would this be used like SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit unique in that the source and sink are much more coupled than other sources and sinks (which happen to be completely independent, if complementary, implementations, whereas SMB attempts to be a kind of pipe where one half is instantiated in each pipeline).
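
For concreteness, here is a rough sketch of what an SMB sink parameterized by an existing IO might look like, in the spirit of the SmbSink(...) question above. It builds only on Beam's public FileIO.Sink interface (which writes elements to a WritableByteChannel); the SortedBucketSink name and its parameters are hypothetical, not existing Beam API and not necessarily the API proposed in the design doc:

    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PDone;

    // Hypothetical sketch only: SortedBucketSink is not an existing Beam class.
    public class SortedBucketSink<K, V> extends PTransform<PCollection<V>, PDone> {
      private final SerializableFunction<V, K> keyFn; // join key extractor
      private final FileIO.Sink<V> formatSink;        // e.g. AvroIO.sink(...), one copy per bucket file
      private final int numBuckets;

      public SortedBucketSink(
          SerializableFunction<V, K> keyFn, FileIO.Sink<V> formatSink, int numBuckets) {
        this.keyFn = keyFn;
        this.formatSink = formatSink;
        this.numBuckets = numBuckets;
      }

      @Override
      public PDone expand(PCollection<V> input) {
        // 1. Bucket each element by hash(keyFn.apply(element)) % numBuckets.
        // 2. Sort each bucket by key (externally for large buckets).
        // 3. Open one file per bucket and write it through a copy of formatSink
        //    (FileIO.Sink only needs a WritableByteChannel), so the on-disk format
        //    is whatever the wrapped sink produces and no parallel SMB IOs are needed.
        throw new UnsupportedOperationException("sketch only");
      }
    }
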
In short, an SMB source/sink that is parameterized by an arbitrary, existing IO would be ideal (but possibly not feasible, per existing prioritizations), or an SMB source/sink that works as a pair. What I'd like to avoid is a set of parallel SMB IO classes that (partially, and incompletely) mirror the existing IO ones (from an API perspective--how much implementation it makes sense to share is an orthogonal issue that I'm sure can be worked out).

On Mon, Jul 15, 2019 at 4:18 PM Neville Li <neville....@gmail.com> wrote:
>
> Hi Robert,
>
> I agree, it'd be nice to reuse FileIO logic for different file types. But given the current code structure of FileIO & the scope of the change, I feel it's better left for future refactor PRs.
>
> Some thoughts:
> - SMB file operations are simple single-file sequential reads/writes, which already exist as Writer & FileBasedReader, but those are private inner classes and have references to the parent Sink/Source instance.
> - The readers also have extra offset/split logic, but that can be worked around.
> - It'd be nice to not duplicate the temp->destination file logic, but again WriteFiles assumes a single integer shard key, so it'll take some refactoring to reuse it.
>
> All of these can be done in a backwards-compatible way. OTOH, generalizing the existing components too much (esp. WriteFiles, which is already complex) might lead to two logic paths, one specialized for the SMB case. It might be easier to decouple some of them for better reuse. But again, I feel it's a separate discussion.
>
> On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:
>>
>> Thanks Robert!
>>
>> We'd definitely like to be able to re-use existing I/O components--for example the Writer<DestinationT, OutputT>/FileBasedReader<T> (since they operate on a WritableByteChannel/ReadableByteChannel, which is the level of granularity we need)--but the Writers, at least, seem to be mostly private-access. Do you foresee them being made public at any point?
>>
>> - Claire
>>
>> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> I left some comments on the doc.
>>>
>>> I think the general idea is sound, but one thing that worries me is the introduction of a parallel set of IOs that mirrors the (existing) FileIOs. I would suggest either (1) incorporating this functionality into the generic FileIO infrastructure, (2) letting it be parameterized by arbitrary IO (which I'm not sure is possible, especially for the Read side (and better would be the capability of supporting arbitrary sources, aka an optional "as-sharded-source" operation that returns a PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where the iterable is promised to be in key order)), or (3) supporting a single SMB aka "PreGrouping" source/sink pair that's always used together (and whose underlying format is not necessarily public).
>>>
>>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <neville....@gmail.com> wrote:
>>> >
>>> > 4 people have commented, but mostly on clarifying details and not much on the overall design.
>>> >
>>> > It'd be great to have a thumbs up/down on the design, specifically metadata, bucket & shard strategy, etc., since that affects backwards compatibility of the output files.
>>> > Some breaking changes, e.g. a dynamic # of shards, are out of scope for V1 unless someone feels strongly about it. The current scope should cover all our use cases and leave room for optimization.
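
A non-authoritative sketch of the "as-sharded-source" operation Robert suggests above, spelling out the quoted signature; the ShardedSource name and the Integer shard id are assumptions, not existing Beam API:

    import org.apache.beam.sdk.transforms.PTransform;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PBegin;
    import org.apache.beam.sdk.values.PCollection;

    // Hypothetical interface; nothing here exists in Beam today.
    public interface ShardedSource<K, V> {
      // One output element per shard: the shard id plus all of that shard's records,
      // with the Iterable promised to be in key order. Any two sources exposing this
      // could be merge-joined shard by shard without a shuffle.
      PTransform<PBegin, PCollection<KV<Integer, Iterable<KV<K, V>>>>> asShardedSource();
    }
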
>>> >
>>> > Once green-lit, we can start adopting it internally, ironing out rough edges while iterating on the PRs in parallel.
>>> >
>>> > Most of the implementation is self-contained in the extensions:smb module, except for making a few core classes/methods public for reuse. So despite the amount of work, it's still fairly low risk to the code base. There are some proposed optimizations & refactorings involving core (see appendix), but IMO they're better left for follow-up PRs.
>>> >
>>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <k...@apache.org> wrote:
>>> >>
>>> >> I've seen some discussion on the doc. I cannot tell whether the questions are resolved or what the status of the review is. Would you mind looping this thread in with a quick summary? This is such a major piece of work that I don't want it to sit with everyone thinking they are waiting on someone else, or any such thing. (Not saying this is happening, just pinging to be sure.)
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <neville....@gmail.com> wrote:
>>> >>>
>>> >>> Updated the doc a bit with more future work (appendix). IMO most of them are non-breaking and better done in separate PRs later, since some involve pretty big refactoring and are outside the scope of the MVP.
>>> >>>
>>> >>> For now we'd really like to get feedback on some fundamental design decisions and find a way to move forward.
>>> >>>
>>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <neville....@gmail.com> wrote:
>>> >>>>
>>> >>>> Thanks. I responded to comments in the doc. More inline.
>>> >>>>
>>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <chamik...@google.com> wrote:
>>> >>>>>
>>> >>>>> Thanks, added a few comments.
>>> >>>>>
>>> >>>>> If I understood correctly, you basically assign keyed elements to different buckets, which are written to unique files, and merge files for the same key while reading?
>>> >>>>>
>>> >>>>> Some of my concerns are:
>>> >>>>>
>>> >>>>> (1) Seems like you rely on an in-memory sorting of buckets. Will this end up limiting the size of a PCollection you can process?
>>> >>>>
>>> >>>> The sorter transform we're using supports spilling and external sort. We can break up large key groups further by sharding, similar to the fan-out in some GBK transforms.
>>> >>>>>
>>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey(), which is actually implemented using a shuffle (which you try to replace with this proposal).
>>> >>>>
>>> >>>> That's for distributing task metadata, so that each DoFn thread picks up a random bucket and sort-merges its key-values. It's not shuffling actual data.
>>> >>>>
>>> >>>>>
>>> >>>>> (3) I think (at least some of the) shuffle implementations are implemented in ways similar to this (writing to files and merging). So I'm wondering if the performance benefits you see are for a very specific case and may limit the functionality in other ways.
>>> >>>>
>>> >>>> This is for the common pattern of a few core data-producer pipelines and many downstream consumer pipelines. It's not intended to replace shuffle/join within a single pipeline. On the producer side, by pre-grouping/sorting data and writing to bucket/shard output files, the consumer can sort/merge matching ones without a CoGBK.
>>> >>>> Essentially, we're paying the shuffle cost upfront to avoid paying it repeatedly in each consumer pipeline that wants to join data.
>>> >>>>
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Cham
>>> >>>>>
>>> >>>>>
>>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <neville....@gmail.com> wrote:
>>> >>>>>>
>>> >>>>>> Ping again. Any chance someone could take a look to get this thing going? It's just a design doc and a basic metadata/IO impl. We're not talking about actual source/sink code yet (already done, but saved for future PRs).
>>> >>>>>>
>>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <al...@google.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Thank you Claire, this looks promising. Explicitly adding a few folks that might have feedback: +Ismaël Mejía +Robert Bradshaw +Lukasz Cwik +Chamikara Jayalath
>>> >>>>>>>
>>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <claire.d.mcgi...@gmail.com> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Hey dev@!
>>> >>>>>>>>
>>> >>>>>>>> A few other Spotify data engineers and I have put together a design doc for SMB Join support in Beam, and we have a working Java implementation we've started to put up for PR ([0], [1], [2]). There's more detailed information in the document, but the tl;dr is that SMB is a strategy to optimize joins for file-based sources by modifying the initial write operation to write records in sorted buckets based on the desired join key. This means that subsequent joins of datasets written in this way require only sequential file reads, with no shuffling involved. We've seen some pretty substantial performance speedups with our implementation and would love to get it checked in to Beam's Java SDK.
>>> >>>>>>>>
>>> >>>>>>>> We'd appreciate any suggestions or feedback on our proposal--the design doc should be public to comment on.
>>> >>>>>>>>
>>> >>>>>>>> Thanks!
>>> >>>>>>>> Claire / Neville
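
To illustrate the "sort/merge matching ones without a CoGBK" point from the thread above: once two datasets are bucketed and key-sorted the same way, joining one bucket is just a sequential merge of two sorted streams. The sketch below is illustration only, not the proposed Beam API, and assumes string keys with at most one record per key on each side:

    import java.util.Iterator;
    import java.util.function.BiConsumer;
    import org.apache.beam.sdk.values.KV;

    // Illustration only: a sequential merge-join over one bucket's two key-sorted inputs.
    public class SortedBucketMergeJoinExample {
      // Assumes string keys and at most one record per key on each side.
      public static <V1, V2> void mergeJoin(
          Iterator<KV<String, V1>> left,
          Iterator<KV<String, V2>> right,
          BiConsumer<V1, V2> emit) {
        KV<String, V1> l = left.hasNext() ? left.next() : null;
        KV<String, V2> r = right.hasNext() ? right.next() : null;
        while (l != null && r != null) {
          int cmp = l.getKey().compareTo(r.getKey());
          if (cmp == 0) {
            emit.accept(l.getValue(), r.getValue()); // matching key: join and advance both sides
            l = left.hasNext() ? left.next() : null;
            r = right.hasNext() ? right.next() : null;
          } else if (cmp < 0) {
            l = left.hasNext() ? left.next() : null; // left key is smaller: advance left
          } else {
            r = right.hasNext() ? right.next() : null; // right key is smaller: advance right
          }
        }
      }
    }

Each reader task would run such a merge over the matching bucket files of the datasets being joined, which is why the read side needs only sequential file reads and no shuffle.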