I'd like to reiterate the request to not build anything on top of FileBasedSource/Reader. If the design requires having some interface for representing a function from a filename to a stream of records, better introduce a new interface for that. If it requires interoperability with other IOs that read files, better change them to use the new interface.
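To make that suggestion concrete, here is a minimal sketch of what such a new interface could look like. The name FileRecordReader and its exact shape are hypothetical (not an existing Beam API); it roughly matches the "ResourceId -> Iterator<T>" requirement described in Neville's mail below:

```java
import java.io.IOException;
import java.io.Serializable;
import java.util.Iterator;
import org.apache.beam.sdk.io.fs.ResourceId;

/**
 * Hypothetical sketch only -- not an existing Beam interface.
 * A sequential "filename -> stream of records" abstraction that an SMB source
 * (or any file-reading DoFn) could be parameterized by, analogous to
 * FileIO.Sink on the write side. No block/offset/split support is implied;
 * implementations for Avro, TFRecord, etc. would wrap their own decoders.
 */
public interface FileRecordReader<T> extends Serializable {
  /** Opens the file and returns its records in file order. */
  Iterator<T> readRecords(ResourceId file) throws IOException;
}
```

An SMB read transform could then open one such iterator per bucket file and merge-sort them inside a DoFn, without touching FileBasedSource at all.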
On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath <chamik...@google.com> wrote: > Thanks, this clarifies a lot. > > For writer, I think it's great if you can utilize existing FileIO.Sink > implementations even if you have to reimplement some of the logic (for > example compression, temp file handling) that is already implemented in > Beam FileIO/WriteFiles transforms in your SMB sink transform. > > For reader, you are right that there's no FileIO.Read. What we have are > various implementations of FileBasedSource/FileBasedReader classes that are > currently intentionally hidden since Beam IO transforms are expected to be > the intended public interface for users. If you can expose and re-use these > classes with slight modifications (keeping backwards compatibility) I'm OK > with it. Otherwise you'll have to write your own reader implementations. > > In general, it seems like SMB has very strong requirements related to > sharding/hot-key management that are not easily achievable by implementing > SMB source/sink as a composite transform that utilizes existing source/sink > transforms. This forces you to implement this logic in your own DoFns, and > existing Beam primitives are not easily re-usable in this context. > > Thanks, > Cham > > On Tue, Jul 16, 2019 at 8:26 AM Neville Li <neville....@gmail.com> wrote: > >> A little clarification of the IO requirement and my understanding of the >> current state of IO. >> >> tl;dr: not sure if there're reusable bits for the reader. It's possible >> to reuse some for the writer but with heavy refactoring. >> >> *Reader* >> >> - For each bucket (containing the same key partition, sorted) across >> multiple input data sets, we stream records from bucket files and merge >> sort. >> - We open the files in a DoFn, and emit KV<K, CoGbkResult> where the >> CGBKR encapsulates Iterable<V> from each input. >> - Basically we need a simple API like ResourceId -> Iterator<T>, i.e. >> sequential read, no block/offset/split requirement. >> - FileBasedSource.FileBasedReader seems the closest fit but they're >> nested & coupled to their parent classes. >> - There's no FileIO.Read, only a ReadMatches[1], which can be used >> with ReadAllViaFileBasedSource<T>[2]. But that's not the granularity we need, >> since we lose ordering of the input records, and can't merge 2+ sources. >> >> *Writer* >> >> - We get a `PCollection<KV<BucketShardId, Iterable<T>>>` after bucket >> and sort, where Iterable<T> is the records sorted by key and BucketShardId >> is used to produce the filename, e.g. bucket-00001-shard-00002.avro. >> - We write each Iterable<T> to a temp file and move it to the final >> destination when done. Both should ideally reuse existing code. >> - Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO) >> supports record writing into a WritableByteChannel, but some logic like >> compression is handled in FileIO through ViaFileBasedSink, which extends >> FileBasedSink. >> - FileIO uses WriteFiles[3] to shard and write a PCollection<T>. >> Again we lose the ordering of the output records and the custom file naming scheme. >> However, WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in >> WriteFiles seem closest to our needs but would have to be split out and >> generalized. >> >> *Note on reader block/offset/split requirement* >> >> - Because of the merge sort, we can't split or offset-seek a bucket >> file: without persisting the offset index of a key group >> somewhere, >> we can't efficiently skip to a key group without exhausting the previous >> ones.
Furthermore, we need to merge sort and align keys from multiple >> sources, which may not have the same key distribution. It might be >> possible >> to binary search for matching keys but that's an extra complication. IMO the >> reader work distribution is better solved by a better bucket/shard strategy >> in the upstream writer. >> >> *References* >> >> 1. ReadMatches extends PTransform<PCollection<MatchResult.Metadata>, >> PCollection<ReadableFile>> >> 2. ReadAllViaFileBasedSource<T> extends >> PTransform<PCollection<ReadableFile>, PCollection<T>> >> 3. WriteFiles<UserT, DestinationT, OutputT> extends >> PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>> >> 4. WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, >> Iterable<UserT>>, FileResult<DestinationT>> >> 5. FinalizeTempFileBundles extends PTransform< >> PCollection<List<FileResult<DestinationT>>>, >> WriteFilesResult<DestinationT>> >> >> >> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw <rober...@google.com> >> wrote: >> >>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov <kirpic...@google.com> >>> wrote: >>> > >>> > Quick note: I didn't look through the document, but please do not >>> build on either FileBasedSink or FileBasedReader. They are both remnants of >>> the old, non-composable IO world; and in fact much of the composable IO >>> work emerged from frustration with their limitations and recognizing that >>> many other IOs were suffering from the same limitations. >>> > Instead of FileBasedSink, build on FileIO.write; instead of >>> FileBasedReader, build on FileIO.read. >>> >>> +1 >>> >>> I think the sink could be written atop FileIO.write, possibly using >>> dynamic destinations. At the very least the FileSink interface, which >>> handles the details of writing a single shard, would be an ideal way >>> to parameterize an SMB sink. It seems that none of our existing IOs >>> (publicly?) expose FileSink implementations. >>> >>> FileIO.read is not flexible enough to do the merging. Eugene, is there >>> a composable analogue to FileSink, for sources, i.e. something that >>> can turn a file handle (possibly with offsets) into a set of records >>> other than FileBasedReader? >>> >>> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov <g...@spotify.com> >>> wrote: >>> >> >>> >> I share the same concern as Robert regarding re-implementing parts >>> of IO. At the same time, in the past, I worked on internal libraries that >>> tried to re-use code from existing IO, and it's hardly possible because it >>> feels like it wasn't designed for re-use. There are a lot of classes that >>> are nested (non-static) or non-public. I can understand why they were made >>> non-public; it's a hard abstraction to design well while keeping compatibility. >>> As Neville mentioned, decoupling readers and writers would benefit not only >>> this proposal but any other use case that has to deal with >>> low-level APIs such as the FileSystem API, which is hardly possible today without >>> copy-pasting. >>> >> >>> >> >>> >> >>> >> >>> >> On Mon, Jul 15, 2019 at 5:05 PM Neville Li <neville....@gmail.com> >>> wrote: >>> >>> >>> >>> Re: avoiding mirroring IO functionality, what about: >>> >>> >>> >>> - Decouple the nested FileBasedSink.Writer and >>> FileBasedSource.FileBasedReader, make them top-level and remove references >>> to parent classes. >>> >>> - Simplify the interfaces, while maintaining support for >>> block/offset read & sequential write.
>>> >>> - As a bonus, the refactored IO classes can be used standalone in >>> cases where the user wants to perform custom IO in a DoFn, i.e. a >>> PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. Today >>> this requires a lot of copy-pasted Avro boilerplate. >>> >>> - For compatibility, we can delegate to the new classes from the old >>> ones and remove them in the next breaking release. >>> >>> >>> >>> Re: WriteFiles logic, I'm not sure about generalizing it, but what >>> about splitting the part that handles writing temp files into a new >>> PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>, >>> PCollection<WriteFilesResult<DestinationT>>>? That splits the bucket-shard >>> logic from the actual file IO. >>> >>> >>> >>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw < >>> rober...@google.com> wrote: >>> >>>> >>> >>>> I agree that generalizing the existing FileIO may not be the right >>> >>>> path forward, and I'd only make its innards public with great >>> care. >>> >>>> (Would this be used like >>> >>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a >>> bit >>> >>>> unique in that the source and sink are much more coupled than other >>> >>>> sources and sinks (which happen to be completely independent, if >>> >>>> complementary, implementations, whereas SMB attempts to be a kind of >>> >>>> pipe where one half is instantiated in each pipeline). >>> >>>> >>> >>>> In short, an SMB source/sink that is parameterized by an arbitrary, >>> >>>> existing IO would be ideal (but possibly not feasible (per existing >>> >>>> prioritizations)), or an SMB source/sink that works as a pair. What >>> >>>> I'd like to avoid is a set of parallel SMB IO classes that >>> (partially, >>> >>>> and incompletely) mirror the existing IO ones (from an API >>> >>>> perspective--how much implementation it makes sense to share is an >>> >>>> orthogonal issue that I'm sure can be worked out). >>> >>>> >>> >>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <neville....@gmail.com> >>> wrote: >>> >>>> > >>> >>>> > Hi Robert, >>> >>>> > >>> >>>> > I agree, it'd be nice to reuse FileIO logic for different file >>> types. But given the current code structure of FileIO & the scope of the >>> change, I feel it's better left for future refactor PRs. >>> >>>> > >>> >>>> > Some thoughts: >>> >>>> > - SMB file operations are simple single-file sequential >>> reads/writes, which already exist as Writer & FileBasedReader, but those are >>> private inner classes and have references to the parent Sink/Source >>> instance. >>> >>>> > - The readers also have extra offset/split logic but that can be >>> worked around. >>> >>>> > - It'd be nice not to duplicate the temp->destination file logic, but >>> again WriteFiles assumes a single integer shard key, so it'll take some >>> refactoring to reuse it. >>> >>>> > >>> >>>> > All of these can be done in a backwards-compatible way. OTOH >>> generalizing the existing components too much (esp. WriteFiles, which is >>> already complex) might lead to two logic paths, one specialized for the SMB >>> case. It might be easier to decouple some of them for better reuse. But >>> again I feel it's a separate discussion.
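As a rough illustration of the "copy-pasted Avro boilerplate" Neville mentions above, a hand-rolled per-file read inside a DoFn today looks roughly like the following (a sketch only, with coders, error handling, and schema resolution omitted; the class name is hypothetical):

```java
import java.io.InputStream;
import java.nio.channels.Channels;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Sketch of the hand-rolled Avro boilerplate currently needed to read files
// inside a DoFn, i.e. roughly PCollection<String> of URIs -> KV<uri, record>.
class ReadAvroPerFileFn extends DoFn<String, KV<String, GenericRecord>> {
  @ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    String uri = c.element();
    ResourceId file = FileSystems.matchNewResource(uri, false /* isDirectory */);
    try (InputStream in = Channels.newInputStream(FileSystems.open(file));
        DataFileStream<GenericRecord> records =
            new DataFileStream<>(in, new GenericDatumReader<>())) {
      for (GenericRecord record : records) {
        c.output(KV.of(uri, record));
      }
    }
  }
}
```

A public, decoupled reader abstraction would let this collapse to one call per file format instead of being rewritten in every pipeline.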
>>> >>>> > >>> >>>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty < >>> claire.d.mcgi...@gmail.com> wrote: >>> >>>> >> >>> >>>> >> Thanks Robert! >>> >>>> >> >>> >>>> >> We'd definitely like to be able to re-use existing I/O >>> components--for example the Writer<DestinationT, >>> OutputT>/FileBasedReader<T> (since they operate on a >>> WritableByteChannel/ReadableByteChannel, which is the level of granularity >>> we need), but the Writers, at least, seem to be mostly private-access. Do >>> you foresee them being made public at any point? >>> >>>> >> >>> >>>> >> - Claire >>> >>>> >> >>> >>>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw < >>> rober...@google.com> wrote: >>> >>>> >>> >>> >>>> >>> I left some comments on the doc. >>> >>>> >>> >>> >>>> >>> I think the general idea is sound, but one thing that worries >>> me is >>> >>>> >>> the introduction of a parallel set of IOs that mirrors the >>> (existing) >>> >>>> >>> FileIOs. I would suggest either (1) incorporate this >>> functionality >>> >>>> >>> into the generic FileIO infrastructure, (2) let it be >>> parameterized by an >>> >>>> >>> arbitrary IO (which I'm not sure is possible, especially for >>> the Read >>> >>>> >>> side (and better would be the capability of supporting arbitrary >>> >>>> >>> sources, aka an optional "as-sharded-source" operation that >>> returns a >>> >>>> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where the >>> iterable is >>> >>>> >>> promised to be in key order)), or (3) support a single SMB aka >>> >>>> >>> "PreGrouping" source/sink pair that's always used together (and >>> whose >>> >>>> >>> underlying format is not necessarily public). >>> >>>> >>> >>> >>>> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li < >>> neville....@gmail.com> wrote: >>> >>>> >>> > >>> >>>> >>> > 4 people have commented, but mostly on clarifying details and not >>> much on the overall design. >>> >>>> >>> > >>> >>>> >>> > It'd be great to have thumbs up/down on the design, >>> specifically metadata, bucket & shard strategy, etc., since that affects >>> backwards compatibility of output files. >>> >>>> >>> > Some breaking changes, e.g. dynamic # of shards, are out of >>> scope for V1 unless someone feels strongly about it. The current scope >>> should cover all our use cases and leave room for optimization. >>> >>>> >>> > >>> >>>> >>> > Once green-lighted, we can start adopting internally, ironing >>> out rough edges while iterating on the PRs in parallel. >>> >>>> >>> > >>> >>>> >>> > Most of the implementation is self-contained in the >>> extensions:smb module, except for making a few core classes/methods public for >>> reuse. So despite the amount of work it's still fairly low risk to the code >>> base. There're some proposed optimizations & refactorings involving core (see >>> appendix) but IMO they're better left for follow-up PRs. >>> >>>> >>> > >>> >>>> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles < >>> k...@apache.org> wrote: >>> >>>> >>> >> >>> >>>> >>> >> I've seen some discussion on the doc. I cannot tell whether >>> the questions are resolved or what the status of review is. Would you mind >>> looping this thread in with a quick summary? This is such a major piece of >>> work that I don't want it to sit with everyone thinking they are waiting on >>> someone else, or any such thing. (Not saying this is happening, just >>> pinging to be sure.) >>> >>>> >>> >> >>> >>>> >>> >> Kenn >>> >>>> >>> >> >>> >>>> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li < >>> neville....@gmail.com> wrote: >>> >>>> >>> >>> >>> >>>> >>> >>> Updated the doc a bit with more future work (appendix).
IMO >>> most of them are non-breaking and better done in separate PRs later since >>> some involve pretty big refactoring and are outside the scope of the MVP. >>> >>>> >>> >>> >>> >>>> >>> >>> For now we'd really like to get feedback on some >>> fundamental design decisions and find a way to move forward. >>> >>>> >>> >>> >>> >>>> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li < >>> neville....@gmail.com> wrote: >>> >>>> >>> >>>> >>> >>>> >>> >>>> Thanks. I responded to the comments in the doc. More inline. >>> >>>> >>> >>>> >>> >>>> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath < >>> chamik...@google.com> wrote: >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> Thanks, added a few comments. >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> If I understood correctly, you basically assign elements >>> with keys to different buckets, which are written to unique files, and merge >>> files for the same key while reading? >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> Some of my concerns are: >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> (1) Seems like you rely on an in-memory sorting of >>> buckets. Will this end up limiting the size of a PCollection you can >>> process? >>> >>>> >>> >>>> >>> >>>> >>> >>>> The sorter transform we're using supports spilling and >>> external sort. We can break up large key groups further by sharding, >>> similar to fan-out in some GBK transforms. >>> >>>> >>> >>>> >>> >>>> >>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey(), which >>> is actually implemented using a shuffle (which you try to replace with this >>> proposal). >>> >>>> >>> >>>> >>> >>>> >>> >>>> That's for distributing task metadata, so that each DoFn >>> thread picks up a random bucket and sort-merges key-values. It's not >>> shuffling actual data. >>> >>>> >>> >>>> >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> (3) I think (at least some of the) shuffle >>> implementations are implemented in ways similar to this (writing to files >>> and merging). So I'm wondering if the performance benefits you see are for >>> a very specific case and may limit the functionality in other ways. >>> >>>> >>> >>>> >>> >>>> >>> >>>> This is for the common pattern of a few core data producer >>> pipelines and many downstream consumer pipelines. It's not intended to >>> replace shuffle/join within a single pipeline. Because the producer >>> pre-groups/sorts data and writes to bucket/shard output files, a >>> consumer can sort/merge matching ones without a CoGBK. Essentially we're >>> paying the shuffle cost upfront to avoid it repeatedly in each consumer >>> pipeline that wants to join data. >>> >>>> >>> >>>> >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> Thanks, >>> >>>> >>> >>>>> Cham >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> >>> >>>> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li < >>> neville....@gmail.com> wrote: >>> >>>> >>> >>>>>> >>> >>>> >>> >>>>>> Ping again. Any chance someone could take a look to get this >>> thing going? It's just a design doc and basic metadata/IO impl. We're not >>> talking about actual source/sink code yet (already done but saved for >>> future PRs). >>> >>>> >>> >>>>>> >>> >>>> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay < >>> al...@google.com> wrote: >>> >>>> >>> >>>>>>> >>> >>>> >>> >>>>>>> Thank you Claire, this looks promising.
Explicitly >>> adding a few folks that might have feedback: +Ismaël Mejía +Robert Bradshaw >>> +Lukasz Cwik +Chamikara Jayalath >>> >>>> >>> >>>>>>> >>> >>>> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty < >>> claire.d.mcgi...@gmail.com> wrote: >>> >>>> >>> >>>>>>>> >>> >>>> >>> >>>>>>>> Hey dev@! >>> >>>> >>> >>>>>>>> >>> >>>> >>> >>>>>>>> Myself and a few other Spotify data engineers have put >>> together a design doc for SMB Join support in Beam, and have a working Java >>> implementation we've started to put up for PR ([0], [1], [2]). There's more >>> detailed information in the document, but the tl;dr is that SMB is a >>> strategy to optimize joins for file-based sources by modifying the initial >>> write operation to write records in sorted buckets based on the desired >>> join key. This means that subsequent joins of datasets written in this way >>> are only sequential file reads, no shuffling involved. We've seen some >>> pretty substantial performance speedups with our implementation and would >>> love to get it checked in to Beam's Java SDK. >>> >>>> >>> >>>>>>>> >>> >>>> >>> >>>>>>>> We'd appreciate any suggestions or feedback on our >>> proposal--the design doc should be public to comment on. >>> >>>> >>> >>>>>>>> >>> >>>> >>> >>>>>>>> Thanks! >>> >>>> >>> >>>>>>>> Claire / Neville >>> >> >>> >> >>> >> >>> >> -- >>> >> Cheers, >>> >> Gleb >>> >>