I share Robert's concern about re-implementing parts of IO. At the same time, I have worked on internal libraries in the past that tried to re-use code from the existing IO, and it was hardly possible because that code doesn't feel designed for re-use: a lot of the classes are nested (non-static) or non-public. I can understand why they were made non-public; it's a hard abstraction to design well while keeping compatibility. As Neville mentioned, decoupling readers and writers would benefit not only this proposal but any other use case that has to deal with low-level APIs such as the FileSystem API, which is hardly possible today without copy-pasting.
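To make the copy-pasting point concrete, here is a rough sketch (the DoFn name and structure are made up for illustration; this is not existing Beam code) of the kind of standalone read Neville describes below, i.e. roughly the body of a PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>> built directly on the FileSystems API plus Avro boilerplate, since the format-specific reader logic in the existing IO isn't reusable at this granularity today:

    import java.nio.channels.Channels;
    import java.nio.channels.ReadableByteChannel;
    import org.apache.avro.file.DataFileStream;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.io.FileSystems;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.values.KV;

    // Hypothetical DoFn, for illustration only: reads each URI as an Avro file
    // and emits its records keyed by the URI they came from.
    class ReadAvroPerUriFn extends DoFn<String, KV<String, GenericRecord>> {
      @ProcessElement
      public void processElement(
          @Element String uri, OutputReceiver<KV<String, GenericRecord>> out) throws Exception {
        // Resolve the URI and open a sequential read channel via the FileSystems API.
        MatchResult.Metadata file = FileSystems.matchSingleFileSpec(uri);
        try (ReadableByteChannel channel = FileSystems.open(file.resourceId());
            DataFileStream<GenericRecord> records =
                new DataFileStream<>(
                    Channels.newInputStream(channel), new GenericDatumReader<GenericRecord>())) {
          // This Avro-specific decoding is the boilerplate that ends up copy-pasted,
          // because the readers inside the existing IO are private inner classes.
          for (GenericRecord record : records) {
            out.output(KV.of(uri, record));
          }
        }
      }
    }

Applying it would be something like ParDo.of(new ReadAvroPerUriFn()) over a PCollection<String> of URIs. The write side is symmetric via FileSystems.create(), and the format-specific encoding there is exactly the part I'd rather re-use from the existing sinks than re-implement.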
On Mon, Jul 15, 2019 at 5:05 PM Neville Li <[email protected]> wrote:
> Re: avoiding mirroring IO functionality, what about:
>
> - Decouple the nested FileBasedSink.Writer and FileBasedSource.FileBasedReader, make them top level, and remove references to the parent classes.
> - Simplify the interfaces, while maintaining support for block/offset reads & sequential writes.
> - As a bonus, the refactored IO classes can be used standalone in cases where the user wants to perform custom IO in a DoFn, i.e. a PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. Today this requires a lot of copy-pasted Avro boilerplate.
> - For compatibility, we can delegate to the new classes from the old ones and remove the old ones in the next breaking release.
>
> Re: WriteFiles logic, I'm not sure about generalizing it, but what about splitting the part that handles writing temp files into a new PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>, PCollection<WriteFilesResult<DestinationT>>>? That separates the bucket/shard logic from the actual file IO.
>
> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <[email protected]> wrote:
>> I agree that generalizing the existing FileIO may not be the right path forward, and I'd only make their innards public with great care. (Would this be used like SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit unique in that the source and sink are much more coupled than other sources and sinks (which happen to be completely independent, if complementary, implementations, whereas SMB attempts to be a kind of pipe where one half is instantiated in each pipeline).
>>
>> In short, an SMB source/sink that is parameterized by an arbitrary, existing IO would be ideal (but possibly not feasible, per existing prioritizations), or an SMB source/sink that works as a pair. What I'd like to avoid is a set of parallel SMB IO classes that (partially, and incompletely) mirror the existing IO ones (from an API perspective -- how much implementation it makes sense to share is an orthogonal issue that I'm sure can be worked out).
>>
>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <[email protected]> wrote:
>>> Hi Robert,
>>>
>>> I agree, it'd be nice to reuse FileIO logic for different file types. But given the current code structure of FileIO and the scope of the change, I feel it's better left for future refactoring PRs.
>>>
>>> Some thoughts:
>>> - SMB file operations are simple single-file sequential reads/writes, which already exist as Writer & FileBasedReader, but those are private inner classes with references to the parent Sink/Source instance.
>>> - The readers also have extra offset/split logic, but that can be worked around.
>>> - It'd be nice not to duplicate the temp->destination file logic, but again WriteFiles assumes a single integer shard key, so it'll take some refactoring to reuse it.
>>>
>>> All of these can be done in a backwards-compatible way. OTOH, generalizing the existing components too much (esp. WriteFiles, which is already complex) might lead to two logic paths, one specialized for the SMB case. It might be easier to decouple some of them for better reuse. But again, I feel that's a separate discussion.
>>>
>>> On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <[email protected]> wrote:
>>>> Thanks Robert!
>>>>
>>>> We'd definitely like to be able to re-use existing I/O components -- for example Writer<DestinationT, OutputT>/FileBasedReader<T>, since they operate on a WritableByteChannel/ReadableByteChannel, which is the level of granularity we need -- but the Writers, at least, seem to be mostly private-access. Do you foresee them being made public at any point?
>>>>
>>>> - Claire
>>>>
>>>> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <[email protected]> wrote:
>>>>> I left some comments on the doc.
>>>>>
>>>>> I think the general idea is sound, but one thing that worries me is the introduction of a parallel set of IOs that mirrors the (existing) FileIOs. I would suggest either (1) incorporating this functionality into the generic FileIO infrastructure, (2) letting it be parameterized by arbitrary IO (which I'm not sure is possible, especially for the Read side; better would be the capability of supporting arbitrary sources, aka an optional "as-sharded-source" operation that returns a PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where the iterable is promised to be in key order), or (3) supporting a single SMB aka "PreGrouping" source/sink pair that's always used together (and whose underlying format is not necessarily public).
>>>>>
>>>>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <[email protected]> wrote:
>>>>>> 4 people have commented, but mostly on clarifying details and not much on the overall design.
>>>>>>
>>>>>> It'd be great to have a thumbs up/down on the design, specifically metadata, bucket & shard strategy, etc., since that affects backwards compatibility of the output files. Some breaking changes, e.g. a dynamic # of shards, are out of scope for V1 unless someone feels strongly about it. The current scope should cover all our use cases and leave room for optimization.
>>>>>>
>>>>>> Once green-lighted, we can start adopting it internally, ironing out rough edges while iterating on the PRs in parallel.
>>>>>>
>>>>>> Most of the implementation is self-contained in the extensions:smb module, except for making a few core classes/methods public for reuse. So despite the amount of work, it's still fairly low risk to the code base. There are some proposed optimizations & refactorings involving core (see appendix), but IMO they're better left for follow-up PRs.
>>>>>>
>>>>>> On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <[email protected]> wrote:
>>>>>>> I've seen some discussion on the doc. I cannot tell whether the questions are resolved or what the status of the review is. Would you mind looping in this thread with a quick summary? This is such a major piece of work that I don't want it to sit with everyone thinking they are waiting on someone else, or any such thing. (Not saying this is happening, just pinging to be sure.)
>>>>>>>
>>>>>>> Kenn
>>>>>>>
>>>>>>> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <[email protected]> wrote:
>>>>>>>> Updated the doc a bit with more future work (appendix). IMO most of it is non-breaking and better done in separate PRs later, since some of it involves pretty big refactoring and is outside the scope of the MVP.
>>>>>>>>
>>>>>>>> For now we'd really like to get feedback on some fundamental design decisions and find a way to move forward.
>>>>>>>>
>>>>>>>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <[email protected]> wrote:
>>>>>>>>> Thanks. I responded to the comments in the doc. More inline.
>>>>>>>>>
>>>>>>>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <[email protected]> wrote:
>>>>>>>>>> Thanks, added a few comments.
>>>>>>>>>>
>>>>>>>>>> If I understood correctly, you basically assign elements with keys to different buckets, which are written to unique files, and merge files for the same key while reading?
>>>>>>>>>>
>>>>>>>>>> Some of my concerns are:
>>>>>>>>>>
>>>>>>>>>> (1) Seems like you rely on an in-memory sorting of buckets. Will this end up limiting the size of a PCollection you can process?
>>>>>>>>>
>>>>>>>>> The sorter transform we're using supports spilling and external sort. We can break up large key groups further by sharding, similar to fan-out in some GBK transforms.
>>>>>>>>>
>>>>>>>>>> (2) Seems like you rely on Reshuffle.viaRandomKey(), which is actually implemented using a shuffle (which you try to replace with this proposal).
>>>>>>>>>
>>>>>>>>> That's for distributing task metadata, so that each DoFn thread picks up a random bucket and sort-merges its key-values. It's not shuffling actual data.
>>>>>>>>>
>>>>>>>>>> (3) I think (at least some of the) shuffle implementations are implemented in ways similar to this (writing to files and merging). So I'm wondering if the performance benefits you see are for a very specific case and may limit the functionality in other ways.
>>>>>>>>>
>>>>>>>>> This is for the common pattern of a few core data-producer pipelines and many downstream consumer pipelines. It's not intended to replace shuffle/join within a single pipeline. On the producer side, by pre-grouping/sorting data and writing bucket/shard output files, the consumer can sort/merge matching files without a CoGBK. Essentially we're paying the shuffle cost upfront to avoid paying it repeatedly in each consumer pipeline that wants to join the data.
>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Cham
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <[email protected]> wrote:
>>>>>>>>>>> Ping again. Any chance someone takes a look to get this thing going? It's just a design doc and a basic metadata/IO impl. We're not talking about the actual source/sink code yet (already done, but saved for future PRs).
>>>>>>>>>>>
>>>>>>>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <[email protected]> wrote:
>>>>>>>>>>>> Thank you Claire, this looks promising. Explicitly adding a few folks that might have feedback: +Ismaël Mejía +Robert Bradshaw +Lukasz Cwik +Chamikara Jayalath
>>>>>>>>>>>>
>>>>>>>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <[email protected]> wrote:
>>>>>>>>>>>>> Hey dev@!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Myself and a few other Spotify data engineers have put together a design doc for SMB Join support in Beam, and we have a working Java implementation we've started to put up for PR ([0], [1], [2]). There's more detailed information in the document, but the tl;dr is that SMB is a strategy to optimize joins for file-based sources by modifying the initial write operation to write records into sorted buckets based on the desired join key. This means that subsequent joins of datasets written in this way are only sequential file reads, with no shuffling involved. We've seen some pretty substantial performance speedups with our implementation and would love to get it checked in to Beam's Java SDK.
>>>>>>>>>>>>>
>>>>>>>>>>>>> We'd appreciate any suggestions or feedback on our proposal -- the design doc should be public to comment on.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> Claire / Neville

--
Cheers,
Gleb