I spoke too soon. It turns out that for unsharded writes, numShards can't be
determined until the last finalize transform, which again differs from the
current SMB proposal (a static number of buckets & shards).
I'll end up with more code specialized for SMB in order to generalize
existing sink code, which I think we all want to avoid.

It seems the only option is to duplicate some logic, like temp file handling,
which is exactly what we did in the original PR.
I can reuse Compression & Sink<T> for file-level writes, but that seems to be
about all I can reuse right now.
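
For concreteness, the file-level reuse I have in mind is roughly the sketch
below (not the PR code; SmbFileWriter and its method names are made up).
Only FileIO.Sink<T> (per-record encoding) and Compression (channel wrapping)
come from core; temp file naming and the final rename stay SMB-specific.

import java.io.IOException;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.util.MimeTypes;

class SmbFileWriter<T> {
  private final FileIO.Sink<T> sink;
  private final Compression compression;

  SmbFileWriter(FileIO.Sink<T> sink, Compression compression) {
    this.sink = sink;
    this.compression = compression;
  }

  // Writes one bucket/shard's sorted records to a temp file; moving the temp
  // file to its final destination is handled by SMB-specific finalize logic.
  void write(ResourceId tempFile, Iterable<T> sortedRecords) throws IOException {
    try (WritableByteChannel channel =
        compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
      sink.open(channel);
      for (T record : sortedRecords) {
        sink.write(record);
      }
      sink.flush();
    }
  }
}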

On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com> wrote:

> So I spent one afternoon trying some ideas for reusing the last few
> transforms WriteFiles.
>
> WriteShardsIntoTempFilesFn extends
>     DoFn<KV<ShardedKey<Integer>, Iterable<UserT>>, FileResult<DestinationT>>
> => GatherResults<ResultT> extends
>     PTransform<PCollection<ResultT>, PCollection<List<ResultT>>>
> => FinalizeTempFileBundles extends
>     PTransform<PCollection<List<FileResult<DestinationT>>>,
>     WriteFilesResult<DestinationT>>
>
> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so I
> can use pre-computed SMB destination file names in the transforms.
> I'm also thinking of parameterizing the ShardedKey<Integer> key type for
> SMB's bucket/shard so we can reuse WriteShardsIntoTempFilesFn. These
> transforms are private and easy to change/pull out.
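>
> As a rough sketch of what I mean by parameterizing the key (names and
> fields here are placeholders, not the actual PR code), SMB would pass a
> small value type like this where ShardedKey<Integer> is used today:
>
> import java.io.Serializable;
> import java.util.Objects;
>
> // Hypothetical SMB key: the bucket picks the final file, the shard splits
> // writes within a bucket.
> final class BucketShardId implements Serializable {
>   private final int bucketId;
>   private final int shardId;
>
>   BucketShardId(int bucketId, int shardId) {
>     this.bucketId = bucketId;
>     this.shardId = shardId;
>   }
>
>   int bucketId() { return bucketId; }
>   int shardId() { return shardId; }
>
>   @Override
>   public boolean equals(Object o) {
>     return o instanceof BucketShardId
>         && ((BucketShardId) o).bucketId == bucketId
>         && ((BucketShardId) o).shardId == shardId;
>   }
>
>   @Override
>   public int hashCode() {
>     return Objects.hash(bucketId, shardId);
>   }
> }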
>
> OTOH they are somewhat coupled with the package-private
> {Avro,Text,TFRecord}Sink and their WriteOperation impls (where the bulk of
> the temp file handling logic lives). It might be hard to decouple them,
> whether by modifying existing code or by creating new transforms, unless we
> rewrite most of FileBasedSink from scratch.
>
> Let me know if I'm on the wrong track.
>
> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>
> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <chamik...@google.com>
> wrote:
>
>>
>>
>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpic...@google.com>
>>> wrote:
>>> >
>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>> >>
>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com>
>>> wrote:
>>> >> >
>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and
>>> see what needs to be done.
>>> >> >
>>> >> > Eugene pointed out that we shouldn't build on
>>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>>> WriteFiles.
>>> >>
>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>> >>
>>> >>
>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
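>>> >>
>>> >> (For reference, the Sink interface at that link is roughly just this
>>> >> per-file hook, so an SMB write would only have to supply the
>>> >> open/write/flush logic for each file format:)
>>> >>
>>> >> public interface Sink<ElementT> extends Serializable {
>>> >>   void open(WritableByteChannel channel) throws IOException;
>>> >>   void write(ElementT element) throws IOException;
>>> >>   void flush() throws IOException;
>>> >> }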
>>> >
>>> > Yeah, if possible, parameterize by FileIO.Sink.
>>> > I would also recommend against building on top of WriteFiles. FileIO
>>> being implemented on top of WriteFiles was supposed to be a temporary
>>> measure: the longer-term plan was to rewrite it from scratch (albeit with
>>> a similar structure) and throw away WriteFiles.
>>> > If possible, I would recommend pursuing this path: if there are parts
>>> of WriteFiles you want to reuse, implement them as new transforms, not
>>> tied to FileBasedSink at all (but OK if tied to FileIO.Sink), with the
>>> goal that FileIO could be rewritten on top of these new transforms, or
>>> maybe parts of WriteFiles could be swapped out for them incrementally.
>>>
>>> Thanks for the feedback. There's a lot that was done, but looking at the
>>> code it feels like a lot was not yet done either, and the longer-term
>>> plan wasn't clear (though perhaps I'm just not finding the right docs).
>>>
>>
>> I'm also a bit unfamiliar with the original plans for WriteFiles and for
>> updating source interfaces, but I'd prefer not to significantly modify
>> existing IO transforms to suit the SMB use-case. If there are existing
>> pieces of code that can easily be re-used, that's fine, but existing
>> sources/sinks are designed to perform a PCollection -> file transformation
>> and vice versa with (usually) runner-determined sharding. Things specific
>> to SMB, such as sharding restrictions, writing metadata to a separate
>> file, and reading multiple files from the same abstraction, do not sound
>> like features that should be included in our usual file read/write
>> transforms.
>>
>>
>>> >> > Read might be a bigger change w.r.t. collocating ordered elements
>>> across files within a bucket, and TBH I'm not even sure where to start.
>>> >>
>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>>> >> PCollection<T>> implementations, but such an interface is insufficient
>>> >> for extracting ordered records per shard. It seems the only concrete
>>> >> implementations are based on FileBasedSource, which we'd like to
>>> >> avoid, but there's no alternative. An SDF, if exposed, would likely be
>>> >> overkill and cumbersome to call (given the reflection machinery
>>> >> involved in invoking DoFns).
>>> >
>>> > Seems easiest to just define a new regular Java interface for this.
>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something
>>> analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how
>>> much control over iteration you need.
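>>> >
>>> > A minimal sketch of what such an interface could look like
>>> > (illustrative only; the name and shape are placeholders, not an
>>> > existing Beam API):
>>> >
>>> > import java.io.IOException;
>>> > import java.io.Serializable;
>>> > import org.apache.beam.sdk.io.FileIO;
>>> >
>>> > public interface SortedFileReader<T> extends Serializable {
>>> >   // Opens one bucket/shard file and returns its records in the order
>>> >   // they were written (i.e. sorted by the SMB key).
>>> >   Iterable<T> read(FileIO.ReadableFile file) throws IOException;
>>> > }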
>>>
>>> For this application, one wants to iterate over several files in
>>> parallel. The downside of a new interface is that it shares almost
>>> nothing with the "normal" sources (e.g. when features (or
>>> optimizations) get added to one, they won't get added to the other).
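>>>
>>> (To make that concrete, the per-bucket read is essentially a k-way merge
>>> over per-file iterators that are each already sorted by key. A sketch,
>>> not actual Beam code, using Guava's Iterators.mergeSorted:)
>>>
>>> import java.util.Iterator;
>>> import java.util.List;
>>> import com.google.common.collect.Iterators;
>>> import org.apache.beam.sdk.values.KV;
>>>
>>> class SortedBucketMerge {
>>>   // Each input iterator reads one file of the same bucket in key order;
>>>   // merging them yields key-ordered records for the whole bucket.
>>>   static <K extends Comparable<K>, V> Iterator<KV<K, V>> mergeBucket(
>>>       List<Iterator<KV<K, V>>> sortedFileIterators) {
>>>     return Iterators.mergeSorted(
>>>         sortedFileIterators,
>>>         (a, b) -> a.getKey().compareTo(b.getKey()));
>>>   }
>>> }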
>>
>>
>>
>>>
>>> > And yes, DoFns, including SDFs, are not designed to be used as Java
>>> interfaces per se. If you need DoFn machinery in this interface (e.g. side
>>> inputs), use Contextful - s.apache.org/context-fn.
>>>
>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>>> to build new DoFns out of others (or, really, use them in any context
>>> other than as an argument to ParDo).
>>>
>>> >> > I'll file separate PRs for core changes needed for discussion. WDYT?
>>> >>
>>> >> Sounds good.
>>>
>>
>> +1
>>
>>
>>> >>
>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
>>> rober...@google.com> wrote:
>>> >> >>
>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville....@gmail.com>
>>> wrote:
>>> >> >> >
>>> >> >> > Forking this thread to discuss action items regarding the
>>> change. We can keep technical discussion in the original thread.
>>> >> >> >
>>> >> >> > Background: our SMB POC showed promising performance and
>>> cost-saving improvements, and we'd like to adopt it for production soon
>>> (by EOY). We want to contribute it to Beam so it's better generalized and
>>> maintained. We also want to avoid divergence between our internal version
>>> and the PR while it's in progress, specifically any breaking changes in
>>> the produced SMB data.
>>> >> >>
>>> >> >> All good goals.
>>> >> >>
>>> >> >> > To achieve that I'd like to propose a few action items.
>>> >> >> >
>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>>> handling, bucket file and metadata format, etc.: anything that affects
>>> the produced SMB data.
>>> >> >> > 2. Revise the existing PR according to #1
>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>>> Compression, etc., but keep the existing file-level abstraction
>>> >> >> > 4. (Optional) Merge the code into extensions::smb but clearly
>>> mark it as @experimental
>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>>> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>>> >> >> >
>>> >> >> > #1-4 give us something usable in the short term, while #1
>>> guarantees that production data produced today remain usable when #5
>>> lands on master. #4 also gives early adopters a chance to give feedback.
>>> >> >> > Due to the scope of #5, it might take much longer and a couple
>>> of big PRs to achieve, which we can keep iterating on.
>>> >> >> >
>>> >> >> > What are your thoughts on this?
>>> >> >>
>>> >> >> I would like to see some resolution on the FileIO abstractions
>>> >> >> before merging into experimental. (We have a FileBasedSink that
>>> >> >> would mostly already work, so it's a matter of coming up with an
>>> >> >> analogous Source interface.) Specifically, I would not want to
>>> >> >> merge a set of per-file-type SMB IOs without a path forward to
>>> >> >> this, or the determination that it's not possible/desirable.
>>>
>>
