Re: avoiding mirroring IO functionality, what about:

- Decouple the nested FileBasedSink.Writer and
FileBasedSource.FileBasedReader classes, make them top-level, and remove
references to their parent classes.
- Simplify the interfaces while maintaining support for block/offset reads
& sequential writes.
- As a bonus, the refactored IO classes could be used standalone in cases
where the user wants to perform custom IO in a DoFn, e.g. a
PTransform<PCollection<URI>, PCollection<KV<URI, GenericRecord>>>. Today
this requires a lot of copy-pasted Avro boilerplate (see the sketch after
this list).
- For compatibility, we can delegate to the new classes from the old ones
and remove the old ones in the next breaking release.
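
For reference, here's roughly the Avro boilerplate that the standalone
classes would replace -- a sketch only, and ReadAvroFn is a made-up name:

import java.io.IOException;
import java.net.URI;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

class ReadAvroFn extends DoFn<URI, KV<URI, GenericRecord>> {
  @ProcessElement
  public void processElement(ProcessContext c) throws IOException {
    URI uri = c.element();
    ResourceId resource =
        FileSystems.matchNewResource(uri.toString(), false /* isDirectory */);
    // Open the file as a ReadableByteChannel and stream Avro records out.
    try (ReadableByteChannel channel = FileSystems.open(resource);
        DataFileStream<GenericRecord> records =
            new DataFileStream<>(
                Channels.newInputStream(channel), new GenericDatumReader<>())) {
      while (records.hasNext()) {
        c.output(KV.of(uri, records.next()));
      }
    }
  }
}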

Re: WriteFiles logic, I'm not sure about generalizing it, but what about
splitting the part that handles writing temp files into a new
PTransform<PCollection<KV<ResourceId, Iterable<UserT>>>,
PCollection<WriteFilesResult<DestinationT>>>? That would separate the
bucket/shard logic from the actual file IO (rough skeleton below).

On Mon, Jul 15, 2019 at 10:27 AM Robert Bradshaw <rober...@google.com>
wrote:

> I agree that generalizing the existing FileIO may not be the right
> path forward, and I'd only make their innards public with great care.
> (Would this be used like
> SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
> unique in that the source and sink are much more coupled than other
> sources and sinks (which happen to be completely independent, if
> complementary, implementations, whereas SMB attempts to be a kind of
> pipe where one half is instantiated in each pipeline).
>
> In short, an SMB source/sink that is parameterized by an arbitrary
> existing IO would be ideal (though possibly not feasible, per existing
> prioritizations); failing that, an SMB source/sink that works as a pair.
> What I'd like to avoid is a set of parallel SMB IO classes that (partially
> and incompletely) mirror the existing IO ones (from an API
> perspective--how much implementation it makes sense to share is an
> orthogonal issue that I'm sure can be worked out).
>
> On Mon, Jul 15, 2019 at 4:18 PM Neville Li <neville....@gmail.com> wrote:
> >
> > Hi Robert,
> >
> > I agree, it'd be nice to reuse the FileIO logic across different file
> types. But given the current code structure of FileIO & the scope of the
> change, I feel it's better left for future refactor PRs.
> >
> > Some thoughts:
> > - SMB file operations are simple single-file sequential reads/writes,
> which already exist as Writer & FileBasedReader, but those are private
> inner classes with references to the parent Sink/Source instance.
> > - The readers also have extra offset/split logic, but that can be worked
> around.
> > - It'd be nice not to duplicate the temp->destination file logic, but
> WriteFiles assumes a single integer shard key, so it'll take some
> refactoring to reuse it.
> >
> > All of these can be done in a backwards-compatible way. OTOH, generalizing
> the existing components too much (esp. WriteFiles, which is already
> complex) might lead to two logic paths, one of them specialized for the
> SMB case. It might be easier to decouple some of them for better reuse.
> But again, I feel that's a separate discussion.
> >
> > On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
> >>
> >> Thanks Robert!
> >>
> >> We'd definitely like to be able to re-use existing I/O components--for
> example the Writer<DestinationT, OutputT>/FileBasedReader<T> (since they
> operate on a WritableByteChannel/ReadableByteChannel, which is the level of
> granularity we need)--but the Writers, at least, seem to be mostly
> private-access. Do you foresee them being made public at any point?
> >>
> >> - Claire
> >>
> >> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>
> >>> I left some comments on the doc.
> >>>
> >>> I think the general idea is sound, but one thing that worries me is
> >>> the introduction of a parallel set of IOs that mirrors the (existing)
> >>> FileIOs. I would suggest either (1) incorporating this functionality
> >>> into the generic FileIO infrastructure, (2) letting it be parameterized
> >>> by arbitrary IO (which I'm not sure is possible, especially for the Read
> >>> side; better would be the capability of supporting arbitrary
> >>> sources, aka an optional "as-sharded-source" operation that returns a
> >>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where the iterable is
> >>> promised to be in key order), or (3) supporting a single SMB aka
> >>> "PreGrouping" source/sink pair that's always used together (and whose
> >>> underlying format is not necessarily public).
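> >>>
> >>> For concreteness, a rough sketch of what that optional operation could
> >>> look like -- all names here are illustrative, not an existing API:
> >>>
> >>> interface ShardedSource<K, V> {
> >>>   // Iterables are promised to be in key order within each shard.
> >>>   PTransform<PBegin, PCollection<KV<Integer, Iterable<KV<K, V>>>>>
> >>>       asShardedSource();
> >>> }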
> >>>
> >>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <neville....@gmail.com>
> wrote:
> >>> >
> >>> > Four people have commented, but mostly on clarifying details and not
> much on the overall design.
> >>> >
> >>> > It'd be great to get a thumbs up/down on the design, specifically the
> metadata and bucket & shard strategy, etc., since those affect backwards
> compatibility of the output files.
> >>> > Some breaking changes, e.g. dynamic # of shards, are out of scope
> for V1 unless someone feels strongly about it. The current scope should
> cover all our use cases and leave room for optimization.
> >>> >
> >>> > Once green-lighted, we can start adopting it internally, ironing out
> rough edges while iterating on the PRs in parallel.
> >>> >
> >>> > Most of the implementation is self-contained in the extensions:smb
> module, apart from making a few core classes/methods public for reuse. So
> despite the amount of work, it's still fairly low risk to the code base.
> There are some proposed optimizations & refactorings involving core (see
> appendix), but IMO they're better left for follow-up PRs.
> >>> >
> >>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <k...@apache.org>
> wrote:
> >>> >>
> >>> >> I've seen some discussion on the doc. I cannot tell whether the
> questions are resolved or what the status of the review is. Would you mind
> looping back to this thread with a quick summary? This is such a major
> piece of work that I don't want it to sit with everyone thinking they're
> waiting on someone else, or any such thing. (Not saying this is happening,
> just pinging to be sure.)
> >>> >>
> >>> >> Kenn
> >>> >>
> >>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <neville....@gmail.com>
> wrote:
> >>> >>>
> >>> >>> Updated the doc a bit with more future work (appendix). IMO most
> of the items are non-breaking and better done in separate PRs later, since
> some involve pretty big refactoring and are outside the scope of the MVP.
> >>> >>>
> >>> >>> For now we'd really like to get feedback on some fundamental
> design decisions and find a way to move forward.
> >>> >>>
> >>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <neville....@gmail.com>
> wrote:
> >>> >>>>
> >>> >>>> Thanks. I responded to comments in the doc. More inline.
> >>> >>>>
> >>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>> >>>>>
> >>> >>>>> Thanks, added a few comments.
> >>> >>>>>
> >>> >>>>> If I understood correctly, you basically assign keyed elements
> to different buckets, which are written to unique files, and then merge
> files for the same key while reading?
> >>> >>>>>
> >>> >>>>> Some of my concerns are:
> >>> >>>>>
> >>> >>>>> (1) Seems like you rely on in-memory sorting of buckets. Will
> this end up limiting the size of a PCollection you can process?
> >>> >>>>
> >>> >>>> The sorter transform we're using supports spilling and external
> sort (see the snippet below). We can break up large key groups further by
> sharding, similar to fanout in some GBK transforms.
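> >>> >>>>
> >>> >>>> For reference, with Beam's sorter extension that looks roughly
> >>> >>>> like this, assuming "grouped" is the output of a GroupByKey with
> >>> >>>> String primary and secondary keys:
> >>> >>>>
> >>> >>>> import org.apache.beam.sdk.extensions.sorter.BufferedExternalSorter;
> >>> >>>> import org.apache.beam.sdk.extensions.sorter.SortValues;
> >>> >>>>
> >>> >>>> // Sort each key group by the secondary key, spilling to disk once
> >>> >>>> // a group exceeds the sorter's memory budget.
> >>> >>>> PCollection<KV<String, Iterable<KV<String, byte[]>>>> sorted =
> >>> >>>>     grouped.apply(
> >>> >>>>         SortValues.create(
> >>> >>>>             BufferedExternalSorter.options().withMemoryMB(128)));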
> >>> >>>>
> >>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey(), which is
> actually implemented using a shuffle (which is what you're trying to
> replace with this proposal).
> >>> >>>>
> >>> >>>> That's only for distributing task metadata, so that each DoFn
> thread picks up a random bucket and sort-merges its key-values; it's not
> shuffling actual data.
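> >>> >>>>
> >>> >>>> Roughly (BucketMetadata and SortMergeBucketsFn are placeholder
> >>> >>>> names; only the small metadata objects go through the reshuffle,
> >>> >>>> while the bucket files themselves are read inside the DoFn):
> >>> >>>>
> >>> >>>> p.apply(Create.of(allBucketMetadata))
> >>> >>>>     .apply(Reshuffle.viaRandomKey()) // spread tasks across workers
> >>> >>>>     .apply(ParDo.of(new SortMergeBucketsFn()));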
> >>> >>>>
> >>> >>>>>
> >>> >>>>> (3) I think (at least some of the) shuffle implementations are
> implemented in ways similar to this (writing to files and merging). So I'm
> wondering whether the performance benefits you see are for a very specific
> case and may limit the functionality in other ways.
> >>> >>>>
> >>> >>>> This is for the common pattern of a few core data-producer
> pipelines and many downstream consumer pipelines. It's not intended to
> replace shuffle/join within a single pipeline. Because the producer
> pre-groups/sorts data and writes it to bucket/shard output files, a
> consumer can sort/merge matching files without a CoGBK (see the sketch
> below). Essentially we're paying the shuffle cost once, upfront, instead
> of repeatedly in each consumer pipeline that wants to join the data.
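> >>> >>>>
> >>> >>>> A toy version of the consumer-side merge step for one bucket,
> >>> >>>> assuming unique keys per side and both inputs already sorted by
> >>> >>>> key:
> >>> >>>>
> >>> >>>> import java.util.Iterator;
> >>> >>>> import java.util.function.BiConsumer;
> >>> >>>> import org.apache.beam.sdk.values.KV;
> >>> >>>>
> >>> >>>> static <K extends Comparable<K>, A, B> void sortMergeJoin(
> >>> >>>>     Iterator<KV<K, A>> lhs, Iterator<KV<K, B>> rhs,
> >>> >>>>     BiConsumer<K, KV<A, B>> emit) {
> >>> >>>>   KV<K, A> l = lhs.hasNext() ? lhs.next() : null;
> >>> >>>>   KV<K, B> r = rhs.hasNext() ? rhs.next() : null;
> >>> >>>>   // Advance the side with the smaller key; emit on a match.
> >>> >>>>   while (l != null && r != null) {
> >>> >>>>     int cmp = l.getKey().compareTo(r.getKey());
> >>> >>>>     if (cmp == 0) {
> >>> >>>>       emit.accept(l.getKey(), KV.of(l.getValue(), r.getValue()));
> >>> >>>>       l = lhs.hasNext() ? lhs.next() : null;
> >>> >>>>       r = rhs.hasNext() ? rhs.next() : null;
> >>> >>>>     } else if (cmp < 0) {
> >>> >>>>       l = lhs.hasNext() ? lhs.next() : null;
> >>> >>>>     } else {
> >>> >>>>       r = rhs.hasNext() ? rhs.next() : null;
> >>> >>>>     }
> >>> >>>>   }
> >>> >>>> }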
> >>> >>>>
> >>> >>>>>
> >>> >>>>> Thanks,
> >>> >>>>> Cham
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <
> neville....@gmail.com> wrote:
> >>> >>>>>>
> >>> >>>>>> Ping again. Any chance someone could take a look to get this
> thing going? It's just a design doc and a basic metadata/IO impl. We're not
> talking about the actual source/sink code yet (already written, but saved
> for future PRs).
> >>> >>>>>>
> >>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <al...@google.com>
> wrote:
> >>> >>>>>>>
> >>> >>>>>>> Thank you Claire, this looks promising. Explicitly adding a
> few folks that might have feedback: +Ismaël Mejía +Robert Bradshaw +Lukasz
> Cwik +Chamikara Jayalath
> >>> >>>>>>>
> >>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty <
> claire.d.mcgi...@gmail.com> wrote:
> >>> >>>>>>>>
> >>> >>>>>>>> Hey dev@!
> >>> >>>>>>>>
> >>> >>>>>>>> A few other Spotify data engineers and I have put together a
> design doc for SMB (sort-merge bucket) join support in Beam, and we have a
> working Java implementation we've started to put up for PR ([0], [1],
> [2]). There's more detailed information in the document, but the tl;dr is
> that SMB is a strategy to optimize joins for file-based sources by
> modifying the initial write operation to write records into sorted buckets
> based on the desired join key. This means that subsequent joins of
> datasets written this way require only sequential file reads, with no
> shuffling involved. We've seen some pretty substantial performance
> speedups with our implementation and would love to get it checked into
> Beam's Java SDK.
> >>> >>>>>>>>
> >>> >>>>>>>> We'd appreciate any suggestions or feedback on our
> proposal--the design doc should be public to comment on.
> >>> >>>>>>>>
> >>> >>>>>>>> Thanks!
> >>> >>>>>>>> Claire / Neville
>
