I spoke too soon. It turns out that for unsharded writes, numShards can't be determined until the last finalize transform, which again differs from the current SMB proposal (static number of buckets & shards). Generalizing the existing sink code would mean adding more SMB-specific code to it, which I think we all want to avoid.
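To make the difference concrete, here is a rough sketch of why static bucket and shard counts matter: every destination file name can be computed before anything is written, whereas an unsharded write only learns numShards at finalize time. The class name, method name, and file naming scheme below are all made up for illustration and are not taken from Beam or the SMB PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for illustration only; not part of Beam or the SMB PR.
class SmbFileNames {
  // With a static number of buckets and shards, the full set of destination
  // file names is known up front, before any element is written.
  static List<String> precompute(int numBuckets, int numShards, String suffix) {
    List<String> names = new ArrayList<>();
    for (int bucket = 0; bucket < numBuckets; bucket++) {
      for (int shard = 0; shard < numShards; shard++) {
        // The naming pattern here is an assumption, not the PR's actual format.
        names.add(String.format(
            Locale.ROOT,
            "bucket-%05d-of-%05d-shard-%05d-of-%05d%s",
            bucket, numBuckets, shard, numShards, suffix));
      }
    }
    return names;
  }

  public static void main(String[] args) {
    // Prints one name per (bucket, shard) pair: 2 x 2 = 4 names.
    for (String name : precompute(2, 2, ".avro")) {
      System.out.println(name);
    }
  }
}
```

An unsharded write has no equivalent of this step, since the shard count is an output of the pipeline rather than an input to it.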
It seems the only option is to duplicate some logic, such as temp file handling, which is exactly what we did in the original PR. I can reuse Compression & Sink<T> for file-level writes, but that seems to be about the most I can reuse right now.

On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com> wrote:

> So I spent one afternoon trying some ideas for reusing the last few
> transforms in WriteFiles.
>
> WriteShardsIntoTempFilesFn extends DoFn<KV<*ShardedKey<Integer>*,
> Iterable<UserT>>, *FileResult*<DestinationT>>
> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
> PCollection<List<ResultT>>>
> => FinalizeTempFileBundles extends PTransform<PCollection<List<
> *FileResult<DestinationT>*>>, WriteFilesResult<DestinationT>>
>
> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so I
> can use pre-computed SMB destination file names for the transforms.
> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
> private and easy to change/pull out.
>
> OTOH they are somewhat coupled with the package private
> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
> temp file handling logic lives). Might be hard to decouple, either by
> modifying existing code or by creating new transforms, unless we rewrite
> most of FileBasedSink from scratch.
>
> Let me know if I'm on the wrong track.
>
> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>
> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>> >
>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <rober...@google.com> wrote:
>>> >>
>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com> wrote:
>>> >> >
>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
>>> >> >
>>> >> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.
>>> >>
>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>> >>
>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>> >
>>> > Yeah if possible, parameterize FileIO.Sink.
>>> > I would recommend against building on top of WriteFiles either. FileIO being implemented on top of WriteFiles was supposed to be a temporary measure - the longer-term plan was to rewrite it from scratch (albeit with a similar structure) and throw away WriteFiles.
>>> > If possible, I would recommend to pursue this path: if there are parts of WriteFiles you want to reuse, I would recommend to implement them as new transforms, not at all tied to FileBasedSink (but ok if tied to FileIO.Sink), with the goal in mind that FileIO could be rewritten on top of these new transforms, or maybe parts of WriteFiles could be swapped out for them incrementally.
>>>
>>> Thanks for the feedback. There's a lot that was done, but looking at
>>> the code it feels like there's a lot that was not yet done either, and
>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
>>> the right docs).
>>>
>>
>> I'm also a bit unfamiliar with original plans for WriteFiles and for
>> updating source interfaces, but I prefer not significantly modifying
>> existing IO transforms to suit the SMB use-case. If there are existing
>> pieces of code that can be easily re-used that is fine, but existing
>> sources/sinks are designed to perform a PCollection -> file transformation
>> and vice versa with (usually) runner determined sharding. Things specific
>> to SMB such as sharding restrictions, writing metadata to a separate file,
>> reading multiple files from the same abstraction, do not sound like
>> features that should be included in our usual file read/write transforms.
>>
>>> >> > Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.
>>> >>
>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>>> >> PCollection<T>> but such an interface is insufficient to extract
>>> >> ordered records per shard. It seems the only concrete implementations
>>> >> are based on FileBasedSource, which we'd like to avoid, but there's no
>>> >> alternative. An SDF, if exposed, would likely be overkill and
>>> >> cumbersome to call (given the reflection machinery involved in
>>> >> invoking DoFns).
>>> >
>>> > Seems easiest to just define a new regular Java interface for this.
>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control over iteration you need.
>>>
>>> For this application, one wants to iterate over several files in
>>> parallel. The downside of a new interface is that it shares almost
>>> nothing with the "normal" sources (e.g. when features (or
>>> optimizations) get added to one, they won't get added to the other).
>>>
>>> > And yes, DoFn's including SDF's are not designed to be used as Java interfaces per se. If you need DoFn machinery in this interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.
>>>
>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>>> to build new DoFns out of others (or, really, use them in any context
>>> other than as an argument to ParDo).
>>>
>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
>>> >>
>>> >> Sounds good.
>>>
>>
>> +1
>>
>>> >>
>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <rober...@google.com> wrote:
>>> >> >>
>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville....@gmail.com> wrote:
>>> >> >> >
>>> >> >> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
>>> >> >> >
>>> >> >> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
>>> >> >>
>>> >> >> All good goals.
>>> >> >>
>>> >> >> > To achieve that I'd like to propose a few action items.
>>> >> >> >
>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
>>> >> >> > 2. Revise the existing PR according to #1
>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>>> >> >> >
>>> >> >> > #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
>>> >> >> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
>>> >> >> >
>>> >> >> > What are your thoughts on this?
>>> >> >>
>>> >> >> I would like to see some resolution on the FileIO abstractions before
>>> >> >> merging into experimental. (We have a FileBasedSink that would mostly
>>> >> >> already work, so it's a matter of coming up with an analogous Source
>>> >> >> interface.) Specifically I would not want to merge a set of per file
>>> >> >> type smb IOs without a path forward to this or the determination that
>>> >> >> it's not possible/desirable.
>>> >>