Actually the runner is free to perform surgery on the graph. The
FlinkRunner can insert a custom function to determine the sharding keys.
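
To make that concrete, a runner-inserted function could look roughly like
the sketch below. The interface is just Jozef's proposal written out as
plain Java; the round-robin assigner is only an illustration of the kind of
function the FlinkRunner could swap in, not existing Beam or Flink API.

    import java.io.Serializable;
    import java.util.concurrent.ThreadLocalRandom;

    // Jozef's proposed interface, written as plain Java generics.
    interface ShardingFunction<UserT, DestinationT, ShardKeyT>
        extends Serializable {
      ShardKeyT assign(DestinationT destination, UserT element,
          Integer shardCount);
    }

    // Illustration only: pick a random starting shard per instance, then
    // assign shards round-robin so elements spread evenly over the shard
    // space instead of relying on purely random picks.
    class RoundRobinSharding<UserT, DestinationT>
        implements ShardingFunction<UserT, DestinationT, Integer> {

      private transient int next = -1;

      @Override
      public Integer assign(DestinationT destination, UserT element,
          Integer shardCount) {
        if (next < 0) {
          next = ThreadLocalRandom.current().nextInt(shardCount);
        }
        next = (next + 1) % shardCount;
        return next;
      }
    }

The point is that the key-assignment policy becomes pluggable, so a runner
can substitute one that matches how its workers pick up keys.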

On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Right now, sharding can be specified only via a target `shardCount`, be it
> by the user or the runner. Next to the configurable shardCount, I am
> proposing to also be able to pass a function which will allow the user (or
> runner) to control how the shard is determined and what key will be used
> to represent it:
>
> interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends
> Serializable {
>    ShardKeyT assign(DestinationT destination, UserT element,
>        Integer shardCount);
> }
>
> The default implementation can be what exists right now => a random shard
> encapsulated as ShardedKey<Integer>.
>
>
> On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>
>> If sharding is not specified, then the semantics are "runner-determined
>> sharding." The DataflowRunner already takes advantage of this to impose its
>> own sharding if the user hasn't specified an explicit one. Could the Flink
>> runner do the same instead of pushing this to the users?
>>
>> On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org>
>> wrote:
>>
>>> Hi Jozef,
>>>
>>> For sharding in FileIO there are basically two options:
>>>
>>> (1) num_shards ~= num_workers => bad spread of the load across workers
>>> (2) num_shards >> num_workers => good spread of the load across workers,
>>> but a huge number of files
>>>
>>> Your approach would give users control over the sharding keys such that
>>> they could be adjusted to spread load more evenly.
>>>
>>> I'd like to hear from Beam IO experts if that would make sense.
>>>
>>> Thanks,
>>> Max
>>>
>>> On 25.04.19 08:52, Jozef Vilcek wrote:
>>> > Hello,
>>> >
>>> > Right now, if someone needs sharded files via FileIO, there is only
>>> > one option, which is random (round-robin) shard assignment per
>>> > element, and it always uses ShardedKey<Integer> as the key for the GBK
>>> > which follows.
>>> >
>>> > I would like to generalize this and have the possibility to provide a
>>> > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
>>> > What I am mainly after is the possibility to provide an optimisation
>>> > for the Flink runtime and pass in a special function which generates
>>> > shard keys in a way that they are evenly spread among workers
>>> > (BEAM-5865).
>>> >
>>> > Would such an extension to FileIO make sense? If yes, I would create a
>>> > ticket for it and try to draft a PR.
>>> >
>>> > Best,
>>> > Jozef
>>>
>>
