I share the same concern with Robert regarding re-implementing parts of IO.
At the same time, in the past, I worked on internal libraries that try to
re-use code from existing IO, and it's hardly possible because it feels
like it wasn't designed for re-use. There are a lot of classes that are
nested (non-static) or non-public. I can understand why they were made
non-public, it's a hard abstraction to design well and keep compatibility.
As Neville mentioned, decoupling readers and writers would not only benefit
for this proposal but for any other use-case that has to deal with
low-level API such as FileSystem API, that is hardly possible today without
copy-pasting,





On Mon, Jul 15, 2019 at 5:05 PM Neville Li <[email protected]> wrote:

> Re: avoiding mirroring IO functionality, what about:
>
> - Decouple the nested FileBasedSink.Writer and
> FileBasedSource.FileBasedReader, make them top level and remove references
> to parent classes.
> - Simplify the interfaces, while maintaining support for block/offset read
> & sequential write.
> - As a bonus, the refactored IO classes can be used standalone in case
> when the user wants to perform custom IO in a DoFn, i.e. a
> PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. Today
> this requires a lot of copy-pasted Avro boilerplate.
> - For compatibility, we can delegate to the new classes from the old ones
> and remove them in the next breaking release.
>
> Re: WriteFiles logic, I'm not sure about generalizing it, but what about
> splitting the part handling writing temp files into a new
> PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>,
> PCollection<WriteFilesResult<DestinationT>>>? That splits the bucket-shard
> logic from actual file IO.
>
> On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <[email protected]>
> wrote:
>
>> I agree that generalizing the existing FileIO may not be the right
>> path forward, and I'd only make their innards public with great care.
>> (Would this be used like like
>> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
>> unique that the source and sink are much more coupled than other
>> sources and sinks (which happen to be completely independent, if
>> complementary implementations, whereas SMB attempts to be a kind of
>> pipe where one half is instanciated in each pipeline).
>>
>> In short, an SMB source/sink that is parameterized by an arbitrary,
>> existing IO would be ideal (but possibly not feasible (per existing
>> prioritizations)), or an SMB source/sink that works as a pair. What
>> I'd like to avoid is a set of parallel SMB IO classes that (partially,
>> and incompletely) mirror the existing IO ones (from an API
>> perspective--how much implementation it makes sense to share is an
>> orthogonal issue that I'm sure can be worked out.)
>>
>> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <[email protected]> wrote:
>> >
>> > Hi Robert,
>> >
>> > I agree, it'd be nice to reuse FileIO logic of different file types.
>> But given the current code structure of FileIO & scope of the change, I
>> feel it's better left for future refactor PRs.
>> >
>> > Some thoughts:
>> > - SMB file operation is simple single file sequential reads/writes,
>> which already exists as Writer & FileBasedReader but are private inner
>> classes, and have references to the parent Sink/Source instance.
>> > - The readers also have extra offset/split logic but that can be worked
>> around.
>> > - It'll be nice to not duplicate temp->destination file logic but again
>> WriteFiles is assuming a single integer shard key, so it'll take some
>> refactoring to reuse it.
>> >
>> > All of these can be done in backwards compatible way. OTOH generalizing
>> the existing components too much (esp. WriteFiles, which is already
>> complex) might lead to two logic paths, one specialized for the SMB case.
>> It might be easier to decouple some of them for better reuse. But again I
>> feel it's a separate discussion.
>> >
>> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <
>> [email protected]> wrote:
>> >>
>> >> Thanks Robert!
>> >>
>> >> We'd definitely like to be able to re-use existing I/O components--for
>> example the Writer<DestinationT, OutputT>/FileBasedReader<T> (since they
>> operate on a WritableByteChannel/ReadableByteChannel, which is the level of
>> granularity we need) but the Writers, at least, seem to be mostly
>> private-access. Do you foresee them being made public at any point?
>> >>
>> >> - Claire
>> >>
>> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <[email protected]>
>> wrote:
>> >>>
>> >>> I left some comments on the doc.
>> >>>
>> >>> I think the general idea is sound, but one thing that worries me is
>> >>> the introduction of a parallel set of IOs that mirrors the (existing)
>> >>> FileIOs. I would suggest either (1) incorporate this functionality
>> >>> into the generic FileIO infrastructure, or let it be parameterized by
>> >>> arbitrary IO (which I'm not sure is possible, especially for the Read
>> >>> side (and better would be the capability of supporting arbitrary
>> >>> sources, aka an optional "as-sharded-source" operation that returns a
>> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>> where the iterable is
>> >>> promised to be in key order)) or support a single SMB aka
>> >>> "PreGrouping" source/sink pair that's aways used together (and whose
>> >>> underlying format is not necessarily public).
>> >>>
>> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <[email protected]>
>> wrote:
>> >>> >
>> >>> > 4 people have commented but mostly clarifying details and not much
>> on the overall design.
>> >>> >
>> >>> > It'd be great to have thumbs up/down on the design, specifically
>> metadata, bucket & shard strategy, etc., since that affects backwards
>> compatibility of output files.
>> >>> > Some breaking changes, e.g. dynamic # of shards, are out of scope
>> for V1 unless someone feels strongly about it. The current scope should
>> cover all our use cases and leave room for optimization.
>> >>> >
>> >>> > Once green lighted we can start adopting internally, ironing out
>> rough edges while iterating on the PRs in parallel.
>> >>> >
>> >>> > Most of the implementation is self-contained in the extensions:smb
>> module, except making a few core classes/methods public for reuse. So
>> despite the amount of work it's still fairly low risk to the code base.
>> There're some proposed optimization & refactoring involving core (see
>> appendix) but IMO they're better left for followup PRs.
>> >>> >
>> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <[email protected]>
>> wrote:
>> >>> >>
>> >>> >> I've seen some discussion on the doc. I cannot tell whether the
>> questions are resolved or what the status of review is. Would you mind
>> looping this thread with a quick summary? This is such a major piece of
>> work I don't want it to sit with everyone thinking they are waiting on
>> someone else, or any such thing. (not saying this is happening, just
>> pinging to be sure)
>> >>> >>
>> >>> >> Kenn
>> >>> >>
>> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <[email protected]>
>> wrote:
>> >>> >>>
>> >>> >>> Updated the doc a bit with more future work (appendix). IMO most
>> of them are non-breaking and better done in separate PRs later since some
>> involve pretty big refactoring and are outside the scope of MVP.
>> >>> >>>
>> >>> >>> For now we'd really like to get feedback on some fundamental
>> design decisions and find a way to move forward.
>> >>> >>>
>> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <[email protected]>
>> wrote:
>> >>> >>>>
>> >>> >>>> Thanks. I responded to comments in the doc. More inline.
>> >>> >>>>
>> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <
>> [email protected]> wrote:
>> >>> >>>>>
>> >>> >>>>> Thanks added few comments.
>> >>> >>>>>
>> >>> >>>>> If I understood correctly, you basically assign elements with
>> keys to different buckets which are written to unique files and merge files
>> for the same key while reading ?
>> >>> >>>>>
>> >>> >>>>> Some of my concerns are.
>> >>> >>>>>
>> >>> >>>>> (1)  Seems like you rely on an in-memory sorting of buckets.
>> Will this end up limiting the size of a PCollection you can process ?
>> >>> >>>>
>> >>> >>>> The sorter transform we're using supports spilling and external
>> sort. We can break up large key groups further by sharding, similar to fan
>> out in some GBK transforms.
>> >>> >>>>
>> >>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey() which is
>> actually implemented using a shuffle (which you try to replace with this
>> proposal).
>> >>> >>>>
>> >>> >>>> That's for distributing task metadata, so that each DoFn thread
>> picks up a random bucket and sort merge key-values. It's not shuffling
>> actual data.
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>> (3) I think (at least some of the) shuffle implementations are
>> implemented in ways similar to this (writing to files and merging). So I'm
>> wondering if the performance benefits you see are for a very specific case
>> and may limit the functionality in other ways.
>> >>> >>>>
>> >>> >>>> This is for the common pattern of few core data producer
>> pipelines and many downstream consumer pipelines. It's not intended to
>> replace shuffle/join within a single pipeline. On the producer side, by
>> pre-grouping/sorting data and writing to bucket/shard output files, the
>> consumer can sort/merge matching ones without a CoGBK. Essentially we're
>> paying the shuffle cost upfront to avoid them repeatedly in each consumer
>> pipeline that wants to join data.
>> >>> >>>>
>> >>> >>>>>
>> >>> >>>>> Thanks,
>> >>> >>>>> Cham
>> >>> >>>>>
>> >>> >>>>>
>> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <
>> [email protected]> wrote:
>> >>> >>>>>>
>> >>> >>>>>> Ping again. Any chance someone takes a look to get this thing
>> going? It's just a design doc and basic metadata/IO impl. We're not talking
>> about actual source/sink code yet (already done but saved for future PRs).
>> >>> >>>>>>
>> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <[email protected]>
>> wrote:
>> >>> >>>>>>>
>> >>> >>>>>>> Thank you Claire, this looks promising. Explicitly adding a
>> few folks that might have feedback: +Ismaël Mejía +Robert Bradshaw +Lukasz
>> Cwik +Chamikara Jayalath
>> >>> >>>>>>>
>> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
>> [email protected]> wrote:
>> >>> >>>>>>>>
>> >>> >>>>>>>> Hey dev@!
>> >>> >>>>>>>>
>> >>> >>>>>>>> Myself and a few other Spotify data engineers have put
>> together a design doc for SMB Join support in Beam, and have a working Java
>> implementation we've started to put up for PR ([0], [1], [2]). There's more
>> detailed information in the document, but the tl;dr is that SMB is a
>> strategy to optimize joins for file-based sources by modifying the initial
>> write operation to write records in sorted buckets based on the desired
>> join key. This means that subsequent joins of datasets written in this way
>> are only sequential file reads, no shuffling involved. We've seen some
>> pretty substantial performance speedups with our implementation and would
>> love to get it checked in to Beam's Java SDK.
>> >>> >>>>>>>>
>> >>> >>>>>>>> We'd appreciate any suggestions or feedback on our
>> proposal--the design doc should be public to comment on.
>> >>> >>>>>>>>
>> >>> >>>>>>>> Thanks!
>> >>> >>>>>>>> Claire / Neville
>>
>

-- 
Cheers,
Gleb

Reply via email to