Actually the runner is free to perform surgery on the graph. The FlinkRunner can insert a custom function to determine the sharding keys.
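To make the idea concrete, a runner-inserted function could pick shard keys that the runner's own partitioner routes to distinct workers. The Java sketch below is hypothetical and is not FlinkRunner code. `BalancedShardKeys` is an illustrative name, and `keyToWorker` is an assumed stand-in for the runner's real key-to-worker routing; Flink's actual key-group assignment is not reproduced here. The sketch probes integer keys until shard `i` is routed to worker `i % parallelism`.

```java
import java.util.function.IntUnaryOperator;

/** Hypothetical helper: chooses one key per shard so shards spread evenly over workers. */
final class BalancedShardKeys {
  // Upper bound on probing so a degenerate partitioner cannot loop forever.
  private static final int MAX_PROBES = 1_000_000;

  private BalancedShardKeys() {}

  /**
   * Returns keys such that keyToWorker.applyAsInt(keys[i]) == i % parallelism.
   * keyToWorker is an assumed stand-in for the runner's key-to-worker routing.
   */
  static int[] compute(int shardCount, int parallelism, IntUnaryOperator keyToWorker) {
    if (shardCount <= 0 || parallelism <= 0) {
      throw new IllegalArgumentException("shardCount and parallelism must be positive");
    }
    int[] keys = new int[shardCount];
    int candidate = 0;
    for (int shard = 0; shard < shardCount; shard++) {
      int targetWorker = shard % parallelism;
      int probes = 0;
      // Scan increasing integer keys until one is routed to the target worker.
      while (keyToWorker.applyAsInt(candidate) != targetWorker) {
        candidate++;
        if (++probes > MAX_PROBES) {
          throw new IllegalStateException("no key routes to worker " + targetWorker);
        }
      }
      keys[shard] = candidate++; // use this key, then move past it so keys stay distinct
    }
    return keys;
  }
}
```

With 8 shards and a parallelism of 4, each worker ends up with exactly two shards, regardless of how the partitioner happens to distribute the plain keys 0..7.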
On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Right now, sharding can be specified only via a target `shardCount`, set
> either by the user or by the runner. In addition to a configurable
> shardCount, I am proposing to also allow passing a function that lets the
> user (or runner) control how the shard is determined and which key is used
> to represent it:
>
> interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
>   ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
> }
>
> The default implementation can be what exists right now: a random shard
> encapsulated as ShardedKey<Integer>.
>
>
> On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>
>> If sharding is not specified, then the semantics are "runner-determined
>> sharding." The DataflowRunner already takes advantage of this to impose its
>> own sharding if the user hasn't specified an explicit one. Could the Flink
>> runner do the same instead of pushing this to the users?
>>
>> On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Jozef,
>>>
>>> For sharding in FileIO there are basically two options:
>>>
>>> (1) num_shards ~= num_workers => bad spread of the load across workers
>>> (2) num_shards >> num_workers => good spread of the load across workers,
>>> but a huge number of files
>>>
>>> Your approach would give users control over the sharding keys such that
>>> they could be adjusted to spread the load more evenly.
>>>
>>> I'd like to hear from Beam IO experts whether that would make sense.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 25.04.19 08:52, Jozef Vilcek wrote:
>>> > Hello,
>>> >
>>> > Right now, if someone needs sharded files via FileIO, there is only one
>>> > option: random (round-robin) shard assignment per element, and it
>>> > always uses ShardedKey<Integer> as the key for the GBK that follows.
>>> >
>>> > I would like to generalize this and make it possible to provide a
>>> > ShardingFn<UserT, DestinationT, ShardKeyT> via FileIO.
>>> > What I am mainly after is the ability to provide an optimisation for
>>> > the Flink runtime and pass in a special function which generates shard
>>> > keys in a way that spreads them evenly among workers (BEAM-5865).
>>> >
>>> > Would such an extension to FileIO make sense? If yes, I would create a
>>> > ticket for it and try to draft a PR.
>>> >
>>> > Best,
>>> > Jozef
>>>
>>
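The sharding hook proposed in this thread could look roughly as follows in Java. This is a minimal, self-contained sketch, not Beam's API: `ShardingFunction`, `RoundRobinShardingFunction` and the `ShardKey` record (a stand-in for Beam's `ShardedKey<Integer>`) are hypothetical names. The default implementation mimics the behaviour described in the thread, assuming each instance starts at a random shard and then assigns shards round robin.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical Java rendering of the proposed hook: decides the shard key for each element. */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

/** Stand-in for Beam's ShardedKey<Integer>: a destination hash paired with a shard number. */
record ShardKey(int destinationHash, int shard) implements Serializable {}

/** Sketch of the current default: random starting shard, then round robin. */
final class RoundRobinShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, ShardKey> {
  private static final long serialVersionUID = 1L;

  private int next = -1; // -1 marks "no shard assigned yet"

  @Override
  public ShardKey assign(DestinationT destination, UserT element, Integer shardCount) {
    if (shardCount == null || shardCount <= 0) {
      throw new IllegalArgumentException("shardCount must be positive");
    }
    if (next < 0) {
      // Random start so parallel instances do not all begin at shard 0.
      next = ThreadLocalRandom.current().nextInt(shardCount);
    } else {
      // Afterwards, cycle through the shards in order.
      next = (next + 1) % shardCount;
    }
    return new ShardKey(Objects.hashCode(destination), next);
  }
}
```

A runner- or user-specific implementation, for example one returning keys chosen to land on distinct Flink workers as BEAM-5865 asks, would plug in through the same interface while the round-robin behaviour stays the default.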