As far as I/O code re-use goes, the consensus seems to be to make the SMB module as composable as possible using existing Beam components, ideally as-is or with very basic tweaks.

> To be clear, what I care about is that WriteFiles(X) and WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text, TFRecord, ...}. In other words composability of the API (vs. manually filling out the matrix). If WriteFiles and WriteSmbFiles find opportunities for (easy, clean) implementation sharing, that'd be nice, but not the primary goal.

For SMB writes, it's a pretty easy change to parameterize them by FileIO.Sink, for which there are already public implementations for Avro/TFRecord/Parquet/Text! It'll remove a lot of code duplication from the smb module.

> (Similarly for reading, though that seems less obvious. Certainly whatever T is useful for ReadSmb(T) could be useful for a (non-liquid-shading) ReadAll(T) however.)

It seems like there isn't an easily composable equivalent for Reader that isn't coupled with FileBasedSource (+1 on Gleb's question about the long-term future of org.apache.beam.sdk.io.Read). One thing we could do to improve parity between SMB reads and Beam's io module is to use ReadableFile <https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L381> as our file handles (currently we just use plain ResourceIds + the FileSystems API to open them), so our Source transform would more closely resemble the existing ReadFiles transforms <https://github.com/apache/beam/blob/c7911043510a266078a3dc8faef7a1dbe1f598c5/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java#L698>. ReadableFile also brings in io.Compression <https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Compression.java> and handles opening a ReadableByteChannel for us.

We'd still be re-implementing the I/O operations that deserialize a byte stream into individual elements, but this seems unavoidable for the time being.

Let me know what you think about these proposed modifications to SMB read/write!

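To make the write-side idea a bit more concrete, here is a minimal sketch of what "parameterized by a sink" could look like. It is only an illustration under assumptions: `Sink<T>` below is a local stand-in that mirrors the three methods of Beam's FileIO.Sink (open, write, flush) so the example runs without Beam on the classpath, and `TextLineSink` and `writeBucket` are hypothetical names, not code from the SMB PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class SmbSinkSketch {

  /** Local stand-in with the same three methods as Beam's FileIO.Sink (assumption: used here only to avoid a Beam dependency). */
  interface Sink<T> {
    void open(WritableByteChannel channel) throws IOException;
    void write(T element) throws IOException;
    void flush() throws IOException;
  }

  /** Hypothetical format-specific sink: writes one UTF-8 line per element. */
  static class TextLineSink implements Sink<String> {
    private WritableByteChannel channel;

    @Override
    public void open(WritableByteChannel channel) {
      this.channel = channel;
    }

    @Override
    public void write(String element) throws IOException {
      ByteBuffer buf = ByteBuffer.wrap((element + "\n").getBytes(StandardCharsets.UTF_8));
      while (buf.hasRemaining()) {
        channel.write(buf);
      }
    }

    @Override
    public void flush() {
      // Nothing is buffered in this sketch.
    }
  }

  /**
   * Hypothetical SMB-side helper: it only knows that a bucket's records arrive
   * already sorted; the file format is entirely the sink's concern.
   */
  static <T> void writeBucket(Sink<T> sink, Iterable<T> sortedRecords, WritableByteChannel channel)
      throws IOException {
    sink.open(channel);
    for (T record : sortedRecords) {
      sink.write(record);
    }
    sink.flush();
  }

  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (WritableByteChannel channel = Channels.newChannel(bytes)) {
      writeBucket(new TextLineSink(), Arrays.asList("a", "b", "c"), channel);
    }
    System.out.print(new String(bytes.toByteArray(), StandardCharsets.UTF_8));
  }
}
```

The point of the shape is that swapping Avro for Parquet or Text would mean passing a different sink, with no format-specific code on the SMB side.
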
Thanks,
Claire

On Thu, Jul 25, 2019 at 9:39 AM Gleb Kanterov <g...@spotify.com> wrote:

> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going away in favor of SDF, or are we always going to have both?
>
> I was looking into AvroIO.read and AvroIO.readAll; both of them use AvroSource. AvroIO.readAll is using SDF, and it's implemented with ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at ReadAllViaFileBasedSource, I don't find it necessary to use Source<?>; it should be enough to have something like (KV<ReadableFile, OffsetRange>, OutputReceiver<T>), as we have discussed in this thread, and that should be fine for SMB as well. It would require duplicating code from AvroSource, but in the end, I don't see it as a problem if AvroSource is going away.
>
> I'm attaching a small diagram I put together for myself to better understand the code.
>
> AvroIO.readAll :: PTransform<PBegin, PCollection<T>> ->
>
> FileIO.matchAll :: PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>>
> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> ->
>
> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>, PCollection<T>> ->
>
> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, OffsetRange>>) (splittable do fn)
>
> Reshuffle.viaRandomKey()
>
> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, OffsetRange>, T>) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin, PCollection<T>> ->
>
> Read.Bounded.from(createSource) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> Gleb
>
>
> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <k...@apache.org> wrote:
>> >
>> > From the peanut gallery, keeping a separate implementation for SMB seems fine. Dependencies are serious liabilities for both upstream and downstream. It seems like the reuse angle is generating extra work, and potentially making already-complex implementations more complex, instead of helping things.
>>
>> +1
>>
>> To be clear, what I care about is that WriteFiles(X) and WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text, TFRecord, ...}. In other words composability of the API (vs. manually filling out the matrix). If WriteFiles and WriteSmbFiles find opportunities for (easy, clean) implementation sharing, that'd be nice, but not the primary goal.
>>
>> (Similarly for reading, though that seems less obvious. Certainly whatever T is useful for ReadSmb(T) could be useful for a (non-liquid-shading) ReadAll(T) however.)
>>
>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <neville....@gmail.com> wrote:
>> >>
>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be determined until the last finalize transform, which is again different from the current SMB proposal (static number of buckets & shards).
>> >> I'll end up with more code specialized for SMB in order to generalize existing sink code, which I think we all want to avoid.
>> >>
>> >> Seems the only option is duplicating some logic like temp file handling, which is exactly what we did in the original PR.
>> >> I can reuse Compression & Sink<T> for file level writes but that seems about the most I can reuse right now.
>> >>
>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com> wrote:
>> >>>
>> >>> So I spent one afternoon trying some ideas for reusing the last few transforms of WriteFiles.
>> >>>
>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>>
>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
>> >>> => FinalizeTempFileBundles extends PTransform<PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
>> >>>
>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so I can pre-compute SMB destination file names for the transforms.
>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are private and easy to change/pull out.
>> >>>
>> >>> OTOH they are somewhat coupled with the package private {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of temp file handling logic lives). Might be hard to decouple either modifying existing code or creating new transforms, unless we re-write most of FileBasedSink from scratch.
>> >>>
>> >>> Let me know if I'm on the wrong track.
>> >>>
>> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>> >>>
>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <chamik...@google.com> wrote:
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com> wrote:
>> >>>>>
>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpic...@google.com> wrote:
>> >>>>> >
>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>>>> >>
>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com> wrote:
>> >>>>> >> >
>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
>> >>>>> >> >
>> >>>>> >> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.
>> >>>>> >>
>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>> >>>>> >>
>> >>>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>> >>>>> >
>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
>> >>>>> > I would recommend against building on top of WriteFiles either. FileIO being implemented on top of WriteFiles was supposed to be a temporary measure - the longer-term plan was to rewrite it from scratch (albeit with a similar structure) and throw away WriteFiles.
>> >>>>> > If possible, I would recommend to pursue this path: if there are parts of WriteFiles you want to reuse, I would recommend to implement them as new transforms, not at all tied to FileBasedSink (but ok if tied to FileIO.Sink), with the goal in mind that FileIO could be rewritten on top of these new transforms, or maybe parts of WriteFiles could be swapped out for them incrementally.
>> >>>>>
>> >>>>> Thanks for the feedback. There's a lot that was done, but looking at the code it feels like there's a lot that was not yet done either, and the longer-term plan wasn't clear (though perhaps I'm just not finding the right docs).
>> >>>>
>> >>>>
>> >>>> I'm also a bit unfamiliar with original plans for WriteFiles and for updating source interfaces, but I prefer not significantly modifying existing IO transforms to suit the SMB use-case. If there are existing pieces of code that can be easily re-used that is fine, but existing sources/sinks are designed to perform a PCollection -> file transformation and vice versa with (usually) runner determined sharding. Things specific to SMB such as sharding restrictions, writing metadata to a separate file, reading multiple files from the same abstraction, do not sound like features that should be included in our usual file read/write transforms.
>> >>>>
>> >>>>>
>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.
>> >>>>> >>
>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile -> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>, PCollection<T>> but such an interface is insufficient to extract ordered records per shard. It seems the only concrete implementations are based on FileBasedSource, which we'd like to avoid, but there's no alternative. An SDF, if exposed, would likely be overkill and cumbersome to call (given the reflection machinery involved in invoking DoFns).
>> >>>>> >
>> >>>>> > Seems easiest to just define a new regular Java interface for this.
>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control over iteration you need.
>> >>>>>
>> >>>>> For this application, one wants to iterate over several files in parallel. The downside of a new interface is that it shares almost nothing with the "normal" sources (e.g. when features (or optimizations) get added to one, they won't get added to the other).
>> >>>>
>> >>>>
>> >>>>>
>> >>>>>
>> >>>>> > And yes, DoFn's including SDF's are not designed to be used as Java interfaces per se. If you need DoFn machinery in this interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.
>> >>>>>
>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is to build new DoFns out of others (or, really, use them in any context other than as an argument to ParDo).
>> >>>>>
>> >>>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
>> >>>>> >>
>> >>>>> >> Sounds good.
>> >>>>
>> >>>>
>> >>>> +1
>> >>>>
>> >>>>>
>> >>>>> >>
>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <rober...@google.com> wrote:
>> >>>>> >> >>
>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville....@gmail.com> wrote:
>> >>>>> >> >> >
>> >>>>> >> >> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
>> >>>>> >> >> >
>> >>>>> >> >> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
>> >>>>> >> >>
>> >>>>> >> >> All good goals.
>> >>>>> >> >>
>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
>> >>>>> >> >> >
>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
>> >>>>> >> >> > 2. Revise the existing PR according to #1
>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >>>>> >> >> >
>> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
>> >>>>> >> >> >
>> >>>>> >> >> > What are your thoughts on this?
>> >>>>> >> >>
>> >>>>> >> >> I would like to see some resolution on the FileIO abstractions before merging into experimental. (We have a FileBasedSink that would mostly already work, so it's a matter of coming up with an analogous Source interface.) Specifically I would not want to merge a set of per file type smb IOs without a path forward to this or the determination that it's not possible/desirable.
>>
>
> --
> Cheers,
> Gleb