I haven't looked at the PR in depth yet, but it appears that someone running a pipeline today who then tries to update it after this PR will have the coder changed to DefaultShardKeyCoder, even if they haven't picked any custom function. Is that correct, or am I misreading things?
Reuven

On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Hm, what would be the scenario? Have version A running with the original
> random sharding and then start version B where I change sharding to some
> custom function?
> So I have to enable the pipeline to digest old keys from GBK restored
> state and also work with new keys produced to GBK going forward?
>
> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>
>> Initial thought on the PR: we usually try to limit changing coders in these
>> types of transforms to better support runners that allow in-place updates
>> of pipelines. Can this be done without changing the coder?
>>
>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>
>>> I have created a PR for enhancing WriteFiles with a custom sharding
>>> function:
>>> https://github.com/apache/beam/pull/8438
>>>
>>> If this sort of change looks good, then the next step would be to use it
>>> in the Flink runner transform override. Let me know what you think.
>>>
>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>>> I guess it is fine to enable shardingFn control only on the WriteFiles
>>>> level rather than on FileIO. On WriteFiles it can be manipulated in a
>>>> PTransformOverride by the runner.
>>>>
>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>> function for choosing the shard number (I assume the problem is that
>>>>> there are cases where the current random assignment is not good?).
>>>>> However, this can be set using a PTransformOverride; we ideally
>>>>> shouldn't force the user to know details of the runner when writing
>>>>> their code.
>>>>>
>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>
>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>> FlinkTransformOverrides.
>>>>>> We already use this to determine the number of shards in the case of
>>>>>> runner-determined sharding.
>>>>>>
>>>>>> Not sure if that would work for Jozef's case because setting the number
>>>>>> of shards is not enough. We want to set the shard key directly, and
>>>>>> that logic is buried inside WriteFiles.
>>>>>>
>>>>>> -Max
>>>>>>
>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>> > keys.
>>>>>> >
>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>> >
>>>>>> >     Right now, sharding can be specified only via a target
>>>>>> >     `shardCount`, be it by the user or the runner. Next to a
>>>>>> >     configurable shardCount, I am proposing to also be able to pass
>>>>>> >     a function which will allow the user (or runner) to control how
>>>>>> >     a shard is determined and what key will be used to represent it:
>>>>>> >
>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>> >         extends Serializable {
>>>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>>>> >           Integer shardCount);
>>>>>> >     }
>>>>>> >
>>>>>> >     The default implementation can be what is there right now => a
>>>>>> >     random shard encapsulated as ShardedKey<Integer>.
>>>>>> >
>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>> >
>>>>>> >         If sharding is not specified, then the semantics are
>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>> >         takes advantage of this to impose its own sharding if the
>>>>>> >         user hasn't specified an explicit one. Could the Flink
>>>>>> >         runner do the same instead of pushing this to the users?
>>>>>> >
>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>> >         <m...@apache.org> wrote:
>>>>>> >
>>>>>> >             Hi Jozef,
>>>>>> >
>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>> >
>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>> >             across workers
>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>> >             across workers, but a huge number of files
>>>>>> >
>>>>>> >             Your approach would give users control over the sharding
>>>>>> >             keys such that they could be adjusted to spread load
>>>>>> >             more evenly.
>>>>>> >
>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>> >             make sense.
>>>>>> >
>>>>>> >             Thanks,
>>>>>> >             Max
>>>>>> >
>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>> >             > Hello,
>>>>>> >             >
>>>>>> >             > Right now, if someone needs sharded files via FileIO,
>>>>>> >             > there is only one option, which is random (round-robin)
>>>>>> >             > shard assignment per element, and it always uses
>>>>>> >             > ShardedKey<Integer> as the key for the GBK which follows.
>>>>>> >             >
>>>>>> >             > I would like to generalize this and have the possibility
>>>>>> >             > to provide some ShardingFn[UserT, DestinationT, ShardKeyT]
>>>>>> >             > via FileIO.
>>>>>> >             > What I am mainly after is the possibility to provide an
>>>>>> >             > optimisation for the Flink runtime and pass in a special
>>>>>> >             > function which generates shard keys in a way that they
>>>>>> >             > are evenly spread among workers (BEAM-5865).
>>>>>> >             >
>>>>>> >             > Would such an extension for FileIO make sense? If yes,
>>>>>> >             > I would create a ticket for it and try to draft a PR.
>>>>>> >             >
>>>>>> >             > Best,
>>>>>> >             > Jozef
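[Editor's note] To make the shape of the proposal concrete, here is a minimal, self-contained Java sketch of a sharding-function hook with a random default (today's behaviour) and a deterministic hash-based alternative of the kind a runner override could inject. This is an illustration only: the class and field names (`ShardingSketch`, `RANDOM`, `HASHED`) are made up for this example, and the actual interface is the one proposed in https://github.com/apache/beam/pull/8438.

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

/**
 * Hypothetical sketch of the hook discussed in the thread; the real
 * interface lives in the PR above and may differ in detail.
 */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

public class ShardingSketch {
  // Today's default behaviour: ignore the element and pick a random shard.
  static final ShardingFunction<String, Void, Integer> RANDOM =
      (dest, element, shardCount) -> ThreadLocalRandom.current().nextInt(shardCount);

  // A deterministic alternative a runner override could inject, e.g. to
  // spread load evenly across workers (hash-based, purely illustrative).
  static final ShardingFunction<String, Void, Integer> HASHED =
      (dest, element, shardCount) -> Math.floorMod(element.hashCode(), shardCount);

  public static void main(String[] args) {
    int shards = 5;
    Integer r = RANDOM.assign(null, "event-123", shards);
    System.out.println(r >= 0 && r < shards);                  // prints true
    System.out.println(HASHED.assign(null, "event-123", shards)
        .equals(HASHED.assign(null, "event-123", shards)));    // prints true
  }
}
```

Because the function is `Serializable` and takes the destination, the element, and the shard count, either the user (via FileIO/WriteFiles) or the runner (via a PTransformOverride) could supply it without the user's code needing runner-specific knowledge.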