>From the peanut gallery, keeping a separate implementation for SMB seems
fine. Dependencies are serious liabilities for both upstream and
downstream. It seems like the reuse angle is generating extra work, and
potentially making already-complex implementations more complex, instead of
helping things.

Kenn

On Wed, Jul 24, 2019 at 11:59 AM Neville Li <neville....@gmail.com> wrote:

> I spoke too soon. Turns out for unsharded writes, numShards can't be
> determined until the last finalize transform, which is again different from
> the current SMB proposal (static number of buckets & shards).
> I'll end up with more code specialized for SMB in order to generalize
> existing sink code, which I think we all want to avoid.
>
> Seems the only option is duplicating some logic like temp file handling,
> which is exactly what we did in the original PR.
> I can reuse Compression & Sink<T> for file level writes but that seems
> about the most I can reuse right now.
>
> On Tue, Jul 23, 2019 at 6:36 PM Neville Li <neville....@gmail.com> wrote:
>
>> So I spent one afternoon trying some ideas for reusing the last few
>> transforms WriteFiles.
>>
>> WriteShardsIntoTempFilesFn extends DoFn<KV<*ShardedKey<Integer>*,
>> Iterable<UserT>>, *FileResult*<DestinationT>>
>> => GatherResults<ResultT> extends PTransform<PCollection<ResultT>,
>> PCollection<List<ResultT>>>
>> => FinalizeTempFileBundles extends PTransform<PCollection<List<
>> *FileResult<DestinationT>*>>, WriteFilesResult<DestinationT>>
>>
>> I replaced FileResult<DestinationT> with KV<DestinationT, ResourceId> so
>> I can use pre-compute SMB destination file names for the transforms.
>> I'm also thinking of parameterizing ShardedKey<Integer> for SMB's
>> bucket/shard to reuse WriteShardsIntoTempFilesFn. These transforms are
>> private and easy to change/pull out.
>>
>> OTOH they are somewhat coupled with the package private
>> {Avro,Text,TFRecord}Sink and their WriteOperation impl (where the bulk of
>> temp file handing logic lives). Might be hard to decouple either modifying
>> existing code or creating new transforms, unless if we re-write most of
>> FileBasedSink from scratch.
>>
>> Let me know if I'm on the wrong track.
>>
>> WIP Branch https://github.com/spotify/beam/tree/neville/write-files
>>
>> On Tue, Jul 23, 2019 at 4:22 PM Chamikara Jayalath <chamik...@google.com>
>> wrote:
>>
>>>
>>>
>>> On Mon, Jul 22, 2019 at 1:41 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> On Mon, Jul 22, 2019 at 7:39 PM Eugene Kirpichov <kirpic...@google.com>
>>>> wrote:
>>>> >
>>>> > On Mon, Jul 22, 2019 at 7:49 AM Robert Bradshaw <rober...@google.com>
>>>> wrote:
>>>> >>
>>>> >> On Mon, Jul 22, 2019 at 4:04 PM Neville Li <neville....@gmail.com>
>>>> wrote:
>>>> >> >
>>>> >> > Thanks Robert. Agree with the FileIO point. I'll look into it and
>>>> see what needs to be done.
>>>> >> >
>>>> >> > Eugene pointed out that we shouldn't build on
>>>> FileBased{Source,Sink}. So for writes I'll probably build on top of
>>>> WriteFiles.
>>>> >>
>>>> >> Meaning it could be parameterized by FileIO.Sink, right?
>>>> >>
>>>> >>
>>>> https://github.com/apache/beam/blob/release-2.13.0/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileIO.java#L779
>>>> >
>>>> > Yeah if possible, parameterize FileIO.Sink.
>>>> > I would recommend against building on top of WriteFiles either.
>>>> FileIO being implemented on top of WriteFiles was supposed to be a
>>>> temporary measure - the longer-term plan was to rewrite it from scratch
>>>> (albeit with a similar structure) and throw away WriteFiles.
>>>> > If possible, I would recommend to pursue this path: if there are
>>>> parts of WriteFiles you want to reuse, I would recommend to implement them
>>>> as new transforms, not at all tied to FileBasedSink (but ok if tied to
>>>> FileIO.Sink), with the goal in mind that FileIO could be rewritten on top
>>>> of these new transforms, or maybe parts of WriteFiles could be swapped out
>>>> for them incrementally.
>>>>
>>>> Thanks for the feedback. There's a lot that was done, but looking at
>>>> the code it feels like there's a lot that was not yet done either, and
>>>> the longer-term plan wasn't clear (though perhaps I'm just not finding
>>>> the right docs).
>>>>
>>>
>>> I'm also a bit unfamiliar with original plans for WriteFiles and for
>>> updating source interfaces, but I prefer not significantly modifying
>>> existing IO transforms to suite the SMB use-case. If there are existing
>>> pieces of code that can be easily re-used that is fine, but existing
>>> sources/sinks are designed to perform a PCollection -> file transformation
>>> and vice versa with (usually) runner determined sharding. Things specific
>>> to SMB such as sharding restrictions, writing metadata to a separate file,
>>> reading multiple files from the same abstraction, does not sound like
>>> features that should be included in our usual file read/write transforms.
>>>
>>>
>>>> >> > Read might be a bigger change w.r.t. collocating ordered elements
>>>> across files within a bucket and TBH I'm not even sure where to start.
>>>> >>
>>>> >> Yeah, here we need an interface that gives us ReadableFile ->
>>>> >> Iterable<T>. There are existing PTransform<PCollection<ReadableFile>,
>>>> >> PCollection<T>> but such an interface is insufficient to extract
>>>> >> ordered records per shard. It seems the only concrete implementations
>>>> >> are based on FileBasedSource, which we'd like to avoid, but there's
>>>> no
>>>> >> alternative. An SDF, if exposed, would likely be overkill and
>>>> >> cumbersome to call (given the reflection machinery involved in
>>>> >> invoking DoFns).
>>>> >
>>>> > Seems easiest to just define a new regular Java interface for this.
>>>> > Could be either, indeed, ReadableFile -> Iterable<T>, or something
>>>> analogous, e.g. (ReadableFile, OutputReceiver<T>) -> void. Depends on how
>>>> much control over iteration you need.
>>>>
>>>> For this application, one wants to iterate over several files in
>>>> parallel. The downside of a new interface is that it shares almost
>>>> nothing with the "normal" sources (e.g. when features (or
>>>> optimizations) get added to one, they won't get added to the other).
>>>
>>>
>>>
>>>>
>>>> > And yes, DoFn's including SDF's are not designed to be used as Java
>>>> interfaces per se. If you need DoFn machinery in this interface (e.g. side
>>>> inputs), use Contextful - s.apache.org/context-fn.
>>>>
>>>> Yeah, one of the primary downsides to the NewDoFns is how hard it is
>>>> to build new DoFns out of others (or, really, use them in any context
>>>> other than as an argument to ParDo).
>>>>
>>>> >> > I'll file separate PRs for core changes needed for discussion.
>>>> WDYT?
>>>> >>
>>>> >> Sounds good.
>>>>
>>>
>>> +1
>>>
>>>
>>>> >>
>>>> >> > On Mon, Jul 22, 2019 at 4:20 AM Robert Bradshaw <
>>>> rober...@google.com> wrote:
>>>> >> >>
>>>> >> >> On Fri, Jul 19, 2019 at 5:16 PM Neville Li <neville....@gmail.com>
>>>> wrote:
>>>> >> >> >
>>>> >> >> > Forking this thread to discuss action items regarding the
>>>> change. We can keep technical discussion in the original thread.
>>>> >> >> >
>>>> >> >> > Background: our SMB POC showed promising performance & cost
>>>> saving improvements and we'd like to adopt it for production soon (by EOY).
>>>> We want to contribute it to Beam so it's better generalized and maintained.
>>>> We also want to avoid divergence between our internal version and the PR
>>>> while it's in progress, specifically any breaking change in the produced
>>>> SMB data.
>>>> >> >>
>>>> >> >> All good goals.
>>>> >> >>
>>>> >> >> > To achieve that I'd like to propose a few action items.
>>>> >> >> >
>>>> >> >> > 1. Reach a consensus about bucket and shard strategy, key
>>>> handling, bucket file and metadata format, etc., anything that affect
>>>> produced SMB data.
>>>> >> >> > 2. Revise the existing PR according to #1
>>>> >> >> > 3. Reduce duplicate file IO logic by reusing FileIO.Sink,
>>>> Compression, etc., but keep the existing file level abstraction
>>>> >> >> > 4. (Optional) Merge code into extensions::smb but mark clearly
>>>> as @experimental
>>>> >> >> > 5. Incorporate ideas from the discussion, e.g. ShardingFn,
>>>> GroupByKeyAndSortValues, FileIO generalization, key URN, etc.
>>>> >> >> >
>>>> >> >> > #1-4 gives us something usable in the short term, while #1
>>>> guarantees that production data produced today are usable when #5 lands on
>>>> master. #4 also gives early adopters a chance to give feedback.
>>>> >> >> > Due to the scope of #5, it might take much longer and a couple
>>>> of big PRs to achieve, which we can keep iterating on.
>>>> >> >> >
>>>> >> >> > What are your thoughts on this?
>>>> >> >>
>>>> >> >> I would like to see some resolution on the FileIO abstractions
>>>> before
>>>> >> >> merging into experimental. (We have a FileBasedSink that would
>>>> mostly
>>>> >> >> already work, so it's a matter of coming up with an analogous
>>>> Source
>>>> >> >> interface.) Specifically I would not want to merge a set of per
>>>> file
>>>> >> >> type smb IOs without a path forward to this or the determination
>>>> that
>>>> >> >> it's not possible/desirable.
>>>>
>>>

Reply via email to