Hi Gleb,

Regarding the future of io.Read: ideally things would go as follows:
- All runners support SDF at feature parity with Read (mostly this is just
the Dataflow runner's liquid sharding and size estimation for bounded
sources, and backlog for unbounded sources, but I recall that a couple of
other runners also used size estimation)
- Bounded/UnboundedSource APIs are declared "deprecated" - it is forbidden
to add any new implementations to the SDK, and users shouldn't use them either
(note: I believe it's already effectively forbidden to use them for cases
where a DoFn/SDF at the current level of support will be sufficient)
- People one by one rewrite existing Bounded/UnboundedSource based
PTransforms in the SDK to use SDFs instead
- Read.from() is rewritten to use a wrapper SDF over the given Source, and
explicit support for Read is deleted from runners
- In the next major version of Beam - presumably 3.0 - the Read transform
itself is deleted
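
As a rough illustration of the "wrapper SDF over the given Source" step
above, here is a plain-Java sketch, not Beam code. SimpleSource,
RangeTracker and readViaWrapper are hypothetical stand-ins for a bounded
source, a restriction tracker and the wrapper's process method. It only
shows the shape of the idea: the offset range is the restriction, each
position is claimed before a record is emitted, and a split can shrink the
range while the read is in progress.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for a bounded source with random access by record index. */
interface SimpleSource<T> {
  long size();

  T recordAt(long index);
}

/** Hypothetical stand-in for a restriction tracker over the half-open range [from, to). */
final class RangeTracker {
  private long to;
  private long lastClaimed;

  RangeTracker(long from, long to) {
    this.to = to;
    this.lastClaimed = from - 1;
  }

  /** Claims a position; returns false once it falls outside the (possibly shrunk) range. */
  synchronized boolean tryClaim(long position) {
    if (position >= to) {
      return false;
    }
    lastClaimed = position;
    return true;
  }

  /** Gives away the second half of the unclaimed range; returns {from, to} or null. */
  synchronized long[] trySplit() {
    long firstUnclaimed = lastClaimed + 1;
    long remaining = to - firstUnclaimed;
    if (remaining < 2) {
      return null;
    }
    long splitAt = firstUnclaimed + remaining / 2;
    long[] residual = {splitAt, to};
    to = splitAt;
    return residual;
  }
}

final class SourceWrapperSketch {
  /** Adapts an in-memory list to the hypothetical source interface. */
  static <T> SimpleSource<T> listSource(List<T> records) {
    return new SimpleSource<T>() {
      @Override
      public long size() {
        return records.size();
      }

      @Override
      public T recordAt(long index) {
        return records.get((int) index);
      }
    };
  }

  /** The wrapper: emit a record only after its position has been claimed. */
  static <T> void readViaWrapper(
      SimpleSource<T> source, RangeTracker tracker, long start, Consumer<T> out) {
    for (long i = start; tracker.tryClaim(i); i++) {
      out.accept(source.recordAt(i));
    }
  }

  public static void main(String[] args) {
    SimpleSource<String> source = listSource(Arrays.asList("a", "b", "c", "d", "e", "f"));
    RangeTracker primary = new RangeTracker(0, source.size());
    List<String> primaryOut = new ArrayList<>();
    long[][] residual = new long[1][];
    // Simulate a runner-initiated split right after the first record is emitted.
    readViaWrapper(source, primary, 0, record -> {
      primaryOut.add(record);
      if (primaryOut.size() == 1) {
        residual[0] = primary.trySplit();
      }
    });
    List<String> residualOut = new ArrayList<>();
    readViaWrapper(
        source,
        new RangeTracker(residual[0][0], residual[0][1]),
        residual[0][0],
        residualOut::add);
    System.out.println(primaryOut + " " + residualOut); // prints [a, b, c] [d, e, f]
  }
}
```

The real wrapper would also need size estimation and backlog reporting to
reach the feature parity mentioned in the first bullet; none of that is
modelled here.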

I don't know what the current status of SDF/Read feature parity is; maybe
Luke or Cham can comment. An alternative path is offered in
http://s.apache.org/sdf-via-source.
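
For the (KV<ReadableFile, OffsetRange>, OutputReceiver<T>) shape discussed
in the quoted message below, a plain Java interface could look roughly like
the following. This is only a sketch under assumptions: FileRangeReader,
LineRangeReader and RangeReaderSketch are hypothetical names, a byte array
stands in for ReadableFile, a Consumer stands in for OutputReceiver, and
newline-delimited text is used instead of Avro to keep it short. The
convention assumed here is that a reader emits exactly the records that
start inside its range, so adjacent ranges cover every record once.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical interface: emit the records that start in [from, to) to a receiver. */
interface FileRangeReader<T> {
  void readRange(byte[] contents, long from, long to, Consumer<T> receiver);
}

/** Example implementation for newline-delimited UTF-8 text. */
final class LineRangeReader implements FileRangeReader<String> {
  @Override
  public void readRange(byte[] contents, long from, long to, Consumer<String> receiver) {
    // Scans from the beginning for simplicity; a real reader would seek near `from`.
    int lineStart = 0;
    while (lineStart < contents.length && lineStart < to) {
      int lineEnd = lineStart;
      while (lineEnd < contents.length && contents[lineEnd] != '\n') {
        lineEnd++;
      }
      // A line belongs to this range only if its first byte lies inside it.
      if (lineStart >= from) {
        receiver.accept(
            new String(contents, lineStart, lineEnd - lineStart, StandardCharsets.UTF_8));
      }
      lineStart = lineEnd + 1;
    }
  }
}

final class RangeReaderSketch {
  /** Convenience helper that collects the emitted records into a list. */
  static <T> List<T> collect(FileRangeReader<T> reader, byte[] contents, long from, long to) {
    List<T> out = new ArrayList<>();
    reader.readRange(contents, from, to, out::add);
    return out;
  }

  public static void main(String[] args) {
    byte[] data = "alpha\nbeta\ngamma\n".getBytes(StandardCharsets.UTF_8);
    FileRangeReader<String> reader = new LineRangeReader();
    System.out.println(collect(reader, data, 0, 8)); // prints [alpha, beta]
    System.out.println(collect(reader, data, 8, data.length)); // prints [gamma]
  }
}
```

Because it is an ordinary interface rather than a DoFn, something like this
can be called directly from other code, which is the property the SMB read
path needs.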


On Thu, Jul 25, 2019 at 6:39 AM Gleb Kanterov <g...@spotify.com> wrote:

> What is the long-term plan for org.apache.beam.sdk.io.Read? Is it going
> away in favor of SDF, or are we always going to have both?
>
> I was looking into AvroIO.read and AvroIO.readAll, both of them
> use AvroSource. AvroIO.readAll is using SDF, and it's implemented with
> ReadAllViaFileBasedSource that takes AvroSource as a parameter. Looking at
> ReadAllViaFileBasedSource, I don't find it necessary to use Source<?>; it
> should be enough to have something like (KV<ReadableFile, OffsetRange>,
> OutputReceiver<T>), as we have discussed in this thread, and that should be
> fine for SMB as well. It would require duplicating code from AvroSource,
> but in the end, I don't see it as a problem if AvroSource is going away.
>
> I'm attaching a small diagram I put together for myself to better understand the
> code.
>
> AvroIO.readAll :: PTransform<PBegin, PCollection<T>> ->
>
> FileIO.matchAll :: PTransform<PCollection<String>,
> PCollection<MatchResult.Metadata>>
> FileIO.readMatches :: PTransform<PCollection<MatchResult.Metadata>,
> PCollection<ReadableFile>>
> AvroIO.readFiles :: PTransform<PCollection<FileIO.ReadableFile>,
> PCollection<T>> ->
>
> ReadAllViaFileBasedSource :: PTransform<PCollection<ReadableFile>,
> PCollection<T>> ->
>
> ParDo.of(SplitIntoRangesFn :: DoFn<ReadableFile, KV<ReadableFile,
> OffsetRange>>) (splittable do fn)
>
> Reshuffle.viaRandomKey()
>
> ParDo.of(ReadFileRangesFn(createSource) :: DoFn<KV<ReadableFile,
> OffsetRange>, T>) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> AvroIO.read without getHintMatchedManyFiles() :: PTransform<PBegin,
> PCollection<T>> ->
>
> Read.Bounded.from(createSource) where
>
> createSource :: String -> FileBasedSource<T>
>
> createSource = AvroSource
>
>
> Gleb
>
>
> On Thu, Jul 25, 2019 at 2:41 PM Robert Bradshaw <rober...@google.com>
> wrote:
>
>> On Thu, Jul 25, 2019 at 12:35 AM Kenneth Knowles <k...@apache.org> wrote:
>> >
>> > From the peanut gallery, keeping a separate implementation for SMB
>> seems fine. Dependencies are serious liabilities for both upstream and
>> downstream. It seems like the reuse angle is generating extra work, and
>> potentially making already-complex implementations more complex, instead of
>> helping things.
>>
>> +1
>>
>> To be clear, what I care about is that WriteFiles(X) and
>> WriteSmbFiles(X) can share the same X, for X in {Avro, Parquet, Text,
>> TFRecord, ...}. In other words composability of the API (vs. manually
>> filling out the matrix). If WriteFiles and WriteSmbFiles find
>> opportunities for (easy, clean) implementation sharing, that'd be
>> nice, but not the primary goal.
>>
>> (Similarly for reading, though that seems less obvious. Certainly
>> whatever T is useful for ReadSmb(T) could be useful for a
>> (non-liquid-sharding) ReadAll(T) however.)
>>
>> > On Wed, Jul 24, 2019 at 11:59 AM Neville Li <neville....@gmail.com>
>> wrote:
>> >>
>> >> I spoke too soon. Turns out for unsharded writes, numShards can't be
>> determined until the last finalize transform, which is again different from
>> the current SMB proposal (static number of buckets & shards).
>> >> I'll end up with more code specialized for SMB in order to generalize
>> existing sink code, which I think we all want to avoid.
>> >>
>> >> Seems the only option is duplicating some logic like temp file
>> handling, which is exactly what we did in the original PR.
>> >> I can reuse Compression & Sink<T> for file level writes but that seems
>> about the most I can reuse right now.
>> >>
>> >> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com>
>> wrote:
>> >>>
>> >>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms of WriteFiles.
>> >>>
>> >>> WriteShardsIntoTempFilesFn extends DoFn<KV<ShardedKey<Integer>,
>> Iterable<UserT>>, FileResult<DestinationT>>
>> >>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
>> PCollection<List<ResultT>>>
>> >>> => FinalizeTempFileBundles extends
>> PTransform<PCollection<List<FileResult<DestinationT>>>,
>> WriteFilesResult<DestinationT>>
>> >>>
>> >>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId>
>> so I can use pre-computed SMB destination file names for the transforms.
>> >>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>> >>>
>> >>> OTOH they are somewhat coupled with the package private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> temp file handling logic lives). Might be hard to decouple, either by modifying
>> existing code or by creating new transforms, unless we rewrite most of
>> FileBasedSink from scratch.
>> >>>
>> >>> Let me know if I'm on the wrong track.
>> >>>
>> >>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>> >>>
>> >>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <
>> chamik...@google.com> wrote:
>> >>>>
>> >>>>
>> >>>>
>> >>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>> >>>>>
>> >>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <
>> kirpic...@google.com> wrote:
>> >>>>> >
>> >>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> >>>>> >>
>> >>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <
>> neville....@gmail.com> wrote:
>> >>>>> >> >
>> >>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it
>> and see what needs to be done.
>> >>>>> >> >
>> >>>>> >> > Eugene pointed out that we shouldn't build on
>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>> WriteFiles.
>> >>>>> >>
>> >>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>> >>>>> >>
>> >>>>> >>
>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>> >>>>> >
>> >>>>> > Yeah if possible, parameterize FileIO.Sink.
>> >>>>> > I would recommend against building on top of WriteFiles either.
>> FileIO being implemented on top of WriteFiles was supposed to be a
>> temporary measure - the longer-term plan was to rewrite it from scratch
>> (albeit with a similar structure) and throw away WriteFiles.
>> >>>>> > If possible, I would recommend to pursue this path: if there are
>> parts of WriteFiles you want to reuse, I would recommend to implement them
>> as new transforms, not at all tied to FileBasedSink (but ok if tied to
>> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
>> of these new transforms, or maybe parts of WriteFiles could be swapped out
>> for them incrementally.
>> >>>>>
>> >>>>> Thanks for the feedback. There's a lot that was done, but looking at
>> >>>>> the code it feels like there's a lot that was not yet done either,
>> and
>> >>>>> the longer-term plan wasn't clear (though perhaps I'm just not
>> finding
>> >>>>> the right docs).
>> >>>>
>> >>>>
>> >>>> I'm also a bit unfamiliar with original plans for WriteFiles and for
>> updating source interfaces, but I prefer not significantly modifying
>> existing IO transforms to suit the SMB use-case. If there are existing
>> pieces of code that can be easily re-used that is fine, but existing
>> sources/sinks are designed to perform a PCollection -> file transformation
>> and vice versa with (usually) runner determined sharding. Things specific
>> to SMB such as sharding restrictions, writing metadata to a separate file,
>> reading multiple files from the same abstraction, do not sound like
>> features that should be included in our usual file read/write transforms.
>> >>>>
>> >>>>>
>> >>>>> >> > Read might be a bigger change w.r.t. collocating ordered
>> elements across files within a bucket and TBH I'm not even sure where to
>> start.
>> >>>>> >>
>> >>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>> >>>>> >> Iterable<T>. There are existing
>> PTransform<PCollection<ReadableFile>,
>> >>>>> >> PCollection<T>> but such an interface is insufficient to extract
>> >>>>> >> ordered records per shard. It seems the only concrete
>> implementations
>> >>>>> >> are based on FileBasedSource, which we'd like to avoid, but
>> there's no
>> >>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>> >>>>> >> cumbersome to call (given the reflection machinery involved in
>> >>>>> >> invoking DoFns).
>> >>>>> >
>> >>>>> > Seems easiest to just define a new regular Java interface for
>> this.
>> >>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or
>> something analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void.
>> Depends on how much control over iteration you need.
>> >>>>>
>> >>>>> For this application, one wants to iterate over several files in
>> >>>>> parallel. The downside of a new interface is that it shares almost
>> >>>>> nothing with the "normal" sources (e.g. when features (or
>> >>>>> optimizations) get added to one, they won't get added to the other).
>> >>>>
>> >>>>
>> >>>>>
>> >>>>>
>> >>>>> > And yes, DoFn's including SDF's are not designed to be used as
>> Java interfaces per se. If you need DoFn machinery in this interface (e.g.
>> side inputs), use Contextful - s.apache.org/context-fn.
>> >>>>>
>> >>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>> >>>>> to build new DoFns out of others (or, really, use them in any
>> context
>> >>>>> other than as an argument to ParDo).
>> >>>>>
>> >>>>> >> > I'll file separate PRs for core changes needed for discussion.
>> WDYT?
>> >>>>> >>
>> >>>>> >> Sounds good.
>> >>>>
>> >>>>
>> >>>> +1
>> >>>>
>> >>>>>
>> >>>>> >>
>> >>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
>> rober...@google.com> wrote:
>> >>>>> >> >>
>> >>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <
>> neville....@gmail.com> wrote:
>> >>>>> >> >> >
>> >>>>> >> >> > Forking this thread to discuss action items regarding the
>> change. We can keep technical discussion in the original thread.
>> >>>>> >> >> >
>> >>>>> >> >> > Background: our SMB POC showed promising performance & cost
>> saving improvements and we'd like to adopt it for production soon (by EOY).
>> We want to contribute it to Beam so it's better generalized and maintained.
>> We also want to avoid divergence between our internal version and the PR
>> while it's in progress, specifically any breaking change in the produced
>> SMB data.
>> >>>>> >> >>
>> >>>>> >> >> All good goals.
>> >>>>> >> >>
>> >>>>> >> >> > To achieve that I'd like to propose a few action items.
>> >>>>> >> >> >
>> >>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>> handling, bucket file and metadata format, etc., anything that affects
>> produced SMB data.
>> >>>>> >> >> > 2. Revise the existing PR according to #1
>> >>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>> Compression, etc., but keep the existing file level abstraction
>> >>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark
>> clearly as @Experimental
>> >>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>> >>>>> >> >> >
>> >>>>> >> >> > #1-4 gives us something usable in the short term, while #1
>> guarantees that production data produced today are usable when #5 lands on
>> master. #4 also gives early adopters a chance to give feedback.
>> >>>>> >> >> > Due to the scope of #5, it might take much longer and a
>> couple of big PRs to achieve, which we can keep iterating on.
>> >>>>> >> >> >
>> >>>>> >> >> > What are your thoughts on this?
>> >>>>> >> >>
>> >>>>> >> >> I would like to see some resolution on the FileIO
>> abstractions before
>> >>>>> >> >> merging into experimental. (We have a FileBasedSink that
>> would mostly
>> >>>>> >> >> already work, so it's a matter of coming up with an analogous
>> Source
>> >>>>> >> >> interface.) Specifically I would not want to merge a set of
>> per file
>> >>>>> >> >> type smb IOs without a path forward to this or the
>> determination that
>> >>>>> >> >> it's not possible/desirable.
>>
>
>
> --
> Cheers,
> Gleb
>
