Initial thought on PR: we usually try to limit changing coders in these
types of transforms to better support runners that allow in-place updates
of pipelines. Can this be done without changing the coder?
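
To make the question concrete, here is a rough sketch of one way it might
look. It is plain, stand-alone Java, not Beam code: ShardNumberFn,
IntShardedKey and keyFor are hypothetical names made up for illustration,
and IntShardedKey is only a stand-in for the existing ShardedKey<Integer>.
The assumption is that a custom function only needs to pick the shard
number, so the key type (and therefore its coder) stays fixed.

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

// Stand-alone sketch; none of these types are Beam classes.
final class ShardingSketch {

    /** Hypothetical hook: returns a shard number in [0, shardCount). */
    interface ShardNumberFn<UserT, DestinationT> extends Serializable {
        int assign(DestinationT destination, UserT element, int shardCount);
    }

    /** Stand-in for the existing integer-sharded key; its shape never changes. */
    static final class IntShardedKey implements Serializable {
        final int destinationHash;
        final int shardNumber;

        IntShardedKey(int destinationHash, int shardNumber) {
            this.destinationHash = destinationHash;
            this.shardNumber = shardNumber;
        }
    }

    /** Mirrors the current behaviour described in the thread: a random shard. */
    static <UserT, DestinationT> ShardNumberFn<UserT, DestinationT> randomShard() {
        return (destination, element, shardCount) ->
            ThreadLocalRandom.current().nextInt(shardCount);
    }

    /** Example custom policy: deterministic shard derived from the element hash. */
    static <UserT, DestinationT> ShardNumberFn<UserT, DestinationT> hashShard() {
        return (destination, element, shardCount) ->
            Math.floorMod(element.hashCode(), shardCount);
    }

    /** The key type is fixed here, so swapping the function leaves the key type alone. */
    static <UserT, DestinationT> IntShardedKey keyFor(
            ShardNumberFn<UserT, DestinationT> fn,
            DestinationT destination, UserT element, int shardCount) {
        int shard = fn.assign(destination, element, shardCount);
        return new IntShardedKey(destination.hashCode(), shard);
    }
}
```

With this shape a runner could swap randomShard() for its own policy while
the grouping key keeps the same type. Whether that is expressive enough for
the Flink case is an open question.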

On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> I have created a PR that enhances WriteFiles with a custom sharding function.
> https://github.com/apache/beam/pull/8438
>
> If this sort of change looks good, then the next step would be to use it in
> the Flink runner transform override. Let me know what you think.
>
> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com>
> wrote:
>
>> I guess it is fine to enable shardingFn control only at the WriteFiles level
>> rather than in FileIO. On WriteFiles it can be manipulated by the runner in
>> a PTransformOverride.
>>
>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Yes, a hook would have to be added to allow specifying a different
>>> function for choosing the shard number (I assume the problem is that there
>>> are cases where the current random assignment is not good?). However, this
>>> can be set using PTransformOverride; ideally we shouldn't force the user to
>>> know details of the runner when writing their code.
>>>
>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org>
>>> wrote:
>>>
>>>> Reuven is talking about PTransformOverride, e.g.
>>>> FlinkTransformOverrides. We already use this to determine the number of
>>>> shards in case of Runner-determined sharding.
>>>>
>>>> Not sure if that would work for Jozef's case because setting the number
>>>> of shards is not enough. We want to set the shard key directly and that
>>>> logic is buried inside WriteFiles.
>>>>
>>>> -Max
>>>>
>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>> > Actually the runner is free to perform surgery on the graph. The
>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>> > keys.
>>>> >
>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>> >
>>>> >     Right now, sharding can be specified only via a target
>>>> >     `shardCount`, set by either the user or the runner. In addition
>>>> >     to the configurable shardCount, I am proposing the ability to
>>>> >     also pass a function that lets the user (or runner) control how
>>>> >     the shard is determined and what key is used to represent it:
>>>> >
>>>> >     // Proposed hook: maps an element and its destination to a shard key.
>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>> >         extends Serializable {
>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>> >           int shardCount);
>>>> >     }
>>>> >
>>>> >     The default implementation can be what exists right now: a random
>>>> >     shard encapsulated as ShardedKey<Integer>.
>>>> >
>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>> >
>>>> >         If sharding is not specified, then the semantics are
>>>> >         "runner-determined sharding." The DataflowRunner already takes
>>>> >         advantage of this to impose its own sharding if the user
>>>> >         hasn't specified an explicit one. Could the Flink runner do
>>>> >         the same instead of pushing this to the users?
>>>> >
>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>> >         <m...@apache.org> wrote:
>>>> >
>>>> >             Hi Jozef,
>>>> >
>>>> >             For sharding in FileIO there are basically two options:
>>>> >
>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>> >             across workers
>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>> >             across workers,
>>>> >             but huge number of files
>>>> >
>>>> >             Your approach would give users control over the sharding
>>>> >             keys such that
>>>> >             they could be adjusted to spread load more evenly.
>>>> >
>>>> >             I'd like to hear from Beam IO experts if that would make
>>>> >             sense.
>>>> >
>>>> >             Thanks,
>>>> >             Max
>>>> >
>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>> >              > Hello,
>>>> >              >
>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>> >              > there is only one option, which is random (round robin)
>>>> >              > shard assignment per element, and it always uses
>>>> >              > ShardedKey<Integer> as the key for the GBK which follows.
>>>> >              >
>>>> >              > I would like to generalize this and make it possible to
>>>> >              > provide some ShardingFn[UserT, DestinationT, ShardKeyT]
>>>> >              > via FileIO. What I am mainly after is the possibility to
>>>> >              > optimise for the Flink runtime by passing in a special
>>>> >              > function which generates shard keys in a way that they
>>>> >              > are evenly spread among workers (BEAM-5865).
>>>> >              >
>>>> >              > Would such an extension for FileIO make sense? If yes, I
>>>> >              > would create a ticket for it and try to draft a PR.
>>>> >              >
>>>> >              > Best,
>>>> >              > Jozef
>>>> >
>>>>
>>>
