It is not possible to implement SMB on top of the various top-level
SomeFileIO.{write,read} PTransforms. One needs the internal details. It seems we should re-use (and expose) the existing FileSinks as a parameter to SMBSink (and also port the old-style sinks to use these). We also need the complementary FileSource that SMBSource could be parameterized by (and would also be useful in the readAll transforms that take a list of files (e.g. from a match) and actually read them (do these not exist already?)). This particular use does not need (dynamic or otherwise) range restrictions, but it's easy to say "read from 0 to infinity" if it has them. It does require maintaining ordering (within a shard). Perhaps this could be sketched out more in a doc? On Tue, Jul 16, 2019 at 11:34 PM Eugene Kirpichov <[email protected]> wrote: > > I'd like to reiterate the request to not build anything on top of > FileBasedSource/Reader. > If the design requires having some interface for representing a function from > a filename to a stream of records, better introduce a new interface for that. > If it requires interoperability with other IOs that read files, better change > them to use the new interface. > > On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath <[email protected]> > wrote: >> >> Thanks, this clarifies a lot. >> >> For writer, I think it's great if you can utilize existing FileIO.Sink >> implementations even if you have to reimplement some of the logic (for >> example compression, temp file handling) that is already implemented in Beam >> FileIO/WriteFiles transforms in your SMB sink transform. >> >> For reader, you are right that there's no FileIO.Read. What we have are >> various implementations of FileBasedSource/FileBasedReader classes that are >> currently intentionally hidden since Beam IO transforms are expected to be >> the intended public interface for users. If you can expose and re-use these >> classes with slight modifications (keeping backwards compatibility) I'm OK >> with it. Otherwise you'll have to write your own reader implementations. >> >> In general, it seems like SMB has very strong requirements related to >> sharding/hot-key management that are not easily achievable by implementing >> SMB source/sink as a composite transform that utilizes existing source/sink >> transforms. This forces you to implement this logic in your own DoFns, and >> existing Beam primitives are not easily re-usable in this context. >> >> Thanks, >> Cham >> >> On Tue, Jul 16, 2019 at 8:26 AM Neville Li <[email protected]> wrote: >>> >>> A little clarification of the IO requirements and my understanding of the >>> current state of IO. >>> >>> tl;dr: not sure if there're reusable bits for the reader. It's possible to >>> reuse some for the writer but with heavy refactoring. >>> >>> Reader >>> >>> For each bucket (containing the same key partition, sorted) across multiple >>> input data sets, we stream records from bucket files and merge sort. >>> We open the files in a DoFn, and emit KV<K, CoGbkResult> where the CGBKR >>> encapsulates Iterable<V> from each input. >>> Basically we need a simple API like ResourceId -> Iterator<T>, i.e. >>> sequential read, no block/offset/split requirement. >>> FileBasedSource.FileBasedReader seems the closest fit but they're nested & >>> coupled. >>> There's no FileIO.Read, only a ReadMatches[1], which can be used with >>> ReadAllViaFileBasedSource<T>. But that's not the granularity we need, since >>> we lose ordering of the input records, and can't merge 2+ sources.
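
To make the reader requirement above concrete, here is a minimal sketch of what the complementary reader abstraction could look like, essentially the ResourceId -> Iterator<T> contract described in the thread. The name FileSource and its method set are assumptions that mirror FileIO.Sink<T>; no such interface exists in Beam today.

import java.io.IOException;
import java.io.Serializable;
import java.nio.channels.ReadableByteChannel;
import java.util.Iterator;

/**
 * Hypothetical counterpart to FileIO.Sink<T>: turns a single readable channel
 * into an ordered stream of records. The name and shape are assumptions for
 * this discussion, not an existing Beam interface.
 */
public interface FileSource<T> extends Serializable {
  /** Open the underlying channel; called once per file. */
  void open(ReadableByteChannel channel) throws IOException;

  /** Records in file order; ordering within a shard must be preserved. */
  Iterator<T> read() throws IOException;

  /** Release any resources held for the current file. */
  void close() throws IOException;
}

An SMBSource parameterized by something of this shape could open each bucket file sequentially and merge-sort the resulting iterators without any block/offset/split machinery.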
>>> >>> >>> Writer >>> >>> We get a `PCollection<KV<BucketShardId, Iterable<T>>>` after bucket and >>> sort, where Iterable<T> is the records sorted by key and BucketShardId is >>> used to produce the filename, e.g. bucket-00001-shard-00002.avro. >>> We write each Iterable<T> to a temp file and move it to the final destination when >>> done. Both should ideally reuse existing code. >>> Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO) supports >>> record writing into a WritableByteChannel, but some logic like compression >>> is handled in FileIO through ViaFileBasedSink which extends FileBasedSink. >>> FileIO uses WriteFiles[3] to shard and write a PCollection<T>. Again we >>> lose ordering of the output records and the custom file naming scheme. However, >>> WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in WriteFiles >>> seem closest to our needs but would have to be split out and generalized. >>> >>> Note on the reader block/offset/split requirement >>> >>> Because of the merge sort, we can't split or offset-seek a bucket file: >>> without persisting the offset index of a key group somewhere, we >>> can't efficiently skip to a key group without exhausting the previous ones. >>> Furthermore we need to merge sort and align keys from multiple sources, >>> which may not have the same key distribution. It might be possible to >>> binary search for matching keys but that's an extra complication. IMO the >>> reader work distribution is better solved by a better bucket/shard strategy >>> in the upstream writer. >>> >>> References >>> >>> ReadMatches extends PTransform<PCollection<MatchResult.Metadata>, >>> PCollection<ReadableFile>> >>> ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, >>> PCollection<T>> >>> WriteFiles<UserT, DestinationT, OutputT> extends >>> PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, >>> Iterable<UserT>>, FileResult<DestinationT>> >>> FinalizeTempFileBundles extends PTransform< >>> PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>> >>> >>> >>> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw <[email protected]> wrote: >>>> >>>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov <[email protected]> >>>> wrote: >>>> > >>>> > Quick note: I didn't look through the document, but please do not build >>>> > on either FileBasedSink or FileBasedReader. They are both remnants of >>>> > the old, non-composable IO world; and in fact much of the composable IO >>>> > work emerged from frustration with their limitations and recognizing >>>> > that many other IOs were suffering from the same limitations. >>>> > Instead of FileBasedSink, build on FileIO.write; instead of >>>> > FileBasedReader, build on FileIO.read. >>>> >>>> +1 >>>> >>>> I think the sink could be written atop FileIO.write, possibly using >>>> dynamic destinations. At the very least the FileSink interface, which >>>> handles the details of writing a single shard, would be an ideal way >>>> to parameterize an SMB sink. It seems that none of our existing IOs >>>> (publicly?) expose FileSink implementations. >>>> >>>> FileIO.read is not flexible enough to do the merging. Eugene, is there >>>> a composable analogue to FileSink, for sources, i.e. something that >>>> can turn a file handle (possibly with offsets) into a set of records >>>> other than FileBasedReader? 
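
Assuming the sink were parameterized by an existing FileIO.Sink as discussed, the per-bucket temp-file-then-rename flow described in the Writer section above might look roughly like the following sketch. The class name, the String stand-in for BucketShardId, and the directory/file handling are illustrative assumptions, not the proposed implementation.

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import java.util.Collections;
import java.util.UUID;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;

// Sketch only: writes one already-sorted bucket/shard through a FileIO.Sink,
// using a temp file that is renamed to the final name once fully written.
// The String key stands in for the proposal's BucketShardId.
class WriteBucketFn<T> extends DoFn<KV<String, Iterable<T>>, String> {
  private final FileIO.Sink<T> sink;  // e.g. AvroIO.sink(schema) or TextIO.sink()
  private final String outputDir;     // final destination directory

  WriteBucketFn(FileIO.Sink<T> sink, String outputDir) {
    this.sink = sink;
    this.outputDir = outputDir;
  }

  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    String filename = c.element().getKey();  // e.g. "bucket-00001-shard-00002.avro"
    ResourceId dir = FileSystems.matchNewResource(outputDir, true /* isDirectory */);
    ResourceId tmp = dir.resolve(".tmp-" + UUID.randomUUID(), StandardResolveOptions.RESOLVE_FILE);
    ResourceId dst = dir.resolve(filename, StandardResolveOptions.RESOLVE_FILE);

    // Stream the pre-sorted records through the sink; ordering is preserved.
    try (WritableByteChannel channel = FileSystems.create(tmp, MimeTypes.BINARY)) {
      sink.open(channel);
      for (T record : c.element().getValue()) {
        sink.write(record);
      }
      sink.flush();
    }
    // Promote the temp file to its final destination.
    FileSystems.rename(Collections.singletonList(tmp), Collections.singletonList(dst));
    c.output(dst.toString());
  }
}

This is essentially what WriteShardsIntoTempFilesFn and FinalizeTempFileBundles do today, minus the integer-shard assumption, which is why splitting them out looks attractive.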
>>>> >>>> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov <[email protected]> wrote: >>>> >> >>>> >> I share the same concern as Robert regarding re-implementing parts of >>>> >> IO. At the same time, in the past, I worked on internal libraries that >>>> >> tried to re-use code from existing IO, and it's hardly possible because >>>> >> it feels like it wasn't designed for re-use. There are a lot of classes >>>> >> that are nested (non-static) or non-public. I can understand why they >>>> >> were made non-public; it's a hard abstraction to design well while keeping >>>> >> compatibility. As Neville mentioned, decoupling readers and writers >>>> >> would not only benefit this proposal but any other use-case >>>> >> that has to deal with low-level APIs such as the FileSystem API, which is >>>> >> hardly possible today without copy-pasting. >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> >>>> >> On Mon, Jul 15, 2019 at 5:05 PM Neville Li <[email protected]> >>>> >> wrote: >>>> >>> >>>> >>> Re: avoiding mirroring IO functionality, what about: >>>> >>> >>>> >>> - Decouple the nested FileBasedSink.Writer and >>>> >>> FileBasedSource.FileBasedReader, make them top level and remove >>>> >>> references to parent classes. >>>> >>> - Simplify the interfaces, while maintaining support for block/offset >>>> >>> read & sequential write. >>>> >>> - As a bonus, the refactored IO classes can be used standalone in cases >>>> >>> where the user wants to perform custom IO in a DoFn, i.e. a >>>> >>> PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. >>>> >>> Today this requires a lot of copy-pasted Avro boilerplate. >>>> >>> - For compatibility, we can delegate to the new classes from the old >>>> >>> ones and remove them in the next breaking release. >>>> >>> >>>> >>> Re: WriteFiles logic, I'm not sure about generalizing it, but what >>>> >>> about splitting the part handling writing temp files into a new >>>> >>> PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>, >>>> >>> PCollection<WriteFilesResult<DestinationT>>>? That splits the >>>> >>> bucket-shard logic from actual file IO. >>>> >>> >>>> >>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <[email protected]> >>>> >>> wrote: >>>> >>>> >>>> >>>> I agree that generalizing the existing FileIO may not be the right >>>> >>>> path forward, and I'd only make their innards public with great care. >>>> >>>> (Would this be used like >>>> >>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit >>>> >>>> unique in that the source and sink are much more coupled than other >>>> >>>> sources and sinks (which happen to be completely independent, if >>>> >>>> complementary, implementations, whereas SMB attempts to be a kind of >>>> >>>> pipe where one half is instantiated in each pipeline). >>>> >>>> >>>> >>>> In short, an SMB source/sink that is parameterized by an arbitrary, >>>> >>>> existing IO would be ideal (but possibly not feasible (per existing >>>> >>>> prioritizations)), or an SMB source/sink that works as a pair. What >>>> >>>> I'd like to avoid is a set of parallel SMB IO classes that (partially, >>>> >>>> and incompletely) mirror the existing IO ones (from an API >>>> >>>> perspective--how much implementation it makes sense to share is an >>>> >>>> orthogonal issue that I'm sure can be worked out.) 
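
For reference, the "copy-pasted Avro boilerplate" mentioned in Neville's list above looks roughly like this today when done by hand inside a DoFn, using only the public FileSystems API plus Avro. This is a sketch; coder setup and error handling are omitted.

import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// The hand-rolled Avro path: sequentially read GenericRecords from a URI
// inside a DoFn. A decoupled, public reader class would replace most of this.
class ReadAvroFn extends DoFn<String, KV<String, GenericRecord>> {
  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    String uri = c.element();
    ResourceId resource = FileSystems.matchNewResource(uri, false /* isDirectory */);
    try (ReadableByteChannel channel = FileSystems.open(resource);
        DataFileStream<GenericRecord> reader =
            new DataFileStream<>(Channels.newInputStream(channel), new GenericDatumReader<>())) {
      // Records are emitted in file order, which is what merge-sorting requires.
      while (reader.hasNext()) {
        c.output(KV.of(uri, reader.next()));
      }
    }
  }
}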
>>>> >>>> >>>> >>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <[email protected]> >>>> >>>> wrote: >>>> >>>> > >>>> >>>> > Hi Robert, >>>> >>>> > >>>> >>>> > I agree, it'd be nice to reuse FileIO logic for different file >>>> >>>> > types. But given the current code structure of FileIO & the scope of >>>> >>>> > the change, I feel it's better left for future refactor PRs. >>>> >>>> > >>>> >>>> > Some thoughts: >>>> >>>> > - SMB file operations are simple single-file sequential reads/writes, >>>> >>>> > which already exist as Writer & FileBasedReader, but those are private >>>> >>>> > inner classes and have references to the parent Sink/Source >>>> >>>> > instance. >>>> >>>> > - The readers also have extra offset/split logic but that can be >>>> >>>> > worked around. >>>> >>>> > - It'd be nice to not duplicate the temp->destination file logic, but >>>> >>>> > again WriteFiles assumes a single integer shard key, so it'll >>>> >>>> > take some refactoring to reuse it. >>>> >>>> > >>>> >>>> > All of these can be done in a backwards-compatible way. OTOH >>>> >>>> > generalizing the existing components too much (esp. WriteFiles, >>>> >>>> > which is already complex) might lead to two logic paths, one >>>> >>>> > specialized for the SMB case. It might be easier to decouple some >>>> >>>> > of them for better reuse. But again I feel it's a separate >>>> >>>> > discussion. >>>> >>>> > >>>> >>>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty >>>> >>>> > <[email protected]> wrote: >>>> >>>> >> >>>> >>>> >> Thanks Robert! >>>> >>>> >> >>>> >>>> >> We'd definitely like to be able to re-use existing I/O >>>> >>>> >> components--for example the Writer<DestinationT, >>>> >>>> >> OutputT>/FileBasedReader<T> (since they operate on a >>>> >>>> >> WritableByteChannel/ReadableByteChannel, which is the level of >>>> >>>> >> granularity we need) but the Writers, at least, seem to be mostly >>>> >>>> >> private-access. Do you foresee them being made public at any point? >>>> >>>> >> >>>> >>>> >> - Claire >>>> >>>> >> >>>> >>>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw >>>> >>>> >> <[email protected]> wrote: >>>> >>>> >>> >>>> >>>> >>> I left some comments on the doc. >>>> >>>> >>> >>>> >>>> >>> I think the general idea is sound, but one thing that worries me >>>> >>>> >>> is >>>> >>>> >>> the introduction of a parallel set of IOs that mirrors the >>>> >>>> >>> (existing) >>>> >>>> >>> FileIOs. I would suggest either (1) incorporate this functionality >>>> >>>> >>> into the generic FileIO infrastructure, or let it be >>>> >>>> >>> parameterized by >>>> >>>> >>> arbitrary IO (which I'm not sure is possible, especially for the >>>> >>>> >>> Read >>>> >>>> >>> side (and better would be the capability of supporting arbitrary >>>> >>>> >>> sources, aka an optional "as-sharded-source" operation that >>>> >>>> >>> returns a >>>> >>>> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where the >>>> >>>> >>> iterable is >>>> >>>> >>> promised to be in key order)), or (2) support a single SMB aka >>>> >>>> >>> "PreGrouping" source/sink pair that's always used together (and >>>> >>>> >>> whose >>>> >>>> >>> underlying format is not necessarily public). >>>> >>>> >>> >>>> >>>> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li >>>> >>>> >>> <[email protected]> wrote: >>>> >>>> >>> > >>>> >>>> >>> > 4 people have commented but mostly clarifying details and not >>>> >>>> >>> > much on the overall design. 
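
A sketch of what the optional "as-sharded-source" capability suggested above might look like as an interface. The name ShardedSource, the Integer shard id, and the single method are assumptions for illustration only.

import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;

/**
 * Hypothetical capability an IO could opt into so an SMB read can be
 * parameterized by it: expose the data as shards whose values are promised
 * to be in key order. Not an existing Beam interface.
 */
interface ShardedSource<K, V> {
  /** Each output element is one shard; the Iterable is sorted by K. */
  PTransform<PBegin, PCollection<KV<Integer, Iterable<KV<K, V>>>>> asShardedSource();
}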
>>>> >>>> >>> > >>>> >>>> >>> > It'd be great to have thumbs up/down on the design, >>>> >>>> >>> > specifically metadata, bucket & shard strategy, etc., since >>>> >>>> >>> > that affects backwards compatibility of output files. >>>> >>>> >>> > Some breaking changes, e.g. dynamic # of shards, are out of >>>> >>>> >>> > scope for V1 unless someone feels strongly about it. The >>>> >>>> >>> > current scope should cover all our use cases and leave room for >>>> >>>> >>> > optimization. >>>> >>>> >>> > >>>> >>>> >>> > Once green lighted we can start adopting internally, ironing >>>> >>>> >>> > out rough edges while iterating on the PRs in parallel. >>>> >>>> >>> > >>>> >>>> >>> > Most of the implementation is self-contained in the >>>> >>>> >>> > extensions:smb module, except making a few core classes/methods >>>> >>>> >>> > public for reuse. So despite the amount of work it's still >>>> >>>> >>> > fairly low risk to the code base. There're some proposed >>>> >>>> >>> > optimization & refactoring involving core (see appendix) but >>>> >>>> >>> > IMO they're better left for followup PRs. >>>> >>>> >>> > >>>> >>>> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles >>>> >>>> >>> > <[email protected]> wrote: >>>> >>>> >>> >> >>>> >>>> >>> >> I've seen some discussion on the doc. I cannot tell whether >>>> >>>> >>> >> the questions are resolved or what the status of review is. >>>> >>>> >>> >> Would you mind looping this thread with a quick summary? This >>>> >>>> >>> >> is such a major piece of work I don't want it to sit with >>>> >>>> >>> >> everyone thinking they are waiting on someone else, or any >>>> >>>> >>> >> such thing. (not saying this is happening, just pinging to be >>>> >>>> >>> >> sure) >>>> >>>> >>> >> >>>> >>>> >>> >> Kenn >>>> >>>> >>> >> >>>> >>>> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li >>>> >>>> >>> >> <[email protected]> wrote: >>>> >>>> >>> >>> >>>> >>>> >>> >>> Updated the doc a bit with more future work (appendix). IMO >>>> >>>> >>> >>> most of them are non-breaking and better done in separate PRs >>>> >>>> >>> >>> later since some involve pretty big refactoring and are >>>> >>>> >>> >>> outside the scope of MVP. >>>> >>>> >>> >>> >>>> >>>> >>> >>> For now we'd really like to get feedback on some fundamental >>>> >>>> >>> >>> design decisions and find a way to move forward. >>>> >>>> >>> >>> >>>> >>>> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li >>>> >>>> >>> >>> <[email protected]> wrote: >>>> >>>> >>> >>>> >>>> >>>> >>> >>>> Thanks. I responded to comments in the doc. More inline. >>>> >>>> >>> >>>> >>>> >>>> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath >>>> >>>> >>> >>>> <[email protected]> wrote: >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> Thanks added few comments. >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> If I understood correctly, you basically assign elements >>>> >>>> >>> >>>>> with keys to different buckets which are written to unique >>>> >>>> >>> >>>>> files and merge files for the same key while reading ? >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> Some of my concerns are. >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> (1) Seems like you rely on an in-memory sorting of >>>> >>>> >>> >>>>> buckets. Will this end up limiting the size of a >>>> >>>> >>> >>>>> PCollection you can process ? >>>> >>>> >>> >>>> >>>> >>>> >>> >>>> The sorter transform we're using supports spilling and >>>> >>>> >>> >>>> external sort. We can break up large key groups further by >>>> >>>> >>> >>>> sharding, similar to fan out in some GBK transforms. 
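
For context on "supports spilling and external sort" above, this is roughly how Beam's sorter extension can be applied after grouping by bucket id; the key and value types here are illustrative, not the proposal's actual schema.

import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
import org.apache.beam.sdk.extensions.sorter.SortValues;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Sketch: group records by bucket id, then sort each group's
// (joinKey, encodedRecord) pairs by joinKey. BufferedExternalSorter spills to
// disk when its buffer fills, so a single group is not limited by worker memory.
class SortBuckets {
  static PCollection<KV<Integer, Iterable<KV<String, byte[]>>>> expand(
      PCollection<KV<Integer, KV<String, byte[]>>> bucketed) {
    return bucketed
        .apply(GroupByKey.create())
        .apply(SortValues.<Integer, String, byte[]>create(BufferedExternalSorter.options()));
  }
}

Very large key groups can additionally be split across shards, as Neville notes above, so no single sort has to cover a whole hot key.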
>>>> >>>> >>> >>>> >>>> >>>> >>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey() which >>>> >>>> >>> >>>>> is actually implemented using a shuffle (which you try to >>>> >>>> >>> >>>>> replace with this proposal). >>>> >>>> >>> >>>> >>>> >>>> >>> >>>> That's for distributing task metadata, so that each DoFn >>>> >>>> >>> >>>> thread picks up a random bucket and sort merge key-values. >>>> >>>> >>> >>>> It's not shuffling actual data. >>>> >>>> >>> >>>> >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> (3) I think (at least some of the) shuffle implementations >>>> >>>> >>> >>>>> are implemented in ways similar to this (writing to files >>>> >>>> >>> >>>>> and merging). So I'm wondering if the performance benefits >>>> >>>> >>> >>>>> you see are for a very specific case and may limit the >>>> >>>> >>> >>>>> functionality in other ways. >>>> >>>> >>> >>>> >>>> >>>> >>> >>>> This is for the common pattern of few core data producer >>>> >>>> >>> >>>> pipelines and many downstream consumer pipelines. It's not >>>> >>>> >>> >>>> intended to replace shuffle/join within a single pipeline. >>>> >>>> >>> >>>> On the producer side, by pre-grouping/sorting data and >>>> >>>> >>> >>>> writing to bucket/shard output files, the consumer can >>>> >>>> >>> >>>> sort/merge matching ones without a CoGBK. Essentially we're >>>> >>>> >>> >>>> paying the shuffle cost upfront to avoid them repeatedly in >>>> >>>> >>> >>>> each consumer pipeline that wants to join data. >>>> >>>> >>> >>>> >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> Thanks, >>>> >>>> >>> >>>>> Cham >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> >>>> >>>> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li >>>> >>>> >>> >>>>> <[email protected]> wrote: >>>> >>>> >>> >>>>>> >>>> >>>> >>> >>>>>> Ping again. Any chance someone takes a look to get this >>>> >>>> >>> >>>>>> thing going? It's just a design doc and basic metadata/IO >>>> >>>> >>> >>>>>> impl. We're not talking about actual source/sink code yet >>>> >>>> >>> >>>>>> (already done but saved for future PRs). >>>> >>>> >>> >>>>>> >>>> >>>> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay >>>> >>>> >>> >>>>>> <[email protected]> wrote: >>>> >>>> >>> >>>>>>> >>>> >>>> >>> >>>>>>> Thank you Claire, this looks promising. Explicitly adding >>>> >>>> >>> >>>>>>> a few folks that might have feedback: +Ismaël Mejía >>>> >>>> >>> >>>>>>> +Robert Bradshaw +Lukasz Cwik +Chamikara Jayalath >>>> >>>> >>> >>>>>>> >>>> >>>> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty >>>> >>>> >>> >>>>>>> <[email protected]> wrote: >>>> >>>> >>> >>>>>>>> >>>> >>>> >>> >>>>>>>> Hey dev@! >>>> >>>> >>> >>>>>>>> >>>> >>>> >>> >>>>>>>> Myself and a few other Spotify data engineers have put >>>> >>>> >>> >>>>>>>> together a design doc for SMB Join support in Beam, and >>>> >>>> >>> >>>>>>>> have a working Java implementation we've started to put >>>> >>>> >>> >>>>>>>> up for PR ([0], [1], [2]). There's more detailed >>>> >>>> >>> >>>>>>>> information in the document, but the tl;dr is that SMB >>>> >>>> >>> >>>>>>>> is a strategy to optimize joins for file-based sources >>>> >>>> >>> >>>>>>>> by modifying the initial write operation to write >>>> >>>> >>> >>>>>>>> records in sorted buckets based on the desired join key. >>>> >>>> >>> >>>>>>>> This means that subsequent joins of datasets written in >>>> >>>> >>> >>>>>>>> this way are only sequential file reads, no shuffling >>>> >>>> >>> >>>>>>>> involved. 
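
To make "sort/merge matching ones without a CoGBK" concrete, here is a heavily simplified single-pass merge join of two bucket files that are already sorted by key (one value per key per side; the actual implementation would group values per key and emit KV<K, CoGbkResult>).

import java.util.Iterator;
import org.apache.beam.sdk.values.KV;

// Illustration only: two bucket files, each already sorted by key, can be
// joined with one sequential pass and no shuffle.
class MergeSortedBuckets {
  interface Output<V1, V2> {
    void emit(String key, V1 leftValue, V2 rightValue);
  }

  static <V1, V2> void mergeJoin(
      Iterator<KV<String, V1>> left, Iterator<KV<String, V2>> right, Output<V1, V2> out) {
    KV<String, V1> l = left.hasNext() ? left.next() : null;
    KV<String, V2> r = right.hasNext() ? right.next() : null;
    while (l != null && r != null) {
      int cmp = l.getKey().compareTo(r.getKey());
      if (cmp == 0) {
        out.emit(l.getKey(), l.getValue(), r.getValue());
        l = left.hasNext() ? left.next() : null;
        r = right.hasNext() ? right.next() : null;
      } else if (cmp < 0) {
        l = left.hasNext() ? left.next() : null;   // unmatched left key, skip
      } else {
        r = right.hasNext() ? right.next() : null; // unmatched right key, skip
      }
    }
  }
}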
We've seen some pretty substantial performance >>>> >>>> >>> >>>>>>>> speedups with our implementation and would love to get >>>> >>>> >>> >>>>>>>> it checked in to Beam's Java SDK. >>>> >>>> >>> >>>>>>>> >>>> >>>> >>> >>>>>>>> We'd appreciate any suggestions or feedback on our >>>> >>>> >>> >>>>>>>> proposal--the design doc should be public to comment on. >>>> >>>> >>> >>>>>>>> >>>> >>>> >>> >>>>>>>> Thanks! >>>> >>>> >>> >>>>>>>> Claire / Neville >>>> >> >>>> >> >>>> >> >>>> >> -- >>>> >> Cheers, >>>> >> Gleb
