What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
away in favor of SDF, or are we always going to have both?

I was looking into AvroIO.read and AvroIO.readAll; both of them
use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
ReadAllViaFileBasedSource, which takes AvroSource as a parameter. Looking at
ReadAllViaFileBasedSource, I don't think it needs Source<?>; it
should be enough to have something like (KV<ReadableFile, OffsetRange>,
OutputReceiver<T>), as we have discussed in this thread, and that should be
fine for SMB as well. It would require duplicating code from AvroSource,
but in the end I don't see that as a problem if AvroSource is going away.
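
To make the shape concrete, here is a minimal sketch of such an interface. It is only an illustration: FileStub, RangeStub and Receiver are hypothetical stand-ins for FileIO.ReadableFile, OffsetRange and DoFn.OutputReceiver<T> so that the snippet compiles without Beam on the classpath, and RangeReader and RangeReaderSketch are names made up here, not existing Beam classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for FileIO.ReadableFile: a named list of records.
final class FileStub {
  final String name;
  final List<String> records;

  FileStub(String name, List<String> records) {
    this.name = name;
    this.records = records;
  }
}

// Hypothetical stand-in for OffsetRange: [from, to), counted in records here.
final class RangeStub {
  final long from;
  final long to;

  RangeStub(long from, long to) {
    this.from = from;
    this.to = to;
  }
}

// Hypothetical stand-in for DoFn.OutputReceiver<T>.
interface Receiver<T> {
  void output(T value);
}

// The proposed shape: (KV<ReadableFile, OffsetRange>, OutputReceiver<T>) -> void.
interface RangeReader<T> {
  void read(FileStub file, RangeStub range, Receiver<T> out);
}

final class RangeReaderSketch {
  // Toy reader: treats the record index as the offset and emits records in [from, to).
  static final RangeReader<String> LINES =
      (file, range, out) -> {
        long end = Math.min(range.to, file.records.size());
        for (long i = Math.max(0, range.from); i < end; i++) {
          out.output(file.records.get((int) i));
        }
      };

  // Collects the pushed records in order, for callers that want to iterate over them.
  static <T> List<T> readAll(RangeReader<T> reader, FileStub file, RangeStub range) {
    List<T> result = new ArrayList<>();
    reader.read(file, range, result::add);
    return result;
  }
}
```

A ParDo could call read(...) once per (file, range) element, while an SMB-style reader could call it directly with a receiver of its own, without going through Source<?>.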

I'm attaching a small diagram I put together for myself to better understand
the code.

AvroIO.readAll :: PTransform<PCollection<String>, PCollection<T>> ->

FileIO.matchAll :: PTransform<PCollection<String>, PCollection<MatchResult.Metadata>>
FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>, PCollection<ReadableFile>>
AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>, PCollection<T>> ->

ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>, PCollection<T>> ->

ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile, OffsetRange>>) (splittable do fn)

Reshuffle.viaRandomKey()

ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile, OffsetRange>, T>) where

createSource :: String -> FileBasedSource<T>

createSource = AvroSource


AvroIO.read without getHintMatchesManyFiles() :: PTransform<PBegin, PCollection<T>> ->

Read.Bounded.from(createSource) where

createSource :: String -> FileBasedSource<T>

createSource = AvroSource


Gleb


On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <rober...@google.com> wrote:

> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <k...@apache.org> wrote:
> >
> > From the peanut gallery, keeping a separate implementation for SMB seems
> fine. Dependencies are serious liabilities for both upstream and
> downstream. It seems like the reuse angle is generating extra work, and
> potentially making already-complex implementations more complex, instead of
> helping things.
>
> +1
>
> To be clear, what I care about is that WriteFiles(X) and
> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
> TFRecord, ...}. In other words composability of the API (vs. manually
> filling out the matrix). If WriteFiles and WriteSmbFiles find
> opportunities for (easy, clean) implementation sharing, that'd be
> nice, but not the primary goal.
>
> (Similarly for reading, though that seems less obvious. Certainly
> whatever T is useful for ReadSmb(T) could be useful for a
> (non-liquid-sharding) ReadAll(T) however.)
>
> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <neville....@gmail.com>
> wrote:
> >>
> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
> determined until the last finalize transform, which is again different from
> the current SMB proposal (static number of buckets & shards).
> >> I'll end up with more code specialized for SMB in order to generalize
> existing sink code, which I think we all want to avoid.
> >>
> >> Seems the only option is duplicating some logic like temp file
> handling, which is exactly what we did in the original PR.
> >> I can reuse Compression & Sink<T> for file level writes but that seems
> about the most I can reuse right now.
> >>
> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com>
> wrote:
> >>>
> >>> So I spent one afternoon trying some ideas for reusing the last few
> transforms in WriteFiles.
> >>>
> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
> Iterable<UserT>>, FileResult<DestinationT>>
> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
> PCollection<List<ResultT>>>
> >>> => FinalizeTempFileBundles extends
> PTransform<PCollection<List<FileResult<DestinationT>>>,
> WriteFilesResult<DestinationT>>
> >>>
> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId>
> so I can use pre-computed SMB destination file names for the transforms.
> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
> private and easy to change/pull out.
> >>>
> >>> OTOH they are somewhat coupled with the package private
> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
> temp file handling logic lives). Might be hard to decouple, either by
> modifying existing code or by creating new transforms, unless we rewrite
> most of FileBasedSink from scratch.
> >>>
> >>> Let me know if I'm on the wrong track.
> >>>
> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
> >>>
> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
> chamik...@google.com> wrote:
> >>>>
> >>>>
> >>>>
> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com>
> wrote:
> >>>>>
> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
> kirpic...@google.com> wrote:
> >>>>> >
> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >>>>> >>
> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com>
> wrote:
> >>>>> >> >
> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it
> and see what needs to be done.
> >>>>> >> >
> >>>>> >> > Eugene pointed out that we shouldn't build on
> FileBased{Source,Sink}. So for writes I'll probably build on top of
> WriteFiles.
> >>>>> >>
> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
> >>>>> >>
> >>>>> >>
> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
> >>>>> >
> >>>>> > Yeah if possible, parameterize FileIO.Sink.
> >>>>> > I would recommend against building on top of WriteFiles either.
> FileIO being implemented on top of WriteFiles was supposed to be a
> temporary measure - the longer-term plan was to rewrite it from scratch
> (albeit with a similar structure) and throw away WriteFiles.
> >>>>> > If possible, I would recommend to pursue this path: if there are
> parts of WriteFiles you want to reuse, I would recommend to implement them
> as new transforms, not at all tied to FileBasedSink (but ok if tied to
> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
> of these new transforms, or maybe parts of WriteFiles could be swapped out
> for them incrementally.
> >>>>>
> >>>>> Thanks for the feedback. There's a lot that was done, but looking at
> >>>>> the code it feels like there's a lot that was not yet done either,
> and
> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not
> finding
> >>>>> the right docs).
> >>>>
> >>>>
> >>>> I'm also a bit unfamiliar with original plans for WriteFiles and for
> updating source interfaces, but I prefer not significantly modifying
> existing IO transforms to suit the SMB use-case. If there are existing
> pieces of code that can be easily re-used that is fine, but existing
> sources/sinks are designed to perform a PCollection -> file transformation
> and vice versa with (usually) runner determined sharding. Things specific
> to SMB such as sharding restrictions, writing metadata to a separate file,
> reading multiple files from the same abstraction, do not sound like
> features that should be included in our usual file read/write transforms.
> >>>>
> >>>>>
> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
> elements across files within a bucket and TBH I'm not even sure where to
> start.
> >>>>> >>
> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
> >>>>> >> Iterable<T>. There are existing
> PTransform<PCollection<ReadableFile>,
> >>>>> >> PCollection<T>> but such an interface is insufficient to extract
> >>>>> >> ordered records per shard. It seems the only concrete
> implementations
> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but
> there's no
> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
> >>>>> >> cumbersome to call (given the reflection machinery involved in
> >>>>> >> invoking DoFns).
> >>>>> >
> >>>>> > Seems easiest to just define a new regular Java interface for this.
> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something
> analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how
> much control over iteration you need.
> >>>>>
> >>>>> For this application, one wants to iterate over several files in
> >>>>> parallel. The downside of a new interface is that it shares almost
> >>>>> nothing with the "normal" sources (e.g. when features (or
> >>>>> optimizations) get added to one, they won't get added to the other).
> >>>>
> >>>>
> >>>>>
> >>>>>
> >>>>> > And yes, DoFn's including SDF's are not designed to be used as
> Java interfaces per se. If you need DoFn machinery in this interface (e.g.
> side inputs), use Contextful - s.apache.org/context-fn.
> >>>>>
> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
> >>>>> to build new DoFns out of others (or, really, use them in any context
> >>>>> other than as an argument to ParDo).
> >>>>>
> >>>>> >> > I'll file separate PRs for core changes needed for discussion.
> WDYT?
> >>>>> >>
> >>>>> >> Sounds good.
> >>>>
> >>>>
> >>>> +1
> >>>>
> >>>>>
> >>>>> >>
> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
> rober...@google.com> wrote:
> >>>>> >> >>
> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <
> neville....@gmail.com> wrote:
> >>>>> >> >> >
> >>>>> >> >> > Forking this thread to discuss action items regarding the
> change. We can keep technical discussion in the original thread.
> >>>>> >> >> >
> >>>>> >> >> > Background: our SMB POC showed promising performance & cost
> saving improvements and we'd like to adopt it for production soon (by EOY).
> We want to contribute it to Beam so it's better generalized and maintained.
> We also want to avoid divergence between our internal version and the PR
> while it's in progress, specifically any breaking change in the produced
> SMB data.
> >>>>> >> >>
> >>>>> >> >> All good goals.
> >>>>> >> >>
> >>>>> >> >> > To achieve that I'd like to propose a few action items.
> >>>>> >> >> >
> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
> handling, bucket file and metadata format, etc., anything that affects
> produced SMB data.
> >>>>> >> >> > 2. Revise the existing PR according to #1
> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
> Compression, etc., but keep the existing file level abstraction
> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark
> clearly as @Experimental
> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
> >>>>> >> >> >
> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1
> guarantees that production data produced today are usable when #5 lands on
> master. #4 also gives early adopters a chance to give feedback.
> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
> couple of big PRs to achieve, which we can keep iterating on.
> >>>>> >> >> >
> >>>>> >> >> > What are your thoughts on this?
> >>>>> >> >>
> >>>>> >> >> I would like to see some resolution on the FileIO abstractions
> before
> >>>>> >> >> merging into experimental. (We have a FileBasedSink that would
> mostly
> >>>>> >> >> already work, so it's a matter of coming up with an analogous
> Source
> >>>>> >> >> interface.) Specifically I would not want to merge a set of
> per file
> >>>>> >> >> type smb IOs without a path forward to this or the
> determination that
> >>>>> >> >> it's not possible/desirable.
>


-- 
Cheers,
Gleb
