I'd like to reiterate the request to not build anything on top of
FileBasedSource/Reader.
If the design requires having some interface for representing a function
from a filename to a stream of records, better introduce a new interface
for that.
If it requires interoperability with other IOs that read files, better
change them to use the new interface.
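
For illustration only, such an interface could be as small as the sketch
below. It is plain Java with no Beam dependencies. The names
FileRecordReader, CloseableIterator and LineRecordReader are hypothetical,
not existing Beam classes, and a real version would presumably take a
ResourceId or a ReadableByteChannel rather than a java.nio Path.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;

/** Hypothetical: an iterator that also owns the underlying file handle. */
interface CloseableIterator<T> extends Iterator<T>, AutoCloseable {
  @Override
  void close() throws IOException;
}

/** Hypothetical: a function from a file to a sequential stream of records. */
interface FileRecordReader<T> {
  CloseableIterator<T> open(Path file) throws IOException;
}

/** Example implementation: one record per line of a UTF-8 text file. */
final class LineRecordReader implements FileRecordReader<String> {
  @Override
  public CloseableIterator<String> open(Path file) throws IOException {
    final BufferedReader reader = Files.newBufferedReader(file, StandardCharsets.UTF_8);
    // Read one line ahead so hasNext() can answer without doing IO.
    final String first = reader.readLine();
    return new CloseableIterator<String>() {
      private String next = first;

      @Override
      public boolean hasNext() {
        return next != null;
      }

      @Override
      public String next() {
        if (next == null) {
          throw new NoSuchElementException();
        }
        String current = next;
        try {
          next = reader.readLine();
        } catch (IOException e) {
          // Iterator.next() cannot throw checked exceptions.
          throw new UncheckedIOException(e);
        }
        return current;
      }

      @Override
      public void close() throws IOException {
        reader.close();
      }
    };
  }
}
```

There is deliberately no offset, block or split concept here, only
sequential reads, which is the granularity the SMB reader asks for further
down in this thread.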

On Tue, Jul 16, 2019 at 9:08 AM Chamikara Jayalath <chamik...@google.com>
wrote:

> Thanks this clarifies a lot.
>
> For writer, I think it's great if you can utilize existing FileIO.Sink
> implementations even if you have to reimplement some of the logic (for
> example compression, temp file handling) that is already implemented in
> Beam FileIO/WriteFiles transforms in your SMB sink transform.
>
> For reader, you are right that there's no FileIO.Read. What we have are
> various implementations of FileBasedSource/FileBasedReader classes that are
> currently intentionally hidden since Beam IO transforms are expected to be
> the intended public interface for users. If you can expose and re-use these
> classes with slight modifications (keeping backwards compatibility) I'm OK
> with it. Otherwise you'll have to write your own reader implementations.
>
> In general, seems like SMB has very strong requirements related to
> sharding/hot-key management that are not easily achievable by implementing
> SMB source/sink as a composite transform that utilizes existing source/sink
> transforms. This forces you to implement this logic in your own DoFns and
> existing Beam primitives are not easily re-usable in this context.
>
> Thanks,
> Cham
>
> On Tue, Jul 16, 2019 at 8:26 AM Neville Li <neville....@gmail.com> wrote:
>
>> A little clarification of the IO requirement and my understanding of the
>> current state of IO.
>>
>> tl;dr: not sure if there're reusable bits for the reader. It's possible
>> to reuse some for the writer but with heavy refactoring.
>>
>> *Reader*
>>
>>    - For each bucket (containing the same key partition, sorted) across
>>    multiple input data sets, we stream records from bucket files and merge
>>    sort.
>>    - We open the files in a DoFn, and emit KV<K, CoGbkResult> where the
>>    CGBKR encapsulates Iterable<V> from each input.
>>    - Basically we need a simple API like ResourceId -> Iterator<T>, i.e.
>>    sequential read, no block/offset/split requirement.
>>    - FileBasedSource.FileBasedReader seems the closest fit but they're
>>    nested & tightly coupled.
>>    - There's no FileIO.Read, only a ReadMatches[1], which can be used
>>    with ReadAllViaFileBasedSource<T>. But that's not the granularity we need,
>>    since we lose ordering of the input records, and can't merge 2+ sources.
>>
>> *Writer*
>>
>>    - We get a `PCollection<KV<BucketShardId, Iterable<T>>>` after bucket
>>    and sort, where Iterable<T> is the records sorted by key and BucketShardId
>>    is used to produce filename, e.g. bucket-00001-shard-00002.avro.
>>    - We write each Iterable<T> to a temp file and move to final
>>    destination when done. Both should ideally reuse existing code.
>>    - Looks like FileIO.Sink (and impls in AvroIO, TextIO, TFRecordIO)
>>    supports record writing into a WritableByteChannel, but some logic like
>>    compression is handled in FileIO through ViaFileBasedSink which extends
>>    FileBasedSink.
>>    - FileIO uses WriteFiles[3] to shard and write a PCollection<T>.
>>    Again we lose ordering of the output records or custom file naming scheme.
>>    However, WriteShardsIntoTempFilesFn[4] and FinalizeTempFileBundles[5] in
>>    WriteFiles seem closest to our need but would have to be split out and
>>    generalized.
>>
>> *Note on reader block/offset/split requirement*
>>
>>    - Because of the merge sort, we can't split or offset seek a bucket
>>    file: without persisting the offset index of a key group somewhere,
>>    we can't efficiently skip to a key group without exhausting the previous
>>    ones. Furthermore we need to merge sort and align keys from multiple
>>    sources, which may not have the same key distribution. It might be
>>    possible to binary search for matching keys but that's extra
>>    complication. IMO the reader work distribution is better solved by a
>>    better bucket/shard strategy in the upstream writer.
>>
>> *References*
>>
>>    1. ReadMatches extends PTransform<PCollection<MatchResult.Metadata>,
>>    PCollection<ReadableFile>>
>>    2. ReadAllViaFileBasedSource<T> extends
>>    PTransform<PCollection<ReadableFile>, PCollection<T>>
>>    3. WriteFiles<UserT, DestinationT, OutputT> extends
>>    PTransform<PCollection<UserT>, WriteFilesResult<DestinationT>>
>>    4. WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
>>    Iterable<UserT>>, FileResult<DestinationT>>
>>    5. FinalizeTempFileBundles extends PTransform<
>>    PCollection<List<FileResult<DestinationT>>>,
>>    WriteFilesResult<DestinationT>>
>>
>>
>> On Tue, Jul 16, 2019 at 5:15 AM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Mon, Jul 15, 2019 at 7:03 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>> >
>>> > Quick note: I didn't look through the document, but please do not
>>> build on either FileBasedSink or FileBasedReader. They are both remnants of
>>> the old, non-composable IO world; and in fact much of the composable IO
>>> work emerged from frustration with their limitations and recognizing that
>>> many other IOs were suffering from the same limitations.
>>> > Instead of FileBasedSink, build on FileIO.write; instead of
>>> FileBasedReader, build on FileIO.read.
>>>
>>> +1
>>>
>>> I think the sink could be written atop FileIO.write, possibly using
>>> dynamic destinations. At the very least the FileSink interface, which
>>> handles the details of writing a single shard, would be an ideal way
>>> to parameterize an SMB sink. It seems that none of our existing IOs
>>> (publicly?) expose FileSink implementations.
>>>
>>> FileIO.read is not flexible enough to do the merging. Eugene, is there
>>> a composable analogue to FileSink, for sources, i.e. something that
>>> can turn a file handle (possibly with offsets) into a set of records
>>> other than FileBasedReader?
>>>
>>> > On Mon, Jul 15, 2019 at 9:01 AM Gleb Kanterov <g...@spotify.com>
>>> wrote:
>>> >>
>>> >> I share the same concern with Robert regarding re-implementing parts
>>> of IO. At the same time, in the past, I worked on internal libraries that
>>> try to re-use code from existing IO, and it's hardly possible because it
>>> feels like it wasn't designed for re-use. There are a lot of classes that
>>> are nested (non-static) or non-public. I can understand why they were made
>>> non-public, it's a hard abstraction to design well and keep compatibility.
>>> As Neville mentioned, decoupling readers and writers would benefit not only
>>> this proposal but also any other use case that has to deal with a
>>> low-level API such as the FileSystem API, which is hardly possible today
>>> without copy-pasting.
>>> >>
>>> >>
>>> >>
>>> >>
>>> >>
>>> >> On Mon, Jul 15, 2019 at 5:05 PM Neville Li <neville....@gmail.com>
>>> wrote:
>>> >>>
>>> >>> Re: avoiding mirroring IO functionality, what about:
>>> >>>
>>> >>> - Decouple the nested FileBasedSink.Writer and
>>> FileBasedSource.FileBasedReader, make them top level and remove references
>>> to parent classes.
>>> >>> - Simplify the interfaces, while maintaining support for
>>> block/offset read & sequential write.
>>> >>> - As a bonus, the refactored IO classes can be used standalone in
>>> case when the user wants to perform custom IO in a DoFn, i.e. a
>>> PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. Today
>>> this requires a lot of copy-pasted Avro boilerplate.
>>> >>> - For compatibility, we can delegate to the new classes from the old
>>> ones and remove them in the next breaking release.
>>> >>>
>>> >>> Re: WriteFiles logic, I'm not sure about generalizing it, but what
>>> about splitting the part handling writing temp files into a new
>>> PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>,
>>> PCollection<WriteFilesResult<DestinationT>>>? That splits the bucket-shard
>>> logic from actual file IO.
>>> >>>
>>> >>> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >>>>
>>> >>>> I agree that generalizing the existing FileIO may not be the right
>>> >>>> path forward, and I'd only make their innards public with great
>>> care.
>>> >>>> (Would this be used like
>>> >>>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a
>>> bit
>>> >>>> unique in that the source and sink are much more coupled than other
>>> >>>> sources and sinks (which happen to be completely independent, if
>>> >>>> complementary implementations, whereas SMB attempts to be a kind of
>>> >>>> pipe where one half is instantiated in each pipeline).
>>> >>>>
>>> >>>> In short, an SMB source/sink that is parameterized by an arbitrary,
>>> >>>> existing IO would be ideal (but possibly not feasible (per existing
>>> >>>> prioritizations)), or an SMB source/sink that works as a pair. What
>>> >>>> I'd like to avoid is a set of parallel SMB IO classes that
>>> (partially,
>>> >>>> and incompletely) mirror the existing IO ones (from an API
>>> >>>> perspective--how much implementation it makes sense to share is an
>>> >>>> orthogonal issue that I'm sure can be worked out.)
>>> >>>>
>>> >>>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <neville....@gmail.com>
>>> wrote:
>>> >>>> >
>>> >>>> > Hi Robert,
>>> >>>> >
>>> >>>> > I agree, it'd be nice to reuse FileIO logic of different file
>>> types. But given the current code structure of FileIO & scope of the
>>> change, I feel it's better left for future refactor PRs.
>>> >>>> >
>>> >>>> > Some thoughts:
>>> >>>> > - SMB file operation is simple single file sequential
>>> reads/writes, which already exists as Writer & FileBasedReader but are
>>> private inner classes, and have references to the parent Sink/Source
>>> instance.
>>> >>>> > - The readers also have extra offset/split logic but that can be
>>> worked around.
>>> >>>> > - It'll be nice to not duplicate temp->destination file logic but
>>> again WriteFiles is assuming a single integer shard key, so it'll take some
>>> refactoring to reuse it.
>>> >>>> >
>>> >>>> > All of these can be done in backwards compatible way. OTOH
>>> generalizing the existing components too much (esp. WriteFiles, which is
>>> already complex) might lead to two logic paths, one specialized for the SMB
>>> case. It might be easier to decouple some of them for better reuse. But
>>> again I feel it's a separate discussion.
>>> >>>> >
>>> >>>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <
>>> claire.d.mcgi...@gmail.com> wrote:
>>> >>>> >>
>>> >>>> >> Thanks Robert!
>>> >>>> >>
>>> >>>> >> We'd definitely like to be able to re-use existing I/O
>>> components--for example the Writer<DestinationT,
>>> OutputT>/FileBasedReader<T> (since they operate on a
>>> WritableByteChannel/ReadableByteChannel, which is the level of granularity
>>> we need) but the Writers, at least, seem to be mostly private-access. Do
>>> you foresee them being made public at any point?
>>> >>>> >>
>>> >>>> >> - Claire
>>> >>>> >>
>>> >>>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >>>> >>>
>>> >>>> >>> I left some comments on the doc.
>>> >>>> >>>
>>> >>>> >>> I think the general idea is sound, but one thing that worries
>>> me is
>>> >>>> >>> the introduction of a parallel set of IOs that mirrors the
>>> (existing)
>>> >>>> >>> FileIOs. I would suggest either (1) incorporate this
>>> functionality
>>> >>>> >>> into the generic FileIO infrastructure, or let it be
>>> parameterized by
>>> >>>> >>> arbitrary IO (which I'm not sure is possible, especially for
>>> the Read
>>> >>>> >>> side (and better would be the capability of supporting arbitrary
>>> >>>> >>> sources, aka an optional "as-sharded-source" operation that
>>> returns a
>>> >>>> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>> where the
>>> iterable is
>>> >>>> >>> promised to be in key order)) or support a single SMB aka
>>> >>>> >>> "PreGrouping" source/sink pair that's always used together (and
>>> whose
>>> >>>> >>> underlying format is not necessarily public).
>>> >>>> >>>
>>> >>>> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <
>>> neville....@gmail.com> wrote:
>>> >>>> >>> >
>>> >>>> >>> > 4 people have commented but mostly clarifying details and not
>>> much on the overall design.
>>> >>>> >>> >
>>> >>>> >>> > It'd be great to have thumbs up/down on the design,
>>> specifically metadata, bucket & shard strategy, etc., since that affects
>>> backwards compatibility of output files.
>>> >>>> >>> > Some breaking changes, e.g. dynamic # of shards, are out of
>>> scope for V1 unless someone feels strongly about it. The current scope
>>> should cover all our use cases and leave room for optimization.
>>> >>>> >>> >
>>> >>>> >>> > Once green lighted we can start adopting internally, ironing
>>> out rough edges while iterating on the PRs in parallel.
>>> >>>> >>> >
>>> >>>> >>> > Most of the implementation is self-contained in the
>>> extensions:smb module, except making a few core classes/methods public for
>>> reuse. So despite the amount of work it's still fairly low risk to the code
>>> base. There're some proposed optimization & refactoring involving core (see
>>> appendix) but IMO they're better left for followup PRs.
>>> >>>> >>> >
>>> >>>> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <
>>> k...@apache.org> wrote:
>>> >>>> >>> >>
>>> >>>> >>> >> I've seen some discussion on the doc. I cannot tell whether
>>> the questions are resolved or what the status of review is. Would you mind
>>> looping this thread with a quick summary? This is such a major piece of
>>> work I don't want it to sit with everyone thinking they are waiting on
>>> someone else, or any such thing. (not saying this is happening, just
>>> pinging to be sure)
>>> >>>> >>> >>
>>> >>>> >>> >> Kenn
>>> >>>> >>> >>
>>> >>>> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <
>>> neville....@gmail.com> wrote:
>>> >>>> >>> >>>
>>> >>>> >>> >>> Updated the doc a bit with more future work (appendix). IMO
>>> most of them are non-breaking and better done in separate PRs later since
>>> some involve pretty big refactoring and are outside the scope of MVP.
>>> >>>> >>> >>>
>>> >>>> >>> >>> For now we'd really like to get feedback on some
>>> fundamental design decisions and find a way to move forward.
>>> >>>> >>> >>>
>>> >>>> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <
>>> neville....@gmail.com> wrote:
>>> >>>> >>> >>>>
>>> >>>> >>> >>>> Thanks. I responded to comments in the doc. More inline.
>>> >>>> >>> >>>>
>>> >>>> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <
>>> chamik...@google.com> wrote:
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>> Thanks, added a few comments.
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>> If I understood correctly, you basically assign elements
>>> with keys to different buckets which are written to unique files and merge
>>> files for the same key while reading ?
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>> Some of my concerns are.
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>> (1)  Seems like you rely on an in-memory sorting of
>>> buckets. Will this end up limiting the size of a PCollection you can
>>> process ?
>>> >>>> >>> >>>>
>>> >>>> >>> >>>> The sorter transform we're using supports spilling and
>>> external sort. We can break up large key groups further by sharding,
>>> similar to fan out in some GBK transforms.
>>> >>>> >>> >>>>
>>> >>>> >>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey() which
>>> is actually implemented using a shuffle (which you try to replace with this
>>> proposal).
>>> >>>> >>> >>>>
>>> >>>> >>> >>>> That's for distributing task metadata, so that each DoFn
>>> thread picks up a random bucket and sort merge key-values. It's not
>>> shuffling actual data.
>>> >>>> >>> >>>>
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>> (3) I think (at least some of the) shuffle
>>> implementations are implemented in ways similar to this (writing to files
>>> and merging). So I'm wondering if the performance benefits you see are for
>>> a very specific case and may limit the functionality in other ways.
>>> >>>> >>> >>>>
>>> >>>> >>> >>>> This is for the common pattern of few core data producer
>>> pipelines and many downstream consumer pipelines. It's not intended to
>>> replace shuffle/join within a single pipeline. On the producer side, by
>>> pre-grouping/sorting data and writing to bucket/shard output files, the
>>> consumer can sort/merge matching ones without a CoGBK. Essentially we're
>>> paying the shuffle cost upfront to avoid paying it repeatedly in each consumer
>>> pipeline that wants to join data.
>>> >>>> >>> >>>>
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>> Thanks,
>>> >>>> >>> >>>>> Cham
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>>
>>> >>>> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <
>>> neville....@gmail.com> wrote:
>>> >>>> >>> >>>>>>
>>> >>>> >>> >>>>>> Ping again. Any chance someone takes a look to get this
>>> thing going? It's just a design doc and basic metadata/IO impl. We're not
>>> talking about actual source/sink code yet (already done but saved for
>>> future PRs).
>>> >>>> >>> >>>>>>
>>> >>>> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <
>>> al...@google.com> wrote:
>>> >>>> >>> >>>>>>>
>>> >>>> >>> >>>>>>> Thank you Claire, this looks promising. Explicitly
>>> adding a few folks that might have feedback: +Ismaël Mejía +Robert Bradshaw
>>> +Lukasz Cwik +Chamikara Jayalath
>>> >>>> >>> >>>>>>>
>>> >>>> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
>>> claire.d.mcgi...@gmail.com> wrote:
>>> >>>> >>> >>>>>>>>
>>> >>>> >>> >>>>>>>> Hey dev@!
>>> >>>> >>> >>>>>>>>
>>> >>>> >>> >>>>>>>> Myself and a few other Spotify data engineers have put
>>> together a design doc for SMB Join support in Beam, and have a working Java
>>> implementation we've started to put up for PR ([0], [1], [2]). There's more
>>> detailed information in the document, but the tl;dr is that SMB is a
>>> strategy to optimize joins for file-based sources by modifying the initial
>>> write operation to write records in sorted buckets based on the desired
>>> join key. This means that subsequent joins of datasets written in this way
>>> are only sequential file reads, no shuffling involved. We've seen some
>>> pretty substantial performance speedups with our implementation and would
>>> love to get it checked in to Beam's Java SDK.
>>> >>>> >>> >>>>>>>>
>>> >>>> >>> >>>>>>>> We'd appreciate any suggestions or feedback on our
>>> proposal--the design doc should be public to comment on.
>>> >>>> >>> >>>>>>>>
>>> >>>> >>> >>>>>>>> Thanks!
>>> >>>> >>> >>>>>>>> Claire / Neville
>>> >>
>>> >>
>>> >>
>>> >> --
>>> >> Cheers,
>>> >> Gleb
>>>
>>
