On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>
> Quick note: I didn't look through the document, but please do not build on
> either FileBasedSink or FileBasedReader. They are both remnants of the old,
> non-composable IO world; and in fact much of the composable IO work emerged
> from frustration with their limitations and recognizing that many other IOs
> were suffering from the same limitations.
> Instead of FileBasedSink, build on FileIO.write; instead of FileBasedReader,
> build on FileIO.read.

+1

I think the sink could be written atop FileIO.write, possibly using dynamic
destinations. At the very least the FileSink interface, which handles the
details of writing a single shard, would be an ideal way to parameterize an
SMB sink. It seems that none of our existing IOs (publicly?) expose FileSink
implementations.

FileIO.read is not flexible enough to do the merging. Eugene, is there a
composable analogue to FileSink, for sources, i.e. something that can turn a
file handle (possibly with offsets) into a set of records other than
FileBasedReader?

> On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov <g...@spotify.com> wrote:
>>
>> I share the same concern with Robert regarding re-implementing parts of IO.
>> At the same time, in the past, I worked on internal libraries that try to
>> re-use code from existing IO, and it's hardly possible because it feels like
>> it wasn't designed for re-use. There are a lot of classes that are nested
>> (non-static) or non-public. I can understand why they were made non-public,
>> it's a hard abstraction to design well and keep compatibility. As Neville
>> mentioned, decoupling readers and writers would benefit not only this
>> proposal but also any other use case that has to deal with a low-level API
>> such as the FileSystem API, which is hardly possible today without
>> copy-pasting.
>>
>> On Mon, Jul 15, 2019 at 5:05 PM Neville Li <neville....@gmail.com> wrote:
>>>
>>> Re: avoiding mirroring IO functionality, what about:
>>>
>>> - Decouple the nested FileBasedSink.Writer and
>>> FileBasedSource.FileBasedReader, make them top level and remove references
>>> to parent classes.
>>> - Simplify the interfaces, while maintaining support for block/offset read
>>> & sequential write.
>>> - As a bonus, the refactored IO classes can be used standalone in cases where
>>> the user wants to perform custom IO in a DoFn, i.e. a
>>> PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>.
>>> Today this requires a lot of copy-pasted Avro boilerplate.
>>> - For compatibility, we can delegate to the new classes from the old ones
>>> and remove them in the next breaking release.
>>>
>>> Re: WriteFiles logic, I'm not sure about generalizing it, but what about
>>> splitting the part handling writing temp files into a new
>>> PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>,
>>> PCollection<WriteFilesResult<DestinationT>>>? That splits the bucket-shard
>>> logic from actual file IO.
>>>
>>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>>
>>>> I agree that generalizing the existing FileIO may not be the right
>>>> path forward, and I'd only make their innards public with great care.
>>>> (Would this be used like
>>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
>>>> unique in that the source and sink are much more coupled than other
>>>> sources and sinks (which happen to be completely independent, if
>>>> complementary implementations, whereas SMB attempts to be a kind of
>>>> pipe where one half is instantiated in each pipeline).
>>>>
>>>> In short, an SMB source/sink that is parameterized by an arbitrary,
>>>> existing IO would be ideal (but possibly not feasible (per existing
>>>> prioritizations)), or an SMB source/sink that works as a pair. What
>>>> I'd like to avoid is a set of parallel SMB IO classes that (partially,
>>>> and incompletely) mirror the existing IO ones (from an API
>>>> perspective--how much implementation it makes sense to share is an
>>>> orthogonal issue that I'm sure can be worked out.)
>>>>
>>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <neville....@gmail.com> wrote:
>>>> >
>>>> > Hi Robert,
>>>> >
>>>> > I agree, it'd be nice to reuse FileIO logic of different file types. But
>>>> > given the current code structure of FileIO & scope of the change, I feel
>>>> > it's better left for future refactor PRs.
>>>> >
>>>> > Some thoughts:
>>>> > - SMB file operations are simple single-file sequential reads/writes,
>>>> > which already exist as Writer & FileBasedReader, but these are private
>>>> > inner classes and have references to the parent Sink/Source instance.
>>>> > - The readers also have extra offset/split logic but that can be worked
>>>> > around.
>>>> > - It'll be nice to not duplicate temp->destination file logic but again
>>>> > WriteFiles is assuming a single integer shard key, so it'll take some
>>>> > refactoring to reuse it.
>>>> >
>>>> > All of these can be done in a backwards compatible way. OTOH generalizing
>>>> > the existing components too much (esp. WriteFiles, which is already
>>>> > complex) might lead to two logic paths, one specialized for the SMB
>>>> > case. It might be easier to decouple some of them for better reuse. But
>>>> > again I feel it's a separate discussion.
>>>> >
>>>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty
>>>> > <claire.d.mcgi...@gmail.com> wrote:
>>>> >>
>>>> >> Thanks Robert!
>>>> >>
>>>> >> We'd definitely like to be able to re-use existing I/O components--for
>>>> >> example the Writer<DestinationT, OutputT>/FileBasedReader<T> (since
>>>> >> they operate on a WritableByteChannel/ReadableByteChannel, which is the
>>>> >> level of granularity we need) but the Writers, at least, seem to be
>>>> >> mostly private-access. Do you foresee them being made public at any
>>>> >> point?
>>>> >>
>>>> >> - Claire
>>>> >>
>>>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <rober...@google.com>
>>>> >> wrote:
>>>> >>>
>>>> >>> I left some comments on the doc.
>>>> >>>
>>>> >>> I think the general idea is sound, but one thing that worries me is
>>>> >>> the introduction of a parallel set of IOs that mirrors the (existing)
>>>> >>> FileIOs.
>>>> >>> I would suggest either (1) incorporate this functionality
>>>> >>> into the generic FileIO infrastructure, or let it be parameterized by
>>>> >>> arbitrary IO (which I'm not sure is possible, especially for the Read
>>>> >>> side (and better would be the capability of supporting arbitrary
>>>> >>> sources, aka an optional "as-sharded-source" operation that returns a
>>>> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>> where the iterable is
>>>> >>> promised to be in key order)) or support a single SMB aka
>>>> >>> "PreGrouping" source/sink pair that's always used together (and whose
>>>> >>> underlying format is not necessarily public).
>>>> >>>
>>>> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <neville....@gmail.com>
>>>> >>> wrote:
>>>> >>> >
>>>> >>> > 4 people have commented but mostly clarifying details and not much
>>>> >>> > on the overall design.
>>>> >>> >
>>>> >>> > It'd be great to have thumbs up/down on the design, specifically
>>>> >>> > metadata, bucket & shard strategy, etc., since that affects
>>>> >>> > backwards compatibility of output files.
>>>> >>> > Some breaking changes, e.g. dynamic # of shards, are out of scope
>>>> >>> > for V1 unless someone feels strongly about it. The current scope
>>>> >>> > should cover all our use cases and leave room for optimization.
>>>> >>> >
>>>> >>> > Once green lighted we can start adopting internally, ironing out
>>>> >>> > rough edges while iterating on the PRs in parallel.
>>>> >>> >
>>>> >>> > Most of the implementation is self-contained in the extensions:smb
>>>> >>> > module, except making a few core classes/methods public for reuse.
>>>> >>> > So despite the amount of work it's still fairly low risk to the code
>>>> >>> > base. There're some proposed optimizations & refactoring involving
>>>> >>> > core (see appendix) but IMO they're better left for followup PRs.
>>>> >>> >
>>>> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <k...@apache.org>
>>>> >>> > wrote:
>>>> >>> >>
>>>> >>> >> I've seen some discussion on the doc. I cannot tell whether the
>>>> >>> >> questions are resolved or what the status of review is. Would you
>>>> >>> >> mind looping this thread with a quick summary? This is such a major
>>>> >>> >> piece of work I don't want it to sit with everyone thinking they
>>>> >>> >> are waiting on someone else, or any such thing. (not saying this is
>>>> >>> >> happening, just pinging to be sure)
>>>> >>> >>
>>>> >>> >> Kenn
>>>> >>> >>
>>>> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <neville....@gmail.com>
>>>> >>> >> wrote:
>>>> >>> >>>
>>>> >>> >>> Updated the doc a bit with more future work (appendix). IMO most
>>>> >>> >>> of them are non-breaking and better done in separate PRs later
>>>> >>> >>> since some involve pretty big refactoring and are outside the
>>>> >>> >>> scope of MVP.
>>>> >>> >>>
>>>> >>> >>> For now we'd really like to get feedback on some fundamental
>>>> >>> >>> design decisions and find a way to move forward.
>>>> >>> >>>
>>>> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <neville....@gmail.com>
>>>> >>> >>> wrote:
>>>> >>> >>>>
>>>> >>> >>>> Thanks. I responded to comments in the doc. More inline.
>>>> >>> >>>>
>>>> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath
>>>> >>> >>>> <chamik...@google.com> wrote:
>>>> >>> >>>>>
>>>> >>> >>>>> Thanks, added a few comments.
>>>> >>> >>>>>
>>>> >>> >>>>> If I understood correctly, you basically assign elements with
>>>> >>> >>>>> keys to different buckets which are written to unique files and
>>>> >>> >>>>> merge files for the same key while reading?
>>>> >>> >>>>>
>>>> >>> >>>>> Some of my concerns are:
>>>> >>> >>>>>
>>>> >>> >>>>> (1) Seems like you rely on an in-memory sorting of buckets.
>>>> >>> >>>>> Will this end up limiting the size of a PCollection you can
>>>> >>> >>>>> process?
>>>> >>> >>>>
>>>> >>> >>>> The sorter transform we're using supports spilling and external
>>>> >>> >>>> sort. We can break up large key groups further by sharding,
>>>> >>> >>>> similar to fan out in some GBK transforms.
>>>> >>> >>>>
>>>> >>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey() which is
>>>> >>> >>>>> actually implemented using a shuffle (which you try to replace
>>>> >>> >>>>> with this proposal).
>>>> >>> >>>>
>>>> >>> >>>> That's for distributing task metadata, so that each DoFn thread
>>>> >>> >>>> picks up a random bucket and sort-merges key-values. It's not
>>>> >>> >>>> shuffling actual data.
>>>> >>> >>>>
>>>> >>> >>>>>
>>>> >>> >>>>> (3) I think (at least some of the) shuffle implementations are
>>>> >>> >>>>> implemented in ways similar to this (writing to files and
>>>> >>> >>>>> merging). So I'm wondering if the performance benefits you see
>>>> >>> >>>>> are for a very specific case and may limit the functionality in
>>>> >>> >>>>> other ways.
>>>> >>> >>>>
>>>> >>> >>>> This is for the common pattern of a few core data producer
>>>> >>> >>>> pipelines and many downstream consumer pipelines. It's not
>>>> >>> >>>> intended to replace shuffle/join within a single pipeline. On the
>>>> >>> >>>> producer side, by pre-grouping/sorting data and writing to
>>>> >>> >>>> bucket/shard output files, the consumer can sort/merge matching
>>>> >>> >>>> ones without a CoGBK. Essentially we're paying the shuffle cost
>>>> >>> >>>> upfront to avoid paying it repeatedly in each consumer pipeline
>>>> >>> >>>> that wants to join data.
>>>> >>> >>>>
>>>> >>> >>>>>
>>>> >>> >>>>> Thanks,
>>>> >>> >>>>> Cham
>>>> >>> >>>>>
>>>> >>> >>>>>
>>>> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li
>>>> >>> >>>>> <neville....@gmail.com> wrote:
>>>> >>> >>>>>>
>>>> >>> >>>>>> Ping again. Any chance someone takes a look to get this thing
>>>> >>> >>>>>> going? It's just a design doc and basic metadata/IO impl.
>>>> >>> >>>>>> We're not talking about actual source/sink code yet (already done but
>>>> >>> >>>>>> saved for future PRs).
>>>> >>> >>>>>>
>>>> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <al...@google.com>
>>>> >>> >>>>>> wrote:
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> Thank you Claire, this looks promising. Explicitly adding a
>>>> >>> >>>>>>> few folks that might have feedback: +Ismaël Mejía +Robert
>>>> >>> >>>>>>> Bradshaw +Lukasz Cwik +Chamikara Jayalath
>>>> >>> >>>>>>>
>>>> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty
>>>> >>> >>>>>>> <claire.d.mcgi...@gmail.com> wrote:
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> Hey dev@!
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> Myself and a few other Spotify data engineers have put
>>>> >>> >>>>>>>> together a design doc for SMB Join support in Beam, and have
>>>> >>> >>>>>>>> a working Java implementation we've started to put up for PR
>>>> >>> >>>>>>>> ([0], [1], [2]). There's more detailed information in the
>>>> >>> >>>>>>>> document, but the tl;dr is that SMB is a strategy to optimize
>>>> >>> >>>>>>>> joins for file-based sources by modifying the initial write
>>>> >>> >>>>>>>> operation to write records in sorted buckets based on the
>>>> >>> >>>>>>>> desired join key. This means that subsequent joins of
>>>> >>> >>>>>>>> datasets written in this way are only sequential file reads,
>>>> >>> >>>>>>>> no shuffling involved. We've seen some pretty substantial
>>>> >>> >>>>>>>> performance speedups with our implementation and would love
>>>> >>> >>>>>>>> to get it checked in to Beam's Java SDK.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> We'd appreciate any suggestions or feedback on our
>>>> >>> >>>>>>>> proposal--the design doc should be public to comment on.
>>>> >>> >>>>>>>>
>>>> >>> >>>>>>>> Thanks!
>>>> >>> >>>>>>>> Claire / Neville
>>
>> --
>> Cheers,
>> Gleb
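
For readers skimming the thread, the SMB idea summarized in the original proposal (write records into sorted buckets chosen by the join key, then join two datasets by reading matching buckets sequentially) can be sketched roughly as below. This is only an illustration under stated assumptions, not the code from the PRs: SmbSketch, Rec, writeBuckets, mergeJoin and join are hypothetical names, in-memory lists stand in for bucket files, the hash function is simply String.hashCode, and no Beam APIs are used.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

/** Illustrative only: in-memory lists stand in for sorted bucket files. */
public class SmbSketch {

  /** Hypothetical keyed record used by this sketch. */
  public static final class Rec {
    public final String key;
    public final String value;

    public Rec(String key, String value) {
      this.key = key;
      this.value = value;
    }
  }

  /** Write side: pick a bucket by hashing the join key, then sort each bucket by key. */
  public static List<List<Rec>> writeBuckets(List<Rec> records, int numBuckets) {
    List<List<Rec>> buckets = new ArrayList<>();
    for (int i = 0; i < numBuckets; i++) {
      buckets.add(new ArrayList<>());
    }
    for (Rec r : records) {
      buckets.get(Math.floorMod(r.key.hashCode(), numBuckets)).add(r);
    }
    for (List<Rec> bucket : buckets) {
      bucket.sort(Comparator.comparing((Rec r) -> r.key));
    }
    return buckets;
  }

  /** Read side: inner-join two key-sorted buckets with one forward pass over each. */
  public static List<String> mergeJoin(List<Rec> left, List<Rec> right) {
    List<String> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      int cmp = left.get(i).key.compareTo(right.get(j).key);
      if (cmp < 0) {
        i++; // left key has no match yet, advance left
      } else if (cmp > 0) {
        j++; // right key has no match yet, advance right
      } else {
        // Find the run of equal keys on both sides and emit their cross product.
        String key = left.get(i).key;
        int iEnd = i;
        int jEnd = j;
        while (iEnd < left.size() && left.get(iEnd).key.equals(key)) iEnd++;
        while (jEnd < right.size() && right.get(jEnd).key.equals(key)) jEnd++;
        for (int a = i; a < iEnd; a++) {
          for (int b = j; b < jEnd; b++) {
            out.add(key + ":" + left.get(a).value + "," + right.get(b).value);
          }
        }
        i = iEnd;
        j = jEnd;
      }
    }
    return out;
  }

  /** Joins two datasets bucket by bucket; only buckets with the same index are compared. */
  public static List<String> join(List<Rec> a, List<Rec> b, int numBuckets) {
    List<List<Rec>> bucketsA = writeBuckets(a, numBuckets);
    List<List<Rec>> bucketsB = writeBuckets(b, numBuckets);
    List<String> out = new ArrayList<>();
    for (int k = 0; k < numBuckets; k++) {
      out.addAll(mergeJoin(bucketsA.get(k), bucketsB.get(k)));
    }
    return out;
  }
}
```

Note that the sketch only works because both inputs are bucketed with the same bucket count and the same hash function, so equal keys always land in buckets with the same index. In the proposal the bucketing and sorting cost is paid once by the producer pipeline; here join() does both sides in one call purely to keep the example self-contained.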