From the peanut gallery, keeping a separate implementation for SMB seems fine. Dependencies are serious liabilities for both upstream and downstream. It seems like the reuse angle is generating extra work, and potentially making already-complex implementations more complex, instead of helping things.
Kenn

On Wed, Jul 24, 2019 at 11:59 AM Neville Li <neville....@gmail.com> wrote:

> I spoke too soon. Turns out for unsharded writes, numShards can't be
> determined until the last finalize transform, which is again different from
> the current SMB proposal (static number of buckets & shards).
> I'll end up with more code specialized for SMB in order to generalize
> existing sink code, which I think we all want to avoid.
>
> Seems the only option is duplicating some logic like temp file handling,
> which is exactly what we did in the original PR.
> I can reuse Compression & Sink<T> for file level writes but that seems
> about the most I can reuse right now.
>
> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com> wrote:
>
>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms of WriteFiles.
>>
>> WriteShardsIntoTempFilesFn extends DoFn<KV<*ShardedKey<Integer>*, Iterable<UserT>>, *FileResult*<DestinationT>>
>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
>> => FinalizeTempFileBundles extends PTransform<PCollection<List<*FileResult<DestinationT>*>>, WriteFilesResult<DestinationT>>
>>
>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so
>> I can use pre-computed SMB destination file names for the transforms.
>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>>
>> OTOH they are somewhat coupled with the package-private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> the temp file handling logic lives). Might be hard to decouple, either by
>> modifying existing code or by creating new transforms, unless we rewrite
>> most of FileBasedSink from scratch.
>>
>> Let me know if I'm on the wrong track.
>>
>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>
>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>> >
>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <rober...@google.com> wrote:
>>>> >>
>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com> wrote:
>>>> >> >
>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and
>>>> >> > see what needs to be done.
>>>> >> >
>>>> >> > Eugene pointed out that we shouldn't build on
>>>> >> > FileBased{Source,Sink}. So for writes I'll probably build on top of
>>>> >> > WriteFiles.
>>>> >>
>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>>> >>
>>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>>> >
>>>> > Yeah if possible, parameterize FileIO.Sink.
>>>> > I would recommend against building on top of WriteFiles either.
>>>> > FileIO being implemented on top of WriteFiles was supposed to be a
>>>> > temporary measure - the longer-term plan was to rewrite it from scratch
>>>> > (albeit with a similar structure) and throw away WriteFiles.
>>>> > If possible, I would recommend pursuing this path: if there are
>>>> > parts of WriteFiles you want to reuse, I would recommend implementing
>>>> > them as new transforms, not at all tied to FileBasedSink (but ok if
>>>> > tied to FileIO.Sink), with the goal in mind that FileIO could be
>>>> > rewritten on top of these new transforms, or maybe parts of WriteFiles
>>>> > could be swapped out for them incrementally.
>>>>
>>>> Thanks for the feedback. There's a lot that was done, but looking at
>>>> the code it feels like there's a lot that was not yet done either, and
>>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
>>>> the right docs).
>>>>
>>>
>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for
>>> updating source interfaces, but I prefer not significantly modifying
>>> existing IO transforms to suit the SMB use-case. If there are existing
>>> pieces of code that can be easily re-used that is fine, but existing
>>> sources/sinks are designed to perform a PCollection -> file transformation
>>> and vice versa with (usually) runner-determined sharding. Things specific
>>> to SMB such as sharding restrictions, writing metadata to a separate file,
>>> reading multiple files from the same abstraction, do not sound like
>>> features that should be included in our usual file read/write transforms.
>>>
>>>
>>>> >> > Read might be a bigger change w.r.t. collocating ordered elements
>>>> >> > across files within a bucket and TBH I'm not even sure where to start.
>>>> >>
>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>>>> >> PCollection<T>> but such an interface is insufficient to extract
>>>> >> ordered records per shard. It seems the only concrete implementations
>>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>>>> >> cumbersome to call (given the reflection machinery involved in
>>>> >> invoking DoFns).
>>>> >
>>>> > Seems easiest to just define a new regular Java interface for this.
>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something
>>>> > analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how
>>>> > much control over iteration you need.
>>>>
>>>> For this application, one wants to iterate over several files in parallel.
>>>> The downside of a new interface is that it shares almost
>>>> nothing with the "normal" sources (e.g. when features (or
>>>> optimizations) get added to one, they won't get added to the other).
>>>
>>>>
>>>> > And yes, DoFns, including SDFs, are not designed to be used as Java
>>>> > interfaces per se. If you need DoFn machinery in this interface (e.g. side
>>>> > inputs), use Contextful - s.apache.org/context-fn.
>>>>
>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>>>> to build new DoFns out of others (or, really, use them in any context
>>>> other than as an argument to ParDo).
>>>>
>>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
>>>> >>
>>>> >> Sounds good.
>>>>
>>>
>>> +1
>>>
>>>
>>>> >>
>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <rober...@google.com> wrote:
>>>> >> >>
>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville....@gmail.com> wrote:
>>>> >> >> >
>>>> >> >> > Forking this thread to discuss action items regarding the
>>>> >> >> > change. We can keep technical discussion in the original thread.
>>>> >> >> >
>>>> >> >> > Background: our SMB POC showed promising performance & cost
>>>> >> >> > saving improvements and we'd like to adopt it for production soon
>>>> >> >> > (by EOY). We want to contribute it to Beam so it's better
>>>> >> >> > generalized and maintained. We also want to avoid divergence
>>>> >> >> > between our internal version and the PR while it's in progress,
>>>> >> >> > specifically any breaking change in the produced SMB data.
>>>> >> >>
>>>> >> >> All good goals.
>>>> >> >>
>>>> >> >> > To achieve that I'd like to propose a few action items.
>>>> >> >> >
>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>>>> >> >> > handling, bucket file and metadata format, etc., anything that
>>>> >> >> > affects produced SMB data.
>>>> >> >> > 2. Revise the existing PR according to #1
>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>>>> >> >> > Compression, etc., but keep the existing file level abstraction
>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly
>>>> >> >> > as @experimental
>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>>>> >> >> > GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>>>> >> >> >
>>>> >> >> > #1-4 give us something usable in the short term, while #1
>>>> >> >> > guarantees that production data produced today are usable when #5
>>>> >> >> > lands on master. #4 also gives early adopters a chance to give
>>>> >> >> > feedback.
>>>> >> >> > Due to the scope of #5, it might take much longer and a couple
>>>> >> >> > of big PRs to achieve, which we can keep iterating on.
>>>> >> >> >
>>>> >> >> > What are your thoughts on this?
>>>> >> >>
>>>> >> >> I would like to see some resolution on the FileIO abstractions before
>>>> >> >> merging into experimental. (We have a FileBasedSink that would mostly
>>>> >> >> already work, so it's a matter of coming up with an analogous Source
>>>> >> >> interface.) Specifically I would not want to merge a set of
>>>> >> >> per-file-type SMB IOs without a path forward to this or the
>>>> >> >> determination that it's not possible/desirable.
>>>>
>>>
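
For reference, the ordered read discussed in the quoted thread (an interface of the shape ReadableFile -> Iterable<T>, with several sorted files of one bucket iterated in parallel) could look roughly like the plain-Java sketch below. This is only an illustration under assumptions: the names SortedFileMerge, SortedFileReader and mergeSorted are hypothetical and not part of Beam, in-memory lists stand in for files, and the merged result is collected into a list only to keep the example short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical sketch; none of these names exist in Beam. */
public class SortedFileMerge {

  /** Hypothetical stand-in for a "file -> Iterable<T>" reader; F is the file handle type. */
  public interface SortedFileReader<F, T> {
    Iterable<T> read(F file);
  }

  /** One open file: the record currently at its head plus the remaining records. */
  private static final class Cursor<T> {
    T head;
    final Iterator<T> rest;

    Cursor(T head, Iterator<T> rest) {
      this.head = head;
      this.rest = rest;
    }
  }

  /** K-way merge of files that are each already sorted according to {@code order}. */
  public static <F, T> List<T> mergeSorted(
      List<F> files, SortedFileReader<F, T> reader, Comparator<? super T> order) {
    // The heap always exposes the cursor whose head record is smallest.
    PriorityQueue<Cursor<T>> heap =
        new PriorityQueue<>((a, b) -> order.compare(a.head, b.head));
    for (F file : files) {
      Iterator<T> it = reader.read(file).iterator();
      if (it.hasNext()) {
        heap.add(new Cursor<>(it.next(), it)); // empty files are skipped
      }
    }
    List<T> merged = new ArrayList<>();
    while (!heap.isEmpty()) {
      Cursor<T> cursor = heap.poll();
      merged.add(cursor.head);
      if (cursor.rest.hasNext()) {
        cursor.head = cursor.rest.next(); // advance this file and re-queue it
        heap.add(cursor);
      }
    }
    return merged;
  }

  public static void main(String[] args) {
    // In-memory lists stand in for the sorted files of one bucket.
    List<List<Integer>> files =
        Arrays.asList(
            Arrays.asList(1, 4, 9), Arrays.asList(2, 3, 10), Collections.<Integer>emptyList());
    SortedFileReader<List<Integer>, Integer> reader = file -> file;
    Comparator<Integer> order = Comparator.naturalOrder();
    System.out.println(mergeSorted(files, reader, order)); // prints [1, 2, 3, 4, 9, 10]
  }
}
```

A plain interface like this keeps iteration under the caller's control, which is the property the PTransform<PCollection<ReadableFile>, PCollection<T>> shape lacks; the trade-off raised in the thread still applies, since such an interface would share nothing with the existing sources.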