Great, let me know when to take another look at the PR! Reuven
On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> That coder is added extra as a re-map stage from "original" key to new
> ShardAwareKey ... But pipeline might get broken I guess.
> Very fair point. I am having second thoughts; I will take another pass
> over this and try to simplify it much more.
>
> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>
>> I haven't looked at the PR in depth yet, but it appears that someone
>> running a pipeline today who then tries to update post this PR will have
>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>> custom function. Is that correct, or am I misreading things?
>>
>> Reuven
>>
>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>
>>> Hm, what would be the scenario? Have version A running with the original
>>> random sharding and then start version B where I change sharding to some
>>> custom function?
>>> So I have to enable the pipeline to digest old keys from GBK restored
>>> state and also work with new keys produced to GBK going forward?
>>>
>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Initial thought on the PR: we usually try to limit changing coders in
>>>> these types of transforms to better support runners that allow in-place
>>>> updates of pipelines. Can this be done without changing the coder?
>>>>
>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>
>>>>> I have created a PR for enhancing WriteFiles with a custom sharding
>>>>> function:
>>>>> https://github.com/apache/beam/pull/8438
>>>>>
>>>>> If this sort of change looks good, then the next step would be to use
>>>>> it in a Flink runner transform override. Let me know what you think.
>>>>>
>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>
>>>>>> I guess it is fine to enable shardingFn control only on the WriteFiles
>>>>>> level rather than FileIO.
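[Editor's sketch] The in-place-update concern discussed above can be made concrete without any Beam dependency. The class and method names below are illustrative only; the two encodings merely mirror the *shapes* of a fixed-width big-endian coder and a varint-style coder. The point: the same logical shard key serializes to different bytes under different coders, so state written under one coder cannot be restored under the other after an update.

```java
import java.io.ByteArrayOutputStream;

public class CoderCompatDemo {
    // Fixed-width big-endian encoding (the shape of a BigEndianIntegerCoder).
    static byte[] encodeFixed(int value) {
        return new byte[] {
            (byte) (value >>> 24), (byte) (value >>> 16),
            (byte) (value >>> 8), (byte) value
        };
    }

    // Variable-length encoding (the shape of a VarIntCoder):
    // 7 payload bits per byte, high bit set on all but the last byte.
    static byte[] encodeVarInt(int value) {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        while ((value & ~0x7F) != 0) {
            bos.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        bos.write(value);
        return bos.toByteArray();
    }

    public static void main(String[] args) {
        // Same logical key 300, incompatible wire formats.
        System.out.println(encodeFixed(300).length);   // 4
        System.out.println(encodeVarInt(300).length);  // 2
    }
}
```

This is why a runner that restores checkpointed GBK state byte-for-byte breaks if the key coder silently changes between pipeline versions.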
>>>>>> On WriteFiles it can be manipulated in a PTransformOverride by the
>>>>>> runner.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>> function for choosing the shard number (I assume the problem is that
>>>>>>> there are cases where the current random assignment is not good?).
>>>>>>> However, this can be set using PTransformOverride; we ideally
>>>>>>> shouldn't force the user to know details of the runner when writing
>>>>>>> their code.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>
>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>> number of shards in case of runner-determined sharding.
>>>>>>>>
>>>>>>>> Not sure if that would work for Jozef's case, because setting the
>>>>>>>> number of shards is not enough. We want to set the shard key
>>>>>>>> directly, and that logic is buried inside WriteFiles.
>>>>>>>>
>>>>>>>> -Max
>>>>>>>>
>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>> > sharding keys.
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek
>>>>>>>> > <jozo.vil...@gmail.com> wrote:
>>>>>>>> >
>>>>>>>> > Right now, sharding can be specified only via a target
>>>>>>>> > `shardCount`, be it by the user or the runner.
>>>>>>>> > Next to a configurable shardCount, I am proposing to also be
>>>>>>>> > able to pass a function which allows the user (or runner) to
>>>>>>>> > control how a shard is determined and what key will be used to
>>>>>>>> > represent it:
>>>>>>>> >
>>>>>>>> > interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>> >     extends Serializable {
>>>>>>>> >   ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>> >       Integer shardCount);
>>>>>>>> > }
>>>>>>>> >
>>>>>>>> > The default implementation can be what exists right now => a
>>>>>>>> > random shard encapsulated as ShardedKey<Integer>.
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>> >
>>>>>>>> > If sharding is not specified, then the semantics are
>>>>>>>> > "runner-determined sharding." The DataflowRunner already takes
>>>>>>>> > advantage of this to impose its own sharding if the user hasn't
>>>>>>>> > specified an explicit one. Could the Flink runner do the same
>>>>>>>> > instead of pushing this to the users?
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>> >
>>>>>>>> > Hi Jozef,
>>>>>>>> >
>>>>>>>> > For sharding in FileIO there are basically two options:
>>>>>>>> >
>>>>>>>> > (1) num_shards ~= num_workers => bad spread of the load
>>>>>>>> > across workers
>>>>>>>> > (2) num_shards >> num_workers => good spread of the load
>>>>>>>> > across workers, but a huge number of files
>>>>>>>> >
>>>>>>>> > Your approach would give users control over the sharding keys
>>>>>>>> > such that they could be adjusted to spread the load more evenly.
>>>>>>>> >
>>>>>>>> > I'd like to hear from Beam IO experts whether that would make
>>>>>>>> > sense.
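[Editor's sketch] The interface proposed in the mail, written out as compilable, self-contained Java, with the current WriteFiles behaviour (random shard assignment) expressed through it. Names follow the proposal in this thread, not any merged Beam code; the random lambda is only a stand-in for what the default would do.

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

public class ShardingFunctionSketch {

    // Proposed extension point, normalized to Java generics syntax.
    interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
        ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
    }

    // Today's behaviour through the new interface: ignore the element and
    // destination, pick a uniformly random shard in [0, shardCount).
    static <UserT, DestinationT> ShardingFunction<UserT, DestinationT, Integer> randomSharding() {
        return (destination, element, shardCount) ->
                ThreadLocalRandom.current().nextInt(shardCount);
    }

    public static void main(String[] args) {
        ShardingFunction<String, Void, Integer> fn = randomSharding();
        // Every assignment stays within the requested shard range.
        System.out.println(fn.assign(null, "some-element", 8) < 8);
    }
}
```

A runner-supplied implementation would replace the lambda body with its own key choice; the caller (WriteFiles) only ever sees a `ShardKeyT` to group by.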
>>>>>>>> >
>>>>>>>> > Thanks,
>>>>>>>> > Max
>>>>>>>> >
>>>>>>>> > On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>> > > Hello,
>>>>>>>> > >
>>>>>>>> > > Right now, if someone needs sharded files via FileIO, there is
>>>>>>>> > > only one option, which is random (round-robin) shard assignment
>>>>>>>> > > per element, and it always uses ShardedKey<Integer> as the key
>>>>>>>> > > for the GBK which follows.
>>>>>>>> > >
>>>>>>>> > > I would like to generalize this and have the possibility to
>>>>>>>> > > provide a ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>>>>>> > > What I am mainly after is the possibility to provide an
>>>>>>>> > > optimisation for the Flink runtime and pass in a special
>>>>>>>> > > function which generates shard keys in a way that they are
>>>>>>>> > > evenly spread among workers (BEAM-5865).
>>>>>>>> > >
>>>>>>>> > > Would such an extension to FileIO make sense? If yes, I would
>>>>>>>> > > create a ticket for it and try to draft a PR.
>>>>>>>> > >
>>>>>>>> > > Best,
>>>>>>>> > > Jozef
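[Editor's sketch] The even-spread idea behind BEAM-5865 can be illustrated without any Beam or Flink dependency. Assume a runner routes a grouped key to worker `hash(key) % numWorkers`; a sharding function can then pre-compute one key per worker by searching for integers that the hash routes to each target. The `scramble` function below is an illustrative stand-in for the runner's real routing hash (Flink actually assigns keys to key groups via a murmur hash), so the concrete key values found here mean nothing outside this sketch; only the search technique carries over.

```java
import java.util.HashMap;
import java.util.Map;

public class EvenShardKeys {
    // Stand-in for the runner's key-routing hash (illustrative scrambler,
    // not Flink's actual function).
    static int scramble(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // For each worker 0..numWorkers-1, find a shard key the hash routes there.
    // Returns worker -> key; a sharding function built from these keys sends
    // exactly one shard to each worker.
    static Map<Integer, Integer> keyPerWorker(int numWorkers) {
        Map<Integer, Integer> keys = new HashMap<>();
        for (int candidate = 0; keys.size() < numWorkers; candidate++) {
            int worker = Math.floorMod(scramble(candidate), numWorkers);
            keys.putIfAbsent(worker, candidate);
        }
        return keys;
    }

    public static void main(String[] args) {
        Map<Integer, Integer> keys = keyPerWorker(4);
        System.out.println(keys.size()); // one key found per worker
    }
}
```

A hypothetical `ShardingFunction` implementation could then return `keys.get(shard % numWorkers)` for shard number `shard`, giving num_shards == num_workers *and* an even spread — the combination Max's options (1) and (2) cannot achieve with random keys.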