That coder is added as an extra re-map stage from the "original" key to the new ShardAwareKey ... But the pipeline might break, I guess. Very fair point. I am taking a second pass over this and will try to simplify it much more.
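The ShardingFunction hook proposed further down in this thread could be written in plain Java roughly as below. This is a minimal, self-contained sketch, not the code from PR 8438 and not Beam's WriteFiles. SimpleShardedKey (a destination hash plus a shard number) and RoundRobinShardingFunction are hypothetical names. The default behavior is modeled on the thread's description of "random (round robin)" assignment: it starts at a random shard and then cycles through the shard numbers.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.Random;

/** Hypothetical stand-in for a shard key (destination hash + shard number); NOT Beam's ShardedKey. */
final class SimpleShardedKey implements Serializable {
  final int destinationHash;
  final int shardNumber;

  SimpleShardedKey(int destinationHash, int shardNumber) {
    this.destinationHash = destinationHash;
    this.shardNumber = shardNumber;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof SimpleShardedKey)) return false;
    SimpleShardedKey other = (SimpleShardedKey) o;
    return destinationHash == other.destinationHash && shardNumber == other.shardNumber;
  }

  @Override
  public int hashCode() {
    return Objects.hash(destinationHash, shardNumber);
  }
}

/** The hook proposed in the thread, expressed with Java generics. */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

/**
 * Hypothetical default: pick a random starting shard, then hand out
 * shard numbers round robin, one per element.
 */
final class RoundRobinShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, SimpleShardedKey> {
  private final Random random;
  private int nextShard = -1; // -1 means "starting shard not chosen yet"

  RoundRobinShardingFunction(Random random) {
    this.random = random;
  }

  @Override
  public SimpleShardedKey assign(DestinationT destination, UserT element, Integer shardCount) {
    // (Re)choose a random starting offset on first use or if shardCount shrank.
    if (nextShard < 0 || nextShard >= shardCount) {
      nextShard = random.nextInt(shardCount);
    }
    int shard = nextShard;
    nextShard = (nextShard + 1) % shardCount;
    return new SimpleShardedKey(Objects.hashCode(destination), shard);
  }
}
```

Calling assign five times with shardCount = 5 hits every shard exactly once before the cycle repeats. A custom implementation could return a different ShardKeyT altogether, and that changes the key coder seen by the following GBK, which is the in-place update concern discussed in the thread.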
On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
> I haven't looked at the PR in depth yet, but it appears that someone running a pipeline today who then tries to update after this PR will have the coder changed to DefaultShardKeyCoder, even if they haven't picked any custom function. Is that correct, or am I misreading things?
>
> Reuven
>
> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> Hm, what would the scenario be? Have version A running with the original random sharding, and then start version B where I change the sharding to some custom function?
>> So I have to enable the pipeline to digest old keys from the restored GBK state and also work with new keys sent to the GBK going forward?
>>
>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Initial thought on the PR: we usually try to limit changing coders in these types of transforms, to better support runners that allow in-place updates of pipelines. Can this be done without changing the coder?
>>>
>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>>> I have created a PR for enhancing WriteFiles with a custom sharding function.
>>>> https://github.com/apache/beam/pull/8438
>>>>
>>>> If this sort of change looks good, then the next step would be to use it in the Flink runner's transform override. Let me know what you think.
>>>>
>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>
>>>>> I guess it is fine to enable shardingFn control only at the WriteFiles level rather than in FileIO. On WriteFiles, it can be manipulated by the runner in a PTransformOverride.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Yes, a hook would have to be added to allow specifying a different function for choosing the shard number (I assume the problem is that there are cases where the current random assignment is not good?). However, this can be set using PTransformOverride; we ideally shouldn't force the user to know details of the runner when writing their code.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>
>>>>>>> Reuven is talking about PTransformOverride, e.g. FlinkTransformOverrides. We already use this to determine the number of shards in the case of runner-determined sharding.
>>>>>>>
>>>>>>> Not sure if that would work for Jozef's case, because setting the number of shards is not enough. We want to set the shard key directly, and that logic is buried inside WriteFiles.
>>>>>>>
>>>>>>> -Max
>>>>>>>
>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>> Actually, the runner is free to perform surgery on the graph. The FlinkRunner can insert a custom function to determine the sharding keys.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Right now, sharding can be specified only via a target `shardCount`, be it by the user or the runner.
>>>>>>>>> Next to the configurable shardCount, I am proposing to also be able to pass a function that lets the user (or runner) control how the shard is determined and what key is used to represent it:
>>>>>>>>>
>>>>>>>>>     interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
>>>>>>>>>       ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
>>>>>>>>>     }
>>>>>>>>>
>>>>>>>>> The default implementation can be what exists right now => a random shard encapsulated as ShardedKey<Integer>.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> If sharding is not specified, then the semantics are "runner-determined sharding." The DataflowRunner already takes advantage of this to impose its own sharding if the user hasn't specified an explicit one. Could the Flink runner do the same instead of pushing this to the users?
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jozef,
>>>>>>>>>>>
>>>>>>>>>>> For sharding in FileIO there are basically two options:
>>>>>>>>>>>
>>>>>>>>>>> (1) num_shards ~= num_workers => bad spread of the load across workers
>>>>>>>>>>> (2) num_shards >> num_workers => good spread of the load across workers, but a huge number of files
>>>>>>>>>>>
>>>>>>>>>>> Your approach would give users control over the sharding keys, so they could be adjusted to spread the load more evenly.
>>>>>>>>>>>
>>>>>>>>>>> I'd like to hear from Beam IO experts whether that would make sense.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Max
>>>>>>>>>>>
>>>>>>>>>>> On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>>>> Hello,
>>>>>>>>>>>>
>>>>>>>>>>>> Right now, if someone needs sharded files via FileIO, there is only one option: random (round-robin) shard assignment per element, and it always uses ShardedKey<Integer> as the key for the GBK that follows.
>>>>>>>>>>>>
>>>>>>>>>>>> I would like to generalize this and make it possible to provide a ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>>>>>>>>>>> What I am mainly after is the ability to provide an optimisation for the Flink runtime: pass in a special function that generates shard keys in a way that spreads them evenly among workers (BEAM-5865).
>>>>>>>>>>>>
>>>>>>>>>>>> Would such an extension of FileIO make sense? If so, I would create a ticket for it and try to draft a PR.
>>>>>>>>>>>>
>>>>>>>>>>>> Best,
>>>>>>>>>>>> Jozef
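The BEAM-5865 idea of generating shard keys so they spread evenly among workers can be sketched as a search over candidate keys under a known key-to-worker partitioner. This is a minimal sketch under loud assumptions. hashPartitioner (a MurmurHash3 fmix32 scramble followed by floorMod) is a hypothetical stand-in and not Flink's actual key-group assignment. BalancedShardKeys, pickKeys, and maxLoad are illustrative names. The demo contrasts plain shard numbers used as keys with searched keys, which relates to option (1) in the thread, where num_shards is close to num_workers.

```java
import java.util.Arrays;
import java.util.function.IntUnaryOperator;

final class BalancedShardKeys {
  private BalancedShardKeys() {}

  /** MurmurHash3 32-bit finalizer, used here only to scramble integer keys. */
  static int fmix32(int h) {
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    h *= 0xc2b2ae35;
    h ^= h >>> 16;
    return h;
  }

  /** HYPOTHETICAL key-to-worker partitioner; NOT Flink's real key-group logic. */
  static IntUnaryOperator hashPartitioner(int numWorkers) {
    return key -> Math.floorMod(fmix32(key), numWorkers);
  }

  /**
   * Returns one distinct int key per shard such that shard i lands on
   * worker (i % numWorkers) under the given partitioner.
   */
  static int[] pickKeys(int shardCount, int numWorkers, IntUnaryOperator workerOf, int maxAttempts) {
    int[] keys = new int[shardCount];
    int candidate = 0;
    int attempts = 0;
    for (int shard = 0; shard < shardCount; shard++) {
      int target = shard % numWorkers;
      // Skip candidates that the partitioner sends to a different worker.
      while (workerOf.applyAsInt(candidate) != target) {
        candidate++;
        if (++attempts > maxAttempts) {
          throw new IllegalStateException("no key found for worker " + target);
        }
      }
      keys[shard] = candidate++; // post-increment keeps keys distinct
    }
    return keys;
  }

  /** Largest number of shards that end up on a single worker. */
  static int maxLoad(int[] keys, int numWorkers, IntUnaryOperator workerOf) {
    int[] load = new int[numWorkers];
    for (int key : keys) {
      load[workerOf.applyAsInt(key)]++;
    }
    return Arrays.stream(load).max().orElse(0);
  }

  public static void main(String[] args) {
    int numWorkers = 4;
    int shardCount = 8;
    IntUnaryOperator workerOf = hashPartitioner(numWorkers);

    int[] naive = new int[shardCount];
    for (int i = 0; i < shardCount; i++) {
      naive[i] = i; // plain shard numbers used directly as keys
    }
    int[] balanced = pickKeys(shardCount, numWorkers, workerOf, 10_000);

    System.out.println("naive max load:    " + maxLoad(naive, numWorkers, workerOf));
    System.out.println("balanced max load: " + maxLoad(balanced, numWorkers, workerOf));
    System.out.println("balanced keys:     " + Arrays.toString(balanced));
  }
}
```

With 8 shards and 4 workers, pickKeys guarantees exactly two shards per worker under the assumed partitioner, while plain shard numbers may pile several shards onto one worker. For this to help in practice, a runner override would have to reproduce the runner's own partitioning of the encoded key exactly.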