On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com> wrote:
> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >
> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <rober...@google.com> wrote:
> >>
> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com> wrote:
> >> >
> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
> >> >
> >> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.
> >>
> >> Meaning it could be parameterized by FileIO.Sink, right?
> >>
> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
> >
> > Yeah if possible, parameterize FileIO.Sink.
> >
> > I would recommend against building on top of WriteFiles either. FileIO being implemented on top of WriteFiles was supposed to be a temporary measure - the longer-term plan was to rewrite it from scratch (albeit with a similar structure) and throw away WriteFiles.
> >
> > If possible, I would recommend to pursue this path: if there are parts of WriteFiles you want to reuse, I would recommend to implement them as new transforms, not at all tied to FileBasedSink (but ok if tied to FileIO.Sink), with the goal in mind that FileIO could be rewritten on top of these new transforms, or maybe parts of WriteFiles could be swapped out for them incrementally.
>
> Thanks for the feedback. There's a lot that was done, but looking at the code it feels like there's a lot that was not yet done either, and the longer-term plan wasn't clear (though perhaps I'm just not finding the right docs).

I'm also a bit unfamiliar with the original plans for WriteFiles and for updating source interfaces, but I would prefer not to significantly modify existing IO transforms to suit the SMB use case.
If there are existing pieces of code that can be easily reused, that is fine, but existing sources/sinks are designed to perform a PCollection -> file transformation (and vice versa) with (usually) runner-determined sharding. Things specific to SMB, such as sharding restrictions, writing metadata to a separate file, and reading multiple files from the same abstraction, do not sound like features that should be included in our usual file read/write transforms.

> >> > Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.
> >>
> >> Yeah, here we need an interface that gives us ReadableFile -> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>, PCollection<T>> but such an interface is insufficient to extract ordered records per shard. It seems the only concrete implementations are based on FileBasedSource, which we'd like to avoid, but there's no alternative. An SDF, if exposed, would likely be overkill and cumbersome to call (given the reflection machinery involved in invoking DoFns).
> >
> > Seems easiest to just define a new regular Java interface for this. Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control over iteration you need.
>
> For this application, one wants to iterate over several files in parallel. The downside of a new interface is that it shares almost nothing with the "normal" sources (e.g. when features (or optimizations) get added to one, they won't get added to the other).
>
> > And yes, DoFn's including SDF's are not designed to be used as Java interfaces per se. If you need DoFn machinery in this interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.
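A minimal sketch of the "new regular Java interface" option above, written without a Beam dependency so it runs on its own: a plain `java.io.InputStream` stands in for `FileIO.ReadableFile`, and a `java.util.function.Consumer` plays the role of `OutputReceiver<T>`. `SortedFileReader`, `LINES`, and `mergeBucket` are hypothetical names for illustration, not existing Beam APIs. Reading one bucket then reduces to a k-way merge over the per-file iterators, which covers "iterate over several files in parallel".

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.function.Consumer;

public class BucketMergeSketch {

  /** Hypothetical per-file reader: one file in, records already sorted by key out. */
  interface SortedFileReader<T> {
    Iterator<T> read(InputStream file) throws IOException;
  }

  /** Example reader for newline-delimited text files whose lines are already sorted. */
  static final SortedFileReader<String> LINES =
      file -> new BufferedReader(new InputStreamReader(file, StandardCharsets.UTF_8))
          .lines()
          .iterator();

  /** Current record of one file plus the rest of that file's iterator. */
  private static final class Head<T> {
    final T value;
    final Iterator<T> rest;

    Head(T value, Iterator<T> rest) {
      this.value = value;
      this.rest = rest;
    }
  }

  /**
   * K-way merge of all files in one bucket: emits every record in global key order,
   * keeping only one pending record per file in the heap. The caller owns the streams.
   */
  static <T> void mergeBucket(
      List<InputStream> files,
      SortedFileReader<T> reader,
      Comparator<? super T> order,
      Consumer<? super T> out) {
    PriorityQueue<Head<T>> heap = new PriorityQueue<>((a, b) -> order.compare(a.value, b.value));
    try {
      for (InputStream file : files) {
        Iterator<T> it = reader.read(file);
        if (it.hasNext()) {
          heap.add(new Head<>(it.next(), it)); // empty files contribute nothing
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    while (!heap.isEmpty()) {
      Head<T> smallest = heap.poll();
      out.accept(smallest.value);
      if (smallest.rest.hasNext()) {
        heap.add(new Head<>(smallest.rest.next(), smallest.rest));
      }
    }
  }

  static InputStream stream(String contents) {
    return new ByteArrayInputStream(contents.getBytes(StandardCharsets.UTF_8));
  }

  public static void main(String[] args) {
    List<InputStream> bucket =
        Arrays.asList(stream("a\nc\ne\n"), stream("b\nd\n"), stream("a\nf\n"));
    List<String> merged = new ArrayList<>();
    mergeBucket(bucket, LINES, Comparator.naturalOrder(), merged::add);
    System.out.println(merged); // [a, a, b, c, d, e, f]
  }
}
```

The pull-style `Iterator<T>` per file is what makes the merge possible: the merge must pull from whichever file currently holds the smallest key, which a push-style (ReadableFile, OutputReceiver<T>) -> void shape cannot express without buffering each file.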
>
> Yeah, one of the primary downsides to the NewDoFns is how hard it is to build new DoFns out of others (or, really, use them in any context other than as an argument to ParDo).
>
> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
> >>
> >> Sounds good.

+1

> >>
> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <rober...@google.com> wrote:
> >> >>
> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville....@gmail.com> wrote:
> >> >> >
> >> >> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
> >> >> >
> >> >> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
> >> >>
> >> >> All good goals.
> >> >>
> >> >> > To achieve that I'd like to propose a few action items.
> >> >> >
> >> >> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affect produced SMB data.
> >> >> > 2. Revise the existing PR according to #1
> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
> >> >> >
> >> >> > #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
> >> >> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
> >> >> >
> >> >> > What are your thoughts on this?
> >> >>
> >> >> I would like to see some resolution on the FileIO abstractions before merging into experimental. (We have a FileBasedSink that would mostly already work, so it's a matter of coming up with an analogous Source interface.) Specifically I would not want to merge a set of per file type smb IOs without a path forward to this or the determination that it's not possible/desirable.
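Items 1 and 3 of the proposal (bucket strategy and key handling, plus reusing FileIO.Sink) are where the produced SMB data gets fixed, so a small sketch may help show what has to be agreed on. It assumes keys are UTF-8 strings hashed with `java.util.zip.CRC32` purely as a placeholder; the real hash, key encoding, and sort order are exactly what item 1 asks the group to decide. The local `Sink<T>` interface only redeclares the open/write/flush shape of Beam's `FileIO.Sink` so the example compiles without Beam; `SmbWriteSketch`, `bucketId`, `writeBuckets`, and `LineSink` are hypothetical names, and byte arrays stand in for real bucket files.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.zip.CRC32;

public class SmbWriteSketch {

  /** Same open/write/flush shape as Beam's FileIO.Sink, redeclared to avoid a Beam dependency. */
  interface Sink<T> {
    void open(WritableByteChannel channel) throws IOException;
    void write(T element) throws IOException;
    void flush() throws IOException;
  }

  /** Example sink: one UTF-8 line per element. */
  static final class LineSink implements Sink<String> {
    private WritableByteChannel channel;

    @Override
    public void open(WritableByteChannel channel) {
      this.channel = channel;
    }

    @Override
    public void write(String element) throws IOException {
      ByteBuffer buf = ByteBuffer.wrap((element + "\n").getBytes(StandardCharsets.UTF_8));
      while (buf.hasRemaining()) {
        channel.write(buf); // a channel may accept only part of the buffer per call
      }
    }

    @Override
    public void flush() {}
  }

  /** Placeholder bucketing: CRC32 of the UTF-8 key, modulo the bucket count. */
  static int bucketId(String key, int numBuckets) {
    byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
    CRC32 crc = new CRC32();
    crc.update(bytes, 0, bytes.length);
    return (int) Math.floorMod(crc.getValue(), (long) numBuckets);
  }

  /** Buckets elements by key, sorts each bucket by key, and writes one file per bucket. */
  static <T> List<byte[]> writeBuckets(
      List<T> elements, Function<T, String> keyFn, int numBuckets, Supplier<Sink<T>> newSink) {
    List<List<T>> buckets = new ArrayList<>();
    for (int b = 0; b < numBuckets; b++) {
      buckets.add(new ArrayList<>());
    }
    for (T element : elements) {
      buckets.get(bucketId(keyFn.apply(element), numBuckets)).add(element);
    }
    List<byte[]> files = new ArrayList<>();
    for (List<T> bucket : buckets) {
      // Readers merge on this order, so writers and readers must agree on it.
      bucket.sort(Comparator.comparing(keyFn));
      ByteArrayOutputStream file = new ByteArrayOutputStream();
      try (WritableByteChannel channel = Channels.newChannel(file)) {
        Sink<T> sink = newSink.get(); // a fresh sink per bucket file
        sink.open(channel);
        for (T element : bucket) {
          sink.write(element);
        }
        sink.flush();
      } catch (IOException e) {
        throw new UncheckedIOException(e);
      }
      files.add(file.toByteArray());
    }
    return files;
  }

  public static void main(String[] args) {
    List<String> keys = Arrays.asList("user3", "user1", "user2", "user1");
    List<byte[]> files = writeBuckets(keys, k -> k, 2, LineSink::new);
    for (int b = 0; b < files.size(); b++) {
      System.out.print("bucket " + b + ":\n" + new String(files.get(b), StandardCharsets.UTF_8));
    }
  }
}
```

Which bucket a given key lands in depends on the placeholder hash, so the invariants worth checking are that equal keys always share a bucket and that each bucket file is sorted. Changing either the hash or the sort order would silently break joins against data already written, which is why the proposal puts item 1 first.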