There is still considerable value in knowing data sources statically so you can do things like fetch sizes and other metadata and adjust pipeline shape. I would not expect to delete these, but to implement them on top of SDF while still giving them a clear URN and payload so runners can know that it is a statically-specified source.
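Kenn's point is that a statically known source lets a runner look at size metadata before reading and shape the pipeline to match. The plain-Java sketch below illustrates this under two assumptions: the total size in bytes is known, and a desired bundle size is given. It splits the input into contiguous half-open [from, to) byte ranges without reading any data, loosely resembling the KV<ReadableFile, OffsetRange> pairs that SplitIntoRangesFn emits in Gleb's diagram further down the thread. `StaticSizeSplitter`, `ByteRange` and `split` are hypothetical names used only for illustration. This is not Beam's `OffsetRange`, and it is not any runner's actual splitting logic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: splitting an input whose size is known up front.
// Not Beam's OffsetRange; all names are illustrative only.
final class StaticSizeSplitter {
    // A half-open byte range [from, to).
    static final class ByteRange {
        final long from;
        final long to;

        ByteRange(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return "[" + from + ", " + to + ")";
        }
    }

    // Splits [0, totalSize) into contiguous ranges of at most desiredBundleSize bytes.
    // Only the size is consulted; no data is read to decide the split.
    static List<ByteRange> split(long totalSize, long desiredBundleSize) {
        if (totalSize < 0 || desiredBundleSize <= 0) {
            throw new IllegalArgumentException("need totalSize >= 0 and desiredBundleSize > 0");
        }
        List<ByteRange> ranges = new ArrayList<>();
        for (long start = 0; start < totalSize; start += desiredBundleSize) {
            ranges.add(new ByteRange(start, Math.min(start + desiredBundleSize, totalSize)));
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 250 bytes in bundles of at most 100 bytes.
        System.out.println(split(250, 100)); // prints [[0, 100), [100, 200), [200, 250)]
    }
}
```

The split depends only on the size, not on the file contents. That is the property a statically specified source can expose to a runner ahead of execution.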
Kenn

On Fri, Jul 26, 2019 at 3:23 AM Robert Bradshaw <rober...@google.com> wrote:

> On Thu, Jul 25, 2019 at 11:09 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >
> > Hi Gleb,
> >
> > Regarding the future of io.Read: ideally things would go as follows
> > - All runners support SDF at feature parity with Read (mostly this is just the Dataflow runner's liquid sharding and size estimation for bounded sources, and backlog for unbounded sources, but I recall that a couple of other runners also used size estimation)
> > - Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden to add any new implementations to the SDK, and users shouldn't use them either (note: I believe it's already effectively forbidden to use them for cases where a DoFn/SDF at the current level of support will be sufficient)
> > - People one by one rewrite existing Bounded/UnboundedSource based PTransforms in the SDK to use SDFs instead
> > - Read.from() is rewritten to use a wrapper SDF over the given Source, and explicit support for Read is deleted from runners
> > - In the next major version of Beam - presumably 3.0 - the Read transform itself is deleted
> >
> > I don't know what the current status of SDF/Read feature parity is; maybe Luke or Cham can comment. An alternative path is offered in http://s.apache.org/sdf-via-source.
>
> Python supports initial splitting for SDF of all sources on portable runners. Dataflow support for batch SDF is undergoing testing, not yet rolled out. Dataflow support for streaming SDF is awaiting portable state/timer support.
>
> > On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <g...@spotify.com> wrote:
> >>
> >> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going away in favor of SDF, or are we always going to have both?
> >>
> >> I was looking into AvroIO.read and AvroIO.readAll; both of them use AvroSource. AvroIO.readAll is using SDF, and it's implemented with ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at ReadAllViaFileBasedSource, I find it unnecessary to use Source<?>; it should be enough to have something like (KV<ReadableFile, OffsetRange>, OutputReceiver<T>), as we have discussed in this thread, and that should be fine for SMB as well. It would require duplicating code from AvroSource, but in the end, I don't see it as a problem if AvroSource is going away.
> >>
> >> I'm attaching a small diagram I put together for myself to better understand the code.
> >>
> >> AvroIO.readAll :: PTransform<PBegin, PCollection<T>> ->
> >>
> >>   FileIO.matchAll :: PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
> >>   FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>>
> >>   AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> ->
> >>
> >>     ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>, PCollection<T>> ->
> >>
> >>       ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, OffsetRange>>) (splittable do fn)
> >>
> >>       Reshuffle.viaRandomKey()
> >>
> >>       ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, OffsetRange>, T>) where
> >>
> >>         createSource :: String -> FileBasedSource<T>
> >>
> >>         createSource = AvroSource
> >>
> >>
> >> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin, PCollection<T>> ->
> >>
> >>   Read.Bounded.from(createSource) where
> >>
> >>     createSource :: String -> FileBasedSource<T>
> >>
> >>     createSource = AvroSource
> >>
> >>
> >> Gleb
> >>
> >>
> >> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <rober...@google.com> wrote:
> >>>
> >>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <k...@apache.org> wrote:
> >>> >
> >>> > From the peanut gallery, keeping a separate implementation for SMB seems fine. Dependencies are serious liabilities for both upstream and downstream. It seems like the reuse angle is generating extra work, and potentially making already-complex implementations more complex, instead of helping things.
> >>>
> >>> +1
> >>>
> >>> To be clear, what I care about is that WriteFiles(X) and WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text, TFRecord, ...}. In other words, composability of the API (vs. manually filling out the matrix). If WriteFiles and WriteSmbFiles find opportunities for (easy, clean) implementation sharing, that'd be nice, but not the primary goal.
> >>>
> >>> (Similarly for reading, though that seems less obvious. Certainly whatever T is useful for ReadSmb(T) could be useful for a (non-liquid-sharding) ReadAll(T), however.)
> >>>
> >>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <neville....@gmail.com> wrote:
> >>> >>
> >>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be determined until the last finalize transform, which is again different from the current SMB proposal (static number of buckets & shards).
> >>> >> I'll end up with more code specialized for SMB in order to generalize existing sink code, which I think we all want to avoid.
> >>> >>
> >>> >> Seems the only option is duplicating some logic like temp file handling, which is exactly what we did in the original PR.
> >>> >> I can reuse Compression & Sink<T> for file level writes but that seems about the most I can reuse right now.
> >>> >>
> >>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com> wrote:
> >>> >>>
> >>> >>> So I spent one afternoon trying some ideas for reusing the last few transforms of WriteFiles.
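Robert's composability point above is that WriteFiles(X) and WriteSmbFiles(X) should share the same X. The plain-Java sketch below shows that idea without using Beam. It assumes a hypothetical `RecordSink<T>` interface shaped like Beam's `FileIO.Sink` (open a `WritableByteChannel`, write elements, flush) and one format implementation, `TextSink`. It also assumes two hypothetical writers. `writeSharded` spreads records round-robin over shards. `writeBucketed` assigns records to buckets by key hash and sorts each bucket by key. Both writers accept the same sink factory, so supporting a new format means writing one sink rather than filling out a format-by-writer matrix. Output goes to in-memory buffers instead of files, and none of these names are Beam APIs.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical sketch: one format implementation reused by two different write strategies.
final class ComposableWrites {
    // Stand-in shaped like FileIO.Sink: open a channel, write elements, flush.
    interface RecordSink<T> {
        void open(WritableByteChannel channel) throws IOException;
        void write(T element) throws IOException;
        void flush() throws IOException;
    }

    // One "X": newline-delimited text. Other formats would implement RecordSink the same way.
    static final class TextSink implements RecordSink<String> {
        private WritableByteChannel channel;

        @Override
        public void open(WritableByteChannel channel) {
            this.channel = channel;
        }

        @Override
        public void write(String element) throws IOException {
            channel.write(ByteBuffer.wrap((element + "\n").getBytes(StandardCharsets.UTF_8)));
        }

        @Override
        public void flush() {}
    }

    // Plain sharded write: elements go round-robin to numShards outputs, unordered.
    static <T> List<String> writeSharded(List<T> input, int numShards, Supplier<RecordSink<T>> sinks) {
        List<List<T>> shards = emptyGroups(numShards);
        for (int i = 0; i < input.size(); i++) {
            shards.get(i % numShards).add(input.get(i));
        }
        return writeGroups(shards, sinks);
    }

    // SMB-like write: bucket by key hash, then sort each bucket by key. Same sink factory.
    static <T, K extends Comparable<K>> List<String> writeBucketed(
            List<T> input, int numBuckets, Function<T, K> keyFn, Supplier<RecordSink<T>> sinks) {
        List<List<T>> buckets = emptyGroups(numBuckets);
        for (T element : input) {
            buckets.get(Math.floorMod(keyFn.apply(element).hashCode(), numBuckets)).add(element);
        }
        for (List<T> bucket : buckets) {
            bucket.sort(Comparator.comparing(keyFn));
        }
        return writeGroups(buckets, sinks);
    }

    private static <T> List<List<T>> emptyGroups(int n) {
        List<List<T>> groups = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            groups.add(new ArrayList<>());
        }
        return groups;
    }

    // Shared tail: each group is written through a fresh sink into its own in-memory "file".
    private static <T> List<String> writeGroups(List<List<T>> groups, Supplier<RecordSink<T>> sinks) {
        List<String> files = new ArrayList<>();
        try {
            for (List<T> group : groups) {
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                RecordSink<T> sink = sinks.get();
                sink.open(Channels.newChannel(out));
                for (T element : group) {
                    sink.write(element);
                }
                sink.flush();
                files.add(new String(out.toByteArray(), StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return files;
    }

    public static void main(String[] args) {
        List<String> input = List.of("d", "b", "c", "a");
        System.out.println(writeSharded(input, 2, TextSink::new));
        System.out.println(writeBucketed(input, 2, s -> s, TextSink::new));
    }
}
```

Bucketing by `hashCode()` keeps the sketch short. Data that must stay readable across releases would need a hash whose output is fixed by specification for every key type.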
> >>> >>>
> >>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>>
> >>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
> >>> >>> => FinalizeTempFileBundles extends PTransform<PCollection<List<FileResult<DestinationT>>>, WriteFilesResult<DestinationT>>
> >>> >>>
> >>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so I can use precomputed SMB destination file names for the transforms.
> >>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are private and easy to change/pull out.
> >>> >>>
> >>> >>> OTOH they are somewhat coupled with the package-private {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of the temp file handling logic lives). It might be hard to decouple them, either by modifying existing code or by creating new transforms, unless we rewrite most of FileBasedSink from scratch.
> >>> >>>
> >>> >>> Let me know if I'm on the wrong track.
> >>> >>>
> >>> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
> >>> >>>
> >>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <chamik...@google.com> wrote:
> >>> >>>>
> >>> >>>>
> >>> >>>>
> >>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com> wrote:
> >>> >>>>>
> >>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpic...@google.com> wrote:
> >>> >>>>> >
> >>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <rober...@google.com> wrote:
> >>> >>>>> >>
> >>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com> wrote:
> >>> >>>>> >> >
> >>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and see what needs to be done.
> >>> >>>>> >> >
> >>> >>>>> >> > Eugene pointed out that we shouldn't build on FileBased{Source,Sink}. So for writes I'll probably build on top of WriteFiles.
> >>> >>>>> >>
> >>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
> >>> >>>>> >>
> >>> >>>>> >> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
> >>> >>>>> >
> >>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
> >>> >>>>> > I would recommend against building on top of WriteFiles either. FileIO being implemented on top of WriteFiles was supposed to be a temporary measure - the longer-term plan was to rewrite it from scratch (albeit with a similar structure) and throw away WriteFiles.
> >>> >>>>> > If possible, I would recommend pursuing this path: if there are parts of WriteFiles you want to reuse, I would recommend implementing them as new transforms, not at all tied to FileBasedSink (but ok if tied to FileIO.Sink), with the goal in mind that FileIO could be rewritten on top of these new transforms, or maybe parts of WriteFiles could be swapped out for them incrementally.
> >>> >>>>>
> >>> >>>>> Thanks for the feedback. There's a lot that was done, but looking at the code it feels like there's a lot that was not yet done either, and the longer-term plan wasn't clear (though perhaps I'm just not finding the right docs).
> >>> >>>>
> >>> >>>>
> >>> >>>> I'm also a bit unfamiliar with the original plans for WriteFiles and for updating source interfaces, but I'd prefer not to significantly modify existing IO transforms to suit the SMB use case. If there are existing pieces of code that can be easily reused, that is fine, but existing sources/sinks are designed to perform a PCollection -> file transformation and vice versa with (usually) runner-determined sharding. Things specific to SMB, such as sharding restrictions, writing metadata to a separate file, and reading multiple files from the same abstraction, do not sound like features that should be included in our usual file read/write transforms.
> >>> >>>>
> >>> >>>>>
> >>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered elements across files within a bucket and TBH I'm not even sure where to start.
> >>> >>>>> >>
> >>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile -> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>, PCollection<T>> but such an interface is insufficient to extract ordered records per shard. It seems the only concrete implementations are based on FileBasedSource, which we'd like to avoid, but there's no alternative. An SDF, if exposed, would likely be overkill and cumbersome to call (given the reflection machinery involved in invoking DoFns).
> >>> >>>>> >
> >>> >>>>> > Seems easiest to just define a new regular Java interface for this.
> >>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how much control over iteration you need.
> >>> >>>>>
> >>> >>>>> For this application, one wants to iterate over several files in parallel. The downside of a new interface is that it shares almost nothing with the "normal" sources (e.g. when features (or optimizations) get added to one, they won't get added to the other).
> >>> >>>>
> >>> >>>>
> >>> >>>>>
> >>> >>>>>
> >>> >>>>> > And yes, DoFns, including SDFs, are not designed to be used as Java interfaces per se. If you need DoFn machinery in this interface (e.g. side inputs), use Contextful - s.apache.org/context-fn.
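The per-file interface Eugene suggests above (ReadableFile -> Iterable<T>) can meet Robert's requirement of iterating over several files in parallel, provided each file is already sorted by key. A k-way merge over one iterator per file then produces a single sorted stream for the bucket. The sketch below makes a few assumptions. It uses a hypothetical `SortedFileReader<T>` interface that takes a file name as a stand-in for Beam's `ReadableFile`. It performs the merge with `java.util.PriorityQueue`. It also collects the result into a list for brevity, whereas a real reader would more likely stream it.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

// Hypothetical sketch: merging several key-sorted files of one bucket into one sorted stream.
final class BucketMergeSketch {
    // The "ReadableFile -> Iterable<T>" shape, with a file name standing in for ReadableFile.
    interface SortedFileReader<T> {
        Iterable<T> read(String file);
    }

    // Current head element of one file plus the rest of that file's iterator.
    private static final class Head<T> {
        final T value;
        final Iterator<T> rest;

        Head(T value, Iterator<T> rest) {
            this.value = value;
            this.rest = rest;
        }
    }

    // Standard k-way merge: always emit the smallest head, then advance that file.
    // Assumes every file is already sorted according to `order`.
    static <T> List<T> mergeSorted(List<String> files, SortedFileReader<T> reader, Comparator<? super T> order) {
        PriorityQueue<Head<T>> heads = new PriorityQueue<>((a, b) -> order.compare(a.value, b.value));
        for (String file : files) {
            Iterator<T> it = reader.read(file).iterator();
            if (it.hasNext()) {
                heads.add(new Head<>(it.next(), it));
            }
        }
        List<T> merged = new ArrayList<>();
        while (!heads.isEmpty()) {
            Head<T> smallest = heads.poll();
            merged.add(smallest.value);
            if (smallest.rest.hasNext()) {
                heads.add(new Head<>(smallest.rest.next(), smallest.rest));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        // In-memory "files" for one bucket, each sorted by key.
        Map<String, List<Integer>> files = Map.of(
                "shard-0", List.of(1, 4, 7),
                "shard-1", List.of(2, 5),
                "shard-2", List.of(3, 6, 8));
        SortedFileReader<Integer> reader = files::get;
        List<Integer> merged = mergeSorted(List.of("shard-0", "shard-1", "shard-2"), reader, Comparator.naturalOrder());
        System.out.println(merged); // prints [1, 2, 3, 4, 5, 6, 7, 8]
    }
}
```

A plain interface like this avoids invoking DoFn machinery for each file. The trade-off Robert points out still applies: it shares nothing with the existing sources.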
> >>> >>>>>
> >>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is to build new DoFns out of others (or, really, use them in any context other than as an argument to ParDo).
> >>> >>>>>
> >>> >>>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
> >>> >>>>> >>
> >>> >>>>> >> Sounds good.
> >>> >>>>
> >>> >>>>
> >>> >>>> +1
> >>> >>>>
> >>> >>>>>
> >>> >>>>> >>
> >>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <rober...@google.com> wrote:
> >>> >>>>> >> >>
> >>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville....@gmail.com> wrote:
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > Forking this thread to discuss action items regarding the change. We can keep technical discussion in the original thread.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > Background: our SMB POC showed promising performance & cost saving improvements and we'd like to adopt it for production soon (by EOY). We want to contribute it to Beam so it's better generalized and maintained. We also want to avoid divergence between our internal version and the PR while it's in progress, specifically any breaking change in the produced SMB data.
> >>> >>>>> >> >>
> >>> >>>>> >> >> All good goals.
> >>> >>>>> >> >>
> >>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key handling, bucket file and metadata format, etc., anything that affects produced SMB data.
> >>> >>>>> >> >> > 2. Revise the existing PR according to #1
> >>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink, Compression, etc., but keep the existing file level abstraction
> >>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly as @experimental
> >>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn, GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1 guarantees that production data produced today are usable when #5 lands on master. #4 also gives early adopters a chance to give feedback.
> >>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a couple of big PRs to achieve, which we can keep iterating on.
> >>> >>>>> >> >> >
> >>> >>>>> >> >> > What are your thoughts on this?
> >>> >>>>> >> >>
> >>> >>>>> >> >> I would like to see some resolution on the FileIO abstractions before merging into experimental. (We have a FileBasedSink that would mostly already work, so it's a matter of coming up with an analogous Source interface.) Specifically I would not want to merge a set of per-file-type SMB IOs without a path forward to this or the determination that it's not possible/desirable.
> >>
> >>
> >>
> >> --
> >> Cheers,
> >> Gleb
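Action item #1 in the thread (bucket and shard strategy, key handling) matters because those choices determine which file each record lands in, so changing them later would break data that has already been produced. The sketch below shows one possible strategy under several assumptions. Key bytes are already encoded. `java.util.zip.CRC32` serves purely as a deterministic stand-in hash, since the thread does not settle on a hash function. The shard choice is a hypothetical round-robin, and the file naming is a hypothetical `bucket-NNNNN-shard-NNNNN` scheme. None of this is the SMB proposal's actual layout.

```java
import java.nio.charset.StandardCharsets;
import java.util.Locale;
import java.util.zip.CRC32;

// Hypothetical sketch of a bucket/shard strategy; not the SMB proposal's actual scheme.
final class BucketShardSketch {
    // Stable hash of the encoded key. CRC32 is a stand-in chosen only because its output is
    // fixed by its definition, unlike Object.hashCode(), which is not stable for all types.
    static long stableHash(byte[] keyBytes) {
        CRC32 crc = new CRC32();
        crc.update(keyBytes);
        return crc.getValue();
    }

    // All records with equal key bytes land in the same bucket, for any writer and reader
    // that agree on numBuckets and the hash.
    static int bucketId(byte[] keyBytes, int numBuckets) {
        return (int) Math.floorMod(stableHash(keyBytes), (long) numBuckets);
    }

    // Assumed round-robin shard choice: shards only spread one bucket over several files,
    // and readers are assumed to merge all shards of a bucket.
    static int shardId(long recordIndex, int numShards) {
        return (int) Math.floorMod(recordIndex, (long) numShards);
    }

    // Assumed naming scheme so a reader can list exactly the files of one bucket.
    static String fileName(int bucket, int shard) {
        return String.format(Locale.ROOT, "bucket-%05d-shard-%05d", bucket, shard);
    }

    public static void main(String[] args) {
        byte[] key = "user-42".getBytes(StandardCharsets.UTF_8);
        int bucket = bucketId(key, 8);
        System.out.println(fileName(bucket, shardId(17, 2)));
    }
}
```

Whatever concrete choices are made, the hash, the number of buckets and the file naming together form the on-disk contract that action item #1 asks to settle before production data is written.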