Initial thought on the PR: we usually try to limit coder changes in these types of transforms, to better support runners that allow in-place pipeline updates. Can this be done without changing the coder?
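Here is a minimal sketch of one way to approach the question above, under stated assumptions. The hook still chooses the shard, but its return type is fixed to the key type the write already groups by (ShardedKey<Integer>, per the thread). Because the key type stays the same, the key coder would not need to change. `ShardedKey` here is a hypothetical stand-in written for this sketch, not Beam's class. `ShardingFunction`, `randomSharding`, and `hashSharding` are illustrative names, not an existing API.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

final class FixedKeySharding {

  // Hypothetical stand-in for Beam's ShardedKey<Integer> (not the real class):
  // an int key derived from the destination, plus the chosen shard number.
  static final class ShardedKey implements Serializable {
    final int key;
    final int shardNumber;

    ShardedKey(int key, int shardNumber) {
      this.key = key;
      this.shardNumber = shardNumber;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof ShardedKey)) {
        return false;
      }
      ShardedKey other = (ShardedKey) o;
      return key == other.key && shardNumber == other.shardNumber;
    }

    @Override
    public int hashCode() {
      return Objects.hash(key, shardNumber);
    }
  }

  // Hypothetical hook: the function picks the shard, but the return type is fixed,
  // so the key type of the downstream GroupByKey does not change.
  interface ShardingFunction<UserT, DestinationT> extends Serializable {
    ShardedKey assign(DestinationT destination, UserT element, int shardCount);
  }

  // Random shard per element, like the current behaviour described in the thread.
  static <UserT, DestinationT> ShardingFunction<UserT, DestinationT> randomSharding() {
    return (destination, element, shardCount) ->
        new ShardedKey(
            Objects.hashCode(destination), ThreadLocalRandom.current().nextInt(shardCount));
  }

  // Deterministic alternative: equal elements for the same destination share a shard.
  static <UserT, DestinationT> ShardingFunction<UserT, DestinationT> hashSharding() {
    return (destination, element, shardCount) ->
        new ShardedKey(
            Objects.hashCode(destination), Math.floorMod(Objects.hashCode(element), shardCount));
  }

  public static void main(String[] args) {
    ShardingFunction<String, String> fn = hashSharding();
    // "destination-a" and "record-42" are made-up example values.
    ShardedKey k = fn.assign("destination-a", "record-42", 10);
    System.out.println("key=" + k.key + " shard=" + k.shardNumber);
  }
}
```

Whether a fixed key type is sufficient for in-place updates depends on each runner's compatibility checks. The sketch only shows that choosing the shard can be separated from choosing the key type.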
On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I have created a PR that enhances WriteFiles with a custom sharding function.
> https://github.com/apache/beam/pull/8438
>
> If this sort of change looks good, the next step would be to use it in the
> Flink runner transform override. Let me know what you think.
>
> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I guess it is fine to enable shardingFn control only at the WriteFiles level
>> rather than in FileIO. On WriteFiles, the runner can manipulate it via a
>> PTransformOverride.
>>
>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Yes, a hook would have to be added to allow specifying a different
>>> function for choosing the shard number (I assume the problem is that there
>>> are cases where the current random assignment is not good?). However, this
>>> can be set using PTransformOverride; ideally we shouldn't force the user to
>>> know details of the runner when writing their code.
>>>
>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Reuven is talking about PTransformOverride, e.g.
>>>> FlinkTransformOverrides. We already use this to determine the number of
>>>> shards in the case of runner-determined sharding.
>>>>
>>>> I'm not sure that would work for Jozef's case, because setting the number
>>>> of shards is not enough. We want to set the shard key directly, and that
>>>> logic is buried inside WriteFiles.
>>>>
>>>> -Max
>>>>
>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>> > Actually, the runner is free to perform surgery on the graph. The
>>>> > FlinkRunner can insert a custom function to determine the sharding keys.
>>>> >
>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>> >
>>>> > Right now, sharding can be specified only via a target `shardCount`,
>>>> > whether it is set by the user or by the runner.
>>>> > In addition to a configurable shardCount, I am proposing to also allow
>>>> > passing a function that lets the user (or runner) control how the shard
>>>> > is determined and which key is used to represent it:
>>>> >
>>>> >   import java.io.Serializable;
>>>> >
>>>> >   interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>> >       extends Serializable {
>>>> >     ShardKeyT assign(DestinationT destination, UserT element,
>>>> >         Integer shardCount);
>>>> >   }
>>>> >
>>>> > The default implementation can be what exists right now: a random shard
>>>> > encapsulated as ShardedKey<Integer>.
>>>> >
>>>> > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>> >
>>>> > If sharding is not specified, then the semantics are
>>>> > "runner-determined sharding." The DataflowRunner already takes
>>>> > advantage of this to impose its own sharding if the user hasn't
>>>> > specified an explicit one. Could the Flink runner do the same
>>>> > instead of pushing this to the users?
>>>> >
>>>> > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:
>>>> >
>>>> > Hi Jozef,
>>>> >
>>>> > For sharding in FileIO there are basically two options:
>>>> >
>>>> > (1) num_shards ~= num_workers => bad spread of the load across workers
>>>> > (2) num_shards >> num_workers => good spread of the load across workers,
>>>> >     but a huge number of files
>>>> >
>>>> > Your approach would give users control over the sharding keys, so that
>>>> > they could be adjusted to spread the load more evenly.
>>>> >
>>>> > I'd like to hear from Beam IO experts whether that would make sense.
>>>> > >>>> > Thanks, >>>> > Max >>>> > >>>> > On 25.04.19 08:52, Jozef Vilcek wrote: >>>> > > Hello, >>>> > > >>>> > > Right now, if someone needs sharded files via FileIO, >>>> > there is only one >>>> > > option which is random (round robin) shard assignment >>>> per >>>> > element and it >>>> > > always use ShardedKey<Integer> as a key for the GBK >>>> which >>>> > follows. >>>> > > >>>> > > I would like to generalize this and have a possibility >>>> to >>>> > provide some >>>> > > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO. >>>> > > What I am mainly after is, to have a possibility to >>>> > provide optimisation >>>> > > for Flink runtime and pass in a special function which >>>> > generates shard >>>> > > keys in a way that they are evenly spread among workers >>>> > (BEAM-5865). >>>> > > >>>> > > Would such extension for FileIO make sense? If yes, I >>>> > would create a >>>> > > ticket for it and try to draft a PR. >>>> > > >>>> > > Best, >>>> > > Jozef >>>> > >>>> >>>