I agree that generalizing the existing FileIO may not be the right
path forward, and I'd only make its innards public with great care.
(Would this be used like
SmbSink(MyFileIO.sink(parameters).getWriter[Factory]())?) SMB is a bit
unique in that its source and sink are much more coupled than other
sources and sinks, which happen to be completely independent, if
complementary, implementations, whereas SMB attempts to be a kind of
pipe where one half is instantiated in each pipeline.

In short, an SMB source/sink that is parameterized by an arbitrary,
existing IO would be ideal (but possibly not feasible (per existing
prioritizations)), or an SMB source/sink that works as a pair. What
I'd like to avoid is a set of parallel SMB IO classes that (partially,
and incompletely) mirror the existing IO ones (from an API
perspective--how much implementation it makes sense to share is an
orthogonal issue that I'm sure can be worked out.)
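
To make the "works as a pair" option concrete, here is a minimal
sketch in plain Java with no Beam dependency. Every name in it
(SmbPairSketch, writeBuckets, mergeJoin, join) is hypothetical and
exists neither in Beam nor in the proposal; in-memory lists stand in
for bucket files, and hashCode() stands in for whatever bucketing
function the real metadata would pin down. The only point is that the
write half and the read half must agree on bucket count, hash, and
sort order, which is why they are coupled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; none of these names exist in Beam. */
final class SmbPairSketch {

  /** "Sink" half: route each record to a bucket by key hash, then sort each bucket by key. */
  static List<List<Map.Entry<String, String>>> writeBuckets(
      List<Map.Entry<String, String>> records, int numBuckets) {
    List<List<Map.Entry<String, String>>> buckets = new ArrayList<>();
    for (int i = 0; i < numBuckets; i++) {
      buckets.add(new ArrayList<>());
    }
    for (Map.Entry<String, String> record : records) {
      int bucketId = Math.floorMod(record.getKey().hashCode(), numBuckets);
      buckets.get(bucketId).add(record);
    }
    for (List<Map.Entry<String, String>> bucket : buckets) {
      bucket.sort(Map.Entry.comparingByKey());
    }
    return buckets;
  }

  /** "Source" half: one sequential pass over two key-sorted buckets, emitting "key:left|right". */
  static List<String> mergeJoin(
      List<Map.Entry<String, String>> left, List<Map.Entry<String, String>> right) {
    List<String> out = new ArrayList<>();
    int i = 0;
    int j = 0;
    while (i < left.size() && j < right.size()) {
      String key = left.get(i).getKey();
      int cmp = key.compareTo(right.get(j).getKey());
      if (cmp < 0) {
        i++;
      } else if (cmp > 0) {
        j++;
      } else {
        // Find the end of the run of equal keys on the right side.
        int jEnd = j;
        while (jEnd < right.size() && right.get(jEnd).getKey().equals(key)) {
          jEnd++;
        }
        // Pair every left record with this key against that run.
        while (i < left.size() && left.get(i).getKey().equals(key)) {
          for (int k = j; k < jEnd; k++) {
            out.add(key + ":" + left.get(i).getValue() + "|" + right.get(k).getValue());
          }
          i++;
        }
        j = jEnd;
      }
    }
    return out;
  }

  /** Joins two datasets bucket by bucket; matching keys always share a bucket id. */
  static List<String> join(
      List<Map.Entry<String, String>> left,
      List<Map.Entry<String, String>> right,
      int numBuckets) {
    List<List<Map.Entry<String, String>>> leftBuckets = writeBuckets(left, numBuckets);
    List<List<Map.Entry<String, String>>> rightBuckets = writeBuckets(right, numBuckets);
    List<String> out = new ArrayList<>();
    for (int b = 0; b < numBuckets; b++) {
      out.addAll(mergeJoin(leftBuckets.get(b), rightBuckets.get(b)));
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> users = List.of(Map.entry("a", "1"), Map.entry("b", "2"));
    List<Map.Entry<String, String>> plays =
        List.of(Map.entry("a", "x"), Map.entry("a", "y"), Map.entry("c", "z"));
    System.out.println(join(users, plays, 4)); // prints [a:1|x, a:1|y]
  }
}
```

In a real pipeline the two halves would live in different jobs, so
the agreement on bucket count and ordering would have to travel with
the written data rather than through a shared method call as it does
here.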

On Mon, Jul 15, 2019 at 4:18 PM Neville Li <neville....@gmail.com> wrote:
>
> Hi Robert,
>
> I agree, it'd be nice to reuse FileIO logic of different file types. But 
> given the current code structure of FileIO & scope of the change, I feel it's 
> better left for future refactor PRs.
>
> Some thoughts:
> - SMB file operation is simple single file sequential reads/writes, which 
> already exists as Writer & FileBasedReader but are private inner classes, and 
> have references to the parent Sink/Source instance.
> - The readers also have extra offset/split logic but that can be worked 
> around.
> - It'll be nice to not duplicate temp->destination file logic but again 
> WriteFiles is assuming a single integer shard key, so it'll take some 
> refactoring to reuse it.
>
> All of these can be done in backwards compatible way. OTOH generalizing the 
> existing components too much (esp. WriteFiles, which is already complex) 
> might lead to two logic paths, one specialized for the SMB case. It might be 
> easier to decouple some of them for better reuse. But again I feel it's a 
> separate discussion.
>
> On Mon, Jul 15, 2019 at 9:45 AM Claire McGinty <claire.d.mcgi...@gmail.com> 
> wrote:
>>
>> Thanks Robert!
>>
>> We'd definitely like to be able to re-use existing I/O components--for 
>> example the Writer<DestinationT, OutputT>/FileBasedReader<T> (since they 
>> operate on a WritableByteChannel/ReadableByteChannel, which is the level of 
>> granularity we need) but the Writers, at least, seem to be mostly 
>> private-access. Do you foresee them being made public at any point?
>>
>> - Claire
>>
>> On Mon, Jul 15, 2019 at 9:31 AM Robert Bradshaw <rober...@google.com> wrote:
>>>
>>> I left some comments on the doc.
>>>
>>> I think the general idea is sound, but one thing that worries me is
>>> the introduction of a parallel set of IOs that mirrors the (existing)
>>> FileIOs. I would suggest either (1) incorporate this functionality
>>> into the generic FileIO infrastructure, or let it be parameterized by
>>> arbitrary IO (which I'm not sure is possible, especially for the Read
>>> side (and better would be the capability of supporting arbitrary
>>> sources, aka an optional "as-sharded-source" operation that returns a
>>> PTransform<..., KV<shard-id, Iterable<KV<K, V>>>> where the iterable is
>>> promised to be in key order)), or (2) support a single SMB, aka
>>> "PreGrouping", source/sink pair that's always used together (and whose
>>> underlying format is not necessarily public).
>>>
>>> On Sat, Jul 13, 2019 at 3:19 PM Neville Li <neville....@gmail.com> wrote:
>>> >
>>> > 4 people have commented but mostly clarifying details and not much on the 
>>> > overall design.
>>> >
>>> > It'd be great to have thumbs up/down on the design, specifically 
>>> > metadata, bucket & shard strategy, etc., since that affects backwards 
>>> > compatibility of output files.
>>> > Some breaking changes, e.g. dynamic # of shards, are out of scope for V1 
>>> > unless someone feels strongly about it. The current scope should cover 
>>> > all our use cases and leave room for optimization.
>>> >
>>> > Once green lighted we can start adopting internally, ironing out rough 
>>> > edges while iterating on the PRs in parallel.
>>> >
>>> > Most of the implementation is self-contained in the extensions:smb 
>>> > module, except making a few core classes/methods public for reuse. So 
>>> > despite the amount of work it's still fairly low risk to the code base. 
>>> > There're some proposed optimization & refactoring involving core (see 
>>> > appendix) but IMO they're better left for followup PRs.
>>> >
>>> > On Fri, Jul 12, 2019 at 11:34 PM Kenneth Knowles <k...@apache.org> wrote:
>>> >>
>>> >> I've seen some discussion on the doc. I cannot tell whether the 
>>> >> questions are resolved or what the status of review is. Would you mind 
>>> >> looping this thread with a quick summary? This is such a major piece of 
>>> >> work I don't want it to sit with everyone thinking they are waiting on 
>>> >> someone else, or any such thing. (not saying this is happening, just 
>>> >> pinging to be sure)
>>> >>
>>> >> Kenn
>>> >>
>>> >> On Mon, Jul 1, 2019 at 1:09 PM Neville Li <neville....@gmail.com> wrote:
>>> >>>
>>> >>> Updated the doc a bit with more future work (appendix). IMO most of 
>>> >>> them are non-breaking and better done in separate PRs later since some 
>>> >>> involve pretty big refactoring and are outside the scope of MVP.
>>> >>>
>>> >>> For now we'd really like to get feedback on some fundamental design 
>>> >>> decisions and find a way to move forward.
>>> >>>
>>> >>> On Thu, Jun 27, 2019 at 4:39 PM Neville Li <neville....@gmail.com> 
>>> >>> wrote:
>>> >>>>
>>> >>>> Thanks. I responded to comments in the doc. More inline.
>>> >>>>
>>> >>>> On Thu, Jun 27, 2019 at 2:44 PM Chamikara Jayalath 
>>> >>>> <chamik...@google.com> wrote:
>>> >>>>>
>>> >>>>> Thanks, added a few comments.
>>> >>>>>
>>> >>>>> If I understood correctly, you basically assign elements with keys to
>>> >>>>> different buckets which are written to unique files and merge files
>>> >>>>> for the same key while reading?
>>> >>>>>
>>> >>>>> Some of my concerns are:
>>> >>>>>
>>> >>>>> (1) Seems like you rely on an in-memory sorting of buckets. Will
>>> >>>>> this end up limiting the size of a PCollection you can process?
>>> >>>>
>>> >>>> The sorter transform we're using supports spilling and external sort. 
>>> >>>> We can break up large key groups further by sharding, similar to fan 
>>> >>>> out in some GBK transforms.
>>> >>>>
>>> >>>>> (2) Seems like you rely on Reshuffle.viaRandomKey() which is actually 
>>> >>>>> implemented using a shuffle (which you try to replace with this 
>>> >>>>> proposal).
>>> >>>>
>>> >>>> That's for distributing task metadata, so that each DoFn thread picks
>>> >>>> up a random bucket and sort-merges its key-values. It's not shuffling
>>> >>>> actual data.
>>> >>>>
>>> >>>>>
>>> >>>>> (3) I think (at least some of the) shuffle implementations are 
>>> >>>>> implemented in ways similar to this (writing to files and merging). 
>>> >>>>> So I'm wondering if the performance benefits you see are for a very 
>>> >>>>> specific case and may limit the functionality in other ways.
>>> >>>>
>>> >>>> This is for the common pattern of few core data producer pipelines and 
>>> >>>> many downstream consumer pipelines. It's not intended to replace 
>>> >>>> shuffle/join within a single pipeline. On the producer side, by 
>>> >>>> pre-grouping/sorting data and writing to bucket/shard output files, 
>>> >>>> the consumer can sort/merge matching ones without a CoGBK. Essentially
>>> >>>> we're paying the shuffle cost upfront to avoid paying it repeatedly in
>>> >>>> each consumer pipeline that wants to join data.
>>> >>>>
>>> >>>>>
>>> >>>>> Thanks,
>>> >>>>> Cham
>>> >>>>>
>>> >>>>>
>>> >>>>> On Thu, Jun 27, 2019 at 8:12 AM Neville Li <neville....@gmail.com> 
>>> >>>>> wrote:
>>> >>>>>>
>>> >>>>>> Ping again. Any chance someone takes a look to get this thing going? 
>>> >>>>>> It's just a design doc and basic metadata/IO impl. We're not talking 
>>> >>>>>> about actual source/sink code yet (already done but saved for future 
>>> >>>>>> PRs).
>>> >>>>>>
>>> >>>>>> On Fri, Jun 21, 2019 at 1:38 PM Ahmet Altay <al...@google.com> wrote:
>>> >>>>>>>
>>> >>>>>>> Thank you Claire, this looks promising. Explicitly adding a few 
>>> >>>>>>> folks that might have feedback: +Ismaël Mejía +Robert Bradshaw 
>>> >>>>>>> +Lukasz Cwik +Chamikara Jayalath
>>> >>>>>>>
>>> >>>>>>> On Mon, Jun 17, 2019 at 2:12 PM Claire McGinty 
>>> >>>>>>> <claire.d.mcgi...@gmail.com> wrote:
>>> >>>>>>>>
>>> >>>>>>>> Hey dev@!
>>> >>>>>>>>
>>> >>>>>>>> Myself and a few other Spotify data engineers have put together a 
>>> >>>>>>>> design doc for SMB Join support in Beam, and have a working Java 
>>> >>>>>>>> implementation we've started to put up for PR ([0], [1], [2]). 
>>> >>>>>>>> There's more detailed information in the document, but the tl;dr 
>>> >>>>>>>> is that SMB is a strategy to optimize joins for file-based sources 
>>> >>>>>>>> by modifying the initial write operation to write records in 
>>> >>>>>>>> sorted buckets based on the desired join key. This means that 
>>> >>>>>>>> subsequent joins of datasets written in this way are only 
>>> >>>>>>>> sequential file reads, no shuffling involved. We've seen some 
>>> >>>>>>>> pretty substantial performance speedups with our implementation 
>>> >>>>>>>> and would love to get it checked in to Beam's Java SDK.
>>> >>>>>>>>
>>> >>>>>>>> We'd appreciate any suggestions or feedback on our proposal--the 
>>> >>>>>>>> design doc should be public to comment on.
>>> >>>>>>>>
>>> >>>>>>>> Thanks!
>>> >>>>>>>> Claire / Neville
