It is not possible to implement SMB on top of the various top-level
SomeFileIO.{write,read} PTransforms. One need the internal details.

It seems we should re-use (and expose) the existing FileSinks as a
parameter to SMBSink (and also port the old-style sinks to use these).
We also need the complementary FileSource that SMBSource could be
parameterized by (and would also be useful in the readAll transforms
that take a list of files (e.g. from a match) and actually read them
(do these not exist already?). This particular use does not need
(dynamic or otherwise) range restrictions, but it's easy to say "read
from 0 to infinity" if it has them. It does require maintaining
ordering (within a shard).

Perhaps this could be sketched out more in a doc?

On Tue, Jul 16, 2019 at 11:34 PM Eugene Kirpichov <[email protected]> wrote:
>
> I'd like to reiterate the request to not build anything on top of 
> FileBasedSource/Reader.
> If the design requires having some interface for representing a function from 
> a filename to a stream of records, better introduce a new interface for that.
> If it requires interoperability with other IOs that read files, better change 
> them to use the new interface.
>
> On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath <[email protected]> 
> wrote:
>>
>> Thanks this clarifies a lot.
>>
>> For writer, I think it's great if you can utilize existing FileIO.Sink 
>> implementations even if you have to reimplement some of the logic (for 
>> example compression, temp file handling) that is already implemented in Beam 
>> FileIO/WriteFiles transforms in your SMB sink transform.
>>
>> For reader, you are right that there's no FileIO.Read. What we have are 
>> various implementations of FileBasedSource/FileBasedReader classes that are 
>> currently intentionally hidden since Beam IO transforms are expected to be 
>> the intended public interface for users. If you can expose and re-use these 
>> classes with slight modifications (keeping backwards compatibility) I'm OK 
>> with it. Otherwise you'll have to write your own reader implementations.
>>
>> In general, seems like SMB has very strong requirements related to 
>> sharding/hot-key management that are not easily achievable by implementing 
>> SMB source/sink as a composite transform that utilizes existing source/sink 
>> transforms. This forces you to implement this logic in your own DoFns and 
>> existing Beam primitives are not easily re-usable in this context.
>>
>> Thanks,
>> Cham
>>
>> On Tue, Jul 16, 2019 at 8:26 AM Neville Li <[email protected]> wrote:
>>>
>>> A little clarification of the IO requirement and my understanding of the 
>>> current state of IO.
>>>
>>> tl;dr: not sure if there're reusable bits for the reader. It's possible to 
>>> reuse some for the writer but with heavy refactoring.
>>>
>>> Reader
>>>
>>> For each bucket (containing the same key partition, sorted) across multiple 
>>> input data sets, we stream records from bucket files and merge sort.
>>> We open the files in a DoFn, and emit KV<K, CoGbkResult> where the CGBKR 
>>> encapsulates Iterable<V> from each input.
>>> Basically we need a simple API like ResourceId -> Iterator<T>, i.e. 
>>> sequential read, no block/offset/split requirement.
>>> FileBasedSource.FileBasedReader seems the closest fit but they're nested & 
>>> decoupled.
>>> There's no FileIO.Read, only a ReadMatches[1], which can be used with 
>>> ReadAllViaFileBasedSource<T>. But that's not the granularity we need, since 
>>> we lose ordering of the input records, and can't merge 2+ sources.
>>>
>>> Writer
>>>
>>> We get a `PCollection<BucketShardId, Iterable<T>>` after bucket and and 
>>> sort, where Iterable<T> is the records sorted by key and BucketShardId is 
>>> used to produce filename, e.g. bucket-00001-shard-00002.avro.
>>> We write each Iterable<T> to a temp file and move to final destination when 
>>> done. Both should ideally reuse existing code.
>>> Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO) supports 
>>> record writing into a WritableByteChannel, but some logic like compression 
>>> is handled in FileIO through ViaFileBasedSink which extends FileBasedSink.
>>> FileIO uses WriteFiles[3] to shard and write of PCollection<T>. Again we 
>>> lose ordering of the output records or custom file naming scheme. However, 
>>> WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in WriteFiles 
>>> seem closest to our need but would have to be split out and generalized.
>>>
>>> Note on reader block/offset/split requirement
>>>
>>> Because of the merge sort, we can't split or offset seek a bucket file. 
>>> Because without persisting the offset index of a key group somewhere, we 
>>> can't efficiently skip to a key group without exhausting the previous ones. 
>>> Furthermore we need to merge sort and align keys from multiple sources, 
>>> which may not have the same key distribution. It might be possible to 
>>> binary search for matching keys but that's extra complication. IMO the 
>>> reader work distribution is better solved by better bucket/shard strategy 
>>> in upstream writer.
>>>
>>> References
>>>
>>> ReadMatches extends PTransform<PCollection<MatchResult.Metadata>, 
>>> PCollection<ReadableFile>>
>>> ReadAllViaFileBasedSource<T> extends PTransform<PCollection<ReadableFile>, 
>>> PCollection<T>>
>>> WriteFiles<UserT, DestinationT, OutputT> extends 
>>> PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
>>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, 
>>> Iterable<UserT>>, FileResult<DestinationT>>
>>> FinalizeTempFileBundles extends PTransform< 
>>> PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
>>>
>>>
>>> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw <[email protected]> wrote:
>>>>
>>>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov <[email protected]> 
>>>> wrote:
>>>> >
>>>> > Quick note: I didn't look through the document, but please do not build 
>>>> > on either FileBasedSink or FileBasedReader. They are both remnants of 
>>>> > the old, non-composable IO world; and in fact much of the composable IO 
>>>> > work emerged from frustration with their limitations and recognizing 
>>>> > that many other IOs were suffering from the same limitations.
>>>> > Instead of FileBasedSink, build on FileIO.write; instead of 
>>>> > FileBasedReader, build on FileIO.read.
>>>>
>>>> +1
>>>>
>>>> I think the sink could be written atop FileIO.write, possibly using
>>>> dynamic destinations. At the very least the FileSink interface, which
>>>> handles the details of writing a single shard, would be an ideal way
>>>> to parameterize an SMB sink. It seems that none of our existing IOs
>>>> (publically?) expose FileSink implementations.
>>>>
>>>> FileIO.read is not flexible enough to do the merging. Eugene, is there
>>>> a composable analogue to FileSink, for sources, i.e. something that
>>>> can turn a file handle (possibly with offsets) into a set of records
>>>> other than FileBasedReader?
>>>>
>>>> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov <[email protected]> wrote:
>>>> >>
>>>> >> I share the same concern with Robert regarding re-implementing parts of 
>>>> >> IO. At the same time, in the past, I worked on internal libraries that 
>>>> >> try to re-use code from existing IO, and it's hardly possible because 
>>>> >> it feels like it wasn't designed for re-use. There are a lot of classes 
>>>> >> that are nested (non-static) or non-public. I can understand why they 
>>>> >> were made non-public, it's a hard abstraction to design well and keep 
>>>> >> compatibility. As Neville mentioned, decoupling readers and writers 
>>>> >> would not only benefit for this proposal but for any other use-case 
>>>> >> that has to deal with low-level API such as FileSystem API, that is 
>>>> >> hardly possible today without copy-pasting,
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >>
>>>> >> On Mon, Jul 15, 2019 at 5:05 PM Neville Li <[email protected]> 
>>>> >> wrote:
>>>> >>>
>>>> >>> Re: avoiding mirroring IO functionality, what about:
>>>> >>>
>>>> >>> - Decouple the nested FileBasedSink.Writer and 
>>>> >>> FileBasedSource.FileBasedReader, make them top level and remove 
>>>> >>> references to parent classes.
>>>> >>> - Simplify the interfaces, while maintaining support for block/offset 
>>>> >>> read & sequential write.
>>>> >>> - As a bonus, the refactored IO classes can be used standalone in case 
>>>> >>> when the user wants to perform custom IO in a DoFn, i.e. a 
>>>> >>> PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. 
>>>> >>> Today this requires a lot of copy-pasted Avro boilerplate.
>>>> >>> - For compatibility, we can delegate to the new classes from the old 
>>>> >>> ones and remove them in the next breaking release.
>>>> >>>
>>>> >>> Re: WriteFiles logic, I'm not sure about generalizing it, but what 
>>>> >>> about splitting the part handling writing temp files into a new 
>>>> >>> PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>, 
>>>> >>> PCollection<WriteFilesResult<DestinationT>>>? That splits the 
>>>> >>> bucket-shard logic from actual file IO.
>>>> >>>
>>>> >>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <[email protected]> 
>>>> >>> wrote:
>>>> >>>>
>>>> >>>> I agree that generalizing the existing FileIO may not be the right
>>>> >>>> path forward, and I'd only make their innards public with great care.
>>>> >>>> (Would this be used like like
>>>> >>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
>>>> >>>> unique that the source and sink are much more coupled than other
>>>> >>>> sources and sinks (which happen to be completely independent, if
>>>> >>>> complementary implementations, whereas SMB attempts to be a kind of
>>>> >>>> pipe where one half is instanciated in each pipeline).
>>>> >>>>
>>>> >>>> In short, an SMB source/sink that is parameterized by an arbitrary,
>>>> >>>> existing IO would be ideal (but possibly not feasible (per existing
>>>> >>>> prioritizations)), or an SMB source/sink that works as a pair. What
>>>> >>>> I'd like to avoid is a set of parallel SMB IO classes that (partially,
>>>> >>>> and incompletely) mirror the existing IO ones (from an API
>>>> >>>> perspective--how much implementation it makes sense to share is an
>>>> >>>> orthogonal issue that I'm sure can be worked out.)
>>>> >>>>
>>>> >>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <[email protected]> 
>>>> >>>> wrote:
>>>> >>>> >
>>>> >>>> > Hi Robert,
>>>> >>>> >
>>>> >>>> > I agree, it'd be nice to reuse FileIO logic of different file 
>>>> >>>> > types. But given the current code structure of FileIO & scope of 
>>>> >>>> > the change, I feel it's better left for future refactor PRs.
>>>> >>>> >
>>>> >>>> > Some thoughts:
>>>> >>>> > - SMB file operation is simple single file sequential reads/writes, 
>>>> >>>> > which already exists as Writer & FileBasedReader but are private 
>>>> >>>> > inner classes, and have references to the parent Sink/Source 
>>>> >>>> > instance.
>>>> >>>> > - The readers also have extra offset/split logic but that can be 
>>>> >>>> > worked around.
>>>> >>>> > - It'll be nice to not duplicate temp->destination file logic but 
>>>> >>>> > again WriteFiles is assuming a single integer shard key, so it'll 
>>>> >>>> > take some refactoring to reuse it.
>>>> >>>> >
>>>> >>>> > All of these can be done in backwards compatible way. OTOH 
>>>> >>>> > generalizing the existing components too much (esp. WriteFiles, 
>>>> >>>> > which is already complex) might lead to two logic paths, one 
>>>> >>>> > specialized for the SMB case. It might be easier to decouple some 
>>>> >>>> > of them for better reuse. But again I feel it's a separate 
>>>> >>>> > discussion.
>>>> >>>> >
>>>> >>>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty 
>>>> >>>> > <[email protected]> wrote:
>>>> >>>> >>
>>>> >>>> >> Thanks Robert!
>>>> >>>> >>
>>>> >>>> >> We'd definitely like to be able to re-use existing I/O 
>>>> >>>> >> components--for example the Writer<DestinationT, 
>>>> >>>> >> OutputT>/FileBasedReader<T> (since they operate on a 
>>>> >>>> >> WritableByteChannel/ReadableByteChannel, which is the level of 
>>>> >>>> >> granularity we need) but the Writers, at least, seem to be mostly 
>>>> >>>> >> private-access. Do you foresee them being made public at any point?
>>>> >>>> >>
>>>> >>>> >> - Claire
>>>> >>>> >>
>>>> >>>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw 
>>>> >>>> >> <[email protected]> wrote:
>>>> >>>> >>>
>>>> >>>> >>> I left some comments on the doc.
>>>> >>>> >>>
>>>> >>>> >>> I think the general idea is sound, but one thing that worries me 
>>>> >>>> >>> is
>>>> >>>> >>> the introduction of a parallel set of IOs that mirrors the 
>>>> >>>> >>> (existing)
>>>> >>>> >>> FileIOs. I would suggest either (1) incorporate this functionality
>>>> >>>> >>> into the generic FileIO infrastructure, or let it be 
>>>> >>>> >>> parameterized by
>>>> >>>> >>> arbitrary IO (which I'm not sure is possible, especially for the 
>>>> >>>> >>> Read
>>>> >>>> >>> side (and better would be the capability of supporting arbitrary
>>>> >>>> >>> sources, aka an optional "as-sharded-source" operation that 
>>>> >>>> >>> returns a
>>>> >>>> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>> where the 
>>>> >>>> >>> iterable is
>>>> >>>> >>> promised to be in key order)) or support a single SMB aka
>>>> >>>> >>> "PreGrouping" source/sink pair that's aways used together (and 
>>>> >>>> >>> whose
>>>> >>>> >>> underlying format is not necessarily public).
>>>> >>>> >>>
>>>> >>>> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li 
>>>> >>>> >>> <[email protected]> wrote:
>>>> >>>> >>> >
>>>> >>>> >>> > 4 people have commented but mostly clarifying details and not 
>>>> >>>> >>> > much on the overall design.
>>>> >>>> >>> >
>>>> >>>> >>> > It'd be great to have thumbs up/down on the design, 
>>>> >>>> >>> > specifically metadata, bucket & shard strategy, etc., since 
>>>> >>>> >>> > that affects backwards compatibility of output files.
>>>> >>>> >>> > Some breaking changes, e.g. dynamic # of shards, are out of 
>>>> >>>> >>> > scope for V1 unless someone feels strongly about it. The 
>>>> >>>> >>> > current scope should cover all our use cases and leave room for 
>>>> >>>> >>> > optimization.
>>>> >>>> >>> >
>>>> >>>> >>> > Once green lighted we can start adopting internally, ironing 
>>>> >>>> >>> > out rough edges while iterating on the PRs in parallel.
>>>> >>>> >>> >
>>>> >>>> >>> > Most of the implementation is self-contained in the 
>>>> >>>> >>> > extensions:smb module, except making a few core classes/methods 
>>>> >>>> >>> > public for reuse. So despite the amount of work it's still 
>>>> >>>> >>> > fairly low risk to the code base. There're some proposed 
>>>> >>>> >>> > optimization & refactoring involving core (see appendix) but 
>>>> >>>> >>> > IMO they're better left for followup PRs.
>>>> >>>> >>> >
>>>> >>>> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles 
>>>> >>>> >>> > <[email protected]> wrote:
>>>> >>>> >>> >>
>>>> >>>> >>> >> I've seen some discussion on the doc. I cannot tell whether 
>>>> >>>> >>> >> the questions are resolved or what the status of review is. 
>>>> >>>> >>> >> Would you mind looping this thread with a quick summary? This 
>>>> >>>> >>> >> is such a major piece of work I don't want it to sit with 
>>>> >>>> >>> >> everyone thinking they are waiting on someone else, or any 
>>>> >>>> >>> >> such thing. (not saying this is happening, just pinging to be 
>>>> >>>> >>> >> sure)
>>>> >>>> >>> >>
>>>> >>>> >>> >> Kenn
>>>> >>>> >>> >>
>>>> >>>> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li 
>>>> >>>> >>> >> <[email protected]> wrote:
>>>> >>>> >>> >>>
>>>> >>>> >>> >>> Updated the doc a bit with more future work (appendix). IMO 
>>>> >>>> >>> >>> most of them are non-breaking and better done in separate PRs 
>>>> >>>> >>> >>> later since some involve pretty big refactoring and are 
>>>> >>>> >>> >>> outside the scope of MVP.
>>>> >>>> >>> >>>
>>>> >>>> >>> >>> For now we'd really like to get feedback on some fundamental 
>>>> >>>> >>> >>> design decisions and find a way to move forward.
>>>> >>>> >>> >>>
>>>> >>>> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li 
>>>> >>>> >>> >>> <[email protected]> wrote:
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>> Thanks. I responded to comments in the doc. More inline.
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath 
>>>> >>>> >>> >>>> <[email protected]> wrote:
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>> Thanks added few comments.
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>> If I understood correctly, you basically assign elements 
>>>> >>>> >>> >>>>> with keys to different buckets which are written to unique 
>>>> >>>> >>> >>>>> files and merge files for the same key while reading ?
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>> Some of my concerns are.
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>> (1)  Seems like you rely on an in-memory sorting of 
>>>> >>>> >>> >>>>> buckets. Will this end up limiting the size of a 
>>>> >>>> >>> >>>>> PCollection you can process ?
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>> The sorter transform we're using supports spilling and 
>>>> >>>> >>> >>>> external sort. We can break up large key groups further by 
>>>> >>>> >>> >>>> sharding, similar to fan out in some GBK transforms.
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey() which 
>>>> >>>> >>> >>>>> is actually implemented using a shuffle (which you try to 
>>>> >>>> >>> >>>>> replace with this proposal).
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>> That's for distributing task metadata, so that each DoFn 
>>>> >>>> >>> >>>> thread picks up a random bucket and sort merge key-values. 
>>>> >>>> >>> >>>> It's not shuffling actual data.
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>> (3) I think (at least some of the) shuffle implementations 
>>>> >>>> >>> >>>>> are implemented in ways similar to this (writing to files 
>>>> >>>> >>> >>>>> and merging). So I'm wondering if the performance benefits 
>>>> >>>> >>> >>>>> you see are for a very specific case and may limit the 
>>>> >>>> >>> >>>>> functionality in other ways.
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>> This is for the common pattern of few core data producer 
>>>> >>>> >>> >>>> pipelines and many downstream consumer pipelines. It's not 
>>>> >>>> >>> >>>> intended to replace shuffle/join within a single pipeline. 
>>>> >>>> >>> >>>> On the producer side, by pre-grouping/sorting data and 
>>>> >>>> >>> >>>> writing to bucket/shard output files, the consumer can 
>>>> >>>> >>> >>>> sort/merge matching ones without a CoGBK. Essentially we're 
>>>> >>>> >>> >>>> paying the shuffle cost upfront to avoid them repeatedly in 
>>>> >>>> >>> >>>> each consumer pipeline that wants to join data.
>>>> >>>> >>> >>>>
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>> Thanks,
>>>> >>>> >>> >>>>> Cham
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>>
>>>> >>>> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li 
>>>> >>>> >>> >>>>> <[email protected]> wrote:
>>>> >>>> >>> >>>>>>
>>>> >>>> >>> >>>>>> Ping again. Any chance someone takes a look to get this 
>>>> >>>> >>> >>>>>> thing going? It's just a design doc and basic metadata/IO 
>>>> >>>> >>> >>>>>> impl. We're not talking about actual source/sink code yet 
>>>> >>>> >>> >>>>>> (already done but saved for future PRs).
>>>> >>>> >>> >>>>>>
>>>> >>>> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay 
>>>> >>>> >>> >>>>>> <[email protected]> wrote:
>>>> >>>> >>> >>>>>>>
>>>> >>>> >>> >>>>>>> Thank you Claire, this looks promising. Explicitly adding 
>>>> >>>> >>> >>>>>>> a few folks that might have feedback: +Ismaël Mejía 
>>>> >>>> >>> >>>>>>> +Robert Bradshaw +Lukasz Cwik +Chamikara Jayalath
>>>> >>>> >>> >>>>>>>
>>>> >>>> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty 
>>>> >>>> >>> >>>>>>> <[email protected]> wrote:
>>>> >>>> >>> >>>>>>>>
>>>> >>>> >>> >>>>>>>> Hey dev@!
>>>> >>>> >>> >>>>>>>>
>>>> >>>> >>> >>>>>>>> Myself and a few other Spotify data engineers have put 
>>>> >>>> >>> >>>>>>>> together a design doc for SMB Join support in Beam, and 
>>>> >>>> >>> >>>>>>>> have a working Java implementation we've started to put 
>>>> >>>> >>> >>>>>>>> up for PR ([0], [1], [2]). There's more detailed 
>>>> >>>> >>> >>>>>>>> information in the document, but the tl;dr is that SMB 
>>>> >>>> >>> >>>>>>>> is a strategy to optimize joins for file-based sources 
>>>> >>>> >>> >>>>>>>> by modifying the initial write operation to write 
>>>> >>>> >>> >>>>>>>> records in sorted buckets based on the desired join key. 
>>>> >>>> >>> >>>>>>>> This means that subsequent joins of datasets written in 
>>>> >>>> >>> >>>>>>>> this way are only sequential file reads, no shuffling 
>>>> >>>> >>> >>>>>>>> involved. We've seen some pretty substantial performance 
>>>> >>>> >>> >>>>>>>> speedups with our implementation and would love to get 
>>>> >>>> >>> >>>>>>>> it checked in to Beam's Java SDK.
>>>> >>>> >>> >>>>>>>>
>>>> >>>> >>> >>>>>>>> We'd appreciate any suggestions or feedback on our 
>>>> >>>> >>> >>>>>>>> proposal--the design doc should be public to comment on.
>>>> >>>> >>> >>>>>>>>
>>>> >>>> >>> >>>>>>>> Thanks!
>>>> >>>> >>> >>>>>>>> Claire / Neville
>>>> >>
>>>> >>
>>>> >>
>>>> >> --
>>>> >> Cheers,
>>>> >> Gleb

Reply via email to