Great, let me know when to take another look at the PR!

Reuven

On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> That coder is added as an extra re-map stage from the "original" key to the
> new ShardAwareKey ... But the pipeline might get broken, I guess.
> Very fair point. I am taking a second pass over this and will try
> to simplify it much more.
>
> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>
>> I haven't looked at the PR in depth yet, but it appears that someone
>> running a pipeline today who then tries to update post this PR will have
>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>> custom function. Is that correct, or am I misreading things?
>>
>> Reuven
>>
>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> Hm, what would be the scenario? Have version A running with original
>>> random sharding and then start version B where I change sharding to some
>>> custom function?
>>> So I have to enable the pipeline to digest old keys from GBK restored
>>> state and also work with new keys produced to GBK going forward?
>>>
>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Initial thought on PR: we usually try to limit changing coders in these
>>>> types of transforms to better support runners that allow in-place updates
>>>> of pipelines. Can this be done without changing the coder?
>>>>
>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>> wrote:
>>>>
>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>> function.
>>>>> https://github.com/apache/beam/pull/8438
>>>>>
>>>>> If this sort of change looks good, then the next step would be to use it
>>>>> in the Flink runner transform override. Let me know what you think.
>>>>>
>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I guess it is fine to enable shardingFn control only at the WriteFiles
>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in a
>>>>>> PTransformOverride by the runner.
>>>>>>
>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>> function for choosing the shard number (I assume the problem is that
>>>>>>> there are cases where the current random assignment is not good?).
>>>>>>> However, this can be set using PTransformOverride; we ideally shouldn't
>>>>>>> force the user to know details of the runner when writing their code.
>>>>>>>
>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>> number of
>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>
>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>> number
>>>>>>>> of shards is not enough. We want to set the shard key directly and
>>>>>>>> that
>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>
>>>>>>>> -Max
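The runner-side "graph surgery" mentioned in this thread can be modeled with a toy sketch (plain Java, no Beam dependency). This only mirrors the idea behind Beam's PTransformOverride — the `Transform` class, the pipeline-as-list model, and all names here are illustrative, not SDK APIs:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class OverrideSketch {

  // Toy stand-in for a pipeline node; real Beam transforms are far richer.
  static class Transform {
    final String name;
    Transform(String name) { this.name = name; }
  }

  // Walk the pipeline and swap every transform the matcher accepts for a
  // replacement built by the factory, leaving the rest untouched.
  static List<Transform> replaceAll(
      List<Transform> pipeline, Predicate<Transform> matcher, UnaryOperator<Transform> factory) {
    List<Transform> out = new ArrayList<>();
    for (Transform t : pipeline) {
      out.add(matcher.test(t) ? factory.apply(t) : t);
    }
    return out;
  }

  public static void main(String[] args) {
    List<Transform> pipeline = List.of(new Transform("Read"), new Transform("WriteFiles"));
    // A runner would match WriteFiles and substitute a sharding-aware variant.
    List<Transform> rewritten =
        replaceAll(
            pipeline,
            t -> t.name.equals("WriteFiles"),
            t -> new Transform("WriteFilesWithCustomSharding"));
    System.out.println(rewritten.get(1).name); // prints "WriteFilesWithCustomSharding"
  }
}
```

In the real SDK the matcher and factory are supplied to the runner's list of overrides, which is the hook the thread proposes reusing for a Flink-specific sharding function.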
>>>>>>>>
>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>> sharding keys.
>>>>>>>> >
>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>>> > wrote:
>>>>>>>> >
>>>>>>>> >     Right now, sharding can be specified only via target
>>>>>>>> `shardCount`,
>>>>>>>> >     be it user or runner. Next to the configurable shardCount, I am
>>>>>>>> >     proposing to also pass a function which will allow the user (or
>>>>>>>> >     runner) to control how the shard is determined and what key will
>>>>>>>> >     be used to represent it:
>>>>>>>> >
>>>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>> >         extends Serializable {
>>>>>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>> >           Integer shardCount);
>>>>>>>> >     }
>>>>>>>> >
>>>>>>>> >     The default implementation can be what exists right now => a
>>>>>>>> >     random shard encapsulated as ShardedKey<Integer>.
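The proposed interface can be sketched standalone (plain Java, no Beam dependency; the interface is re-declared locally for illustration, and `HashSharding` is a hypothetical deterministic implementation, not anything from the PR):

```java
import java.io.Serializable;
import java.util.Objects;

public class ShardingSketch {

  // Local re-declaration of the proposed interface, for illustration only.
  interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
    ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
  }

  // Hypothetical implementation: hash the element so shard assignment is
  // stable for a given element and spreads elements across shardCount buckets.
  static class HashSharding<UserT, DestinationT>
      implements ShardingFunction<UserT, DestinationT, Integer> {
    @Override
    public Integer assign(DestinationT destination, UserT element, Integer shardCount) {
      // floorMod keeps the result non-negative even for negative hash codes.
      return Math.floorMod(Objects.hash(element), shardCount);
    }
  }

  public static void main(String[] args) {
    ShardingFunction<String, Void, Integer> fn = new HashSharding<>();
    int shard = fn.assign(null, "some-record", 8);
    // The assigned shard always lands in [0, shardCount).
    System.out.println("assigned shard: " + shard);
  }
}
```

A runner-provided implementation could instead derive the key from something like a worker index, which is the Flink motivation in BEAM-5865.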
>>>>>>>> >
>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com>
>>>>>>>> >     wrote:
>>>>>>>> >
>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>>>> takes
>>>>>>>> >         advantage of this to impose its own sharding if the user
>>>>>>>> hasn't
>>>>>>>> >         specified an explicit one. Could the Flink runner do the
>>>>>>>> same
>>>>>>>> >         instead of pushing this to the users?
>>>>>>>> >
>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>> >         <m...@apache.org> wrote:
>>>>>>>> >
>>>>>>>> >             Hi Jozef,
>>>>>>>> >
>>>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>>>> >
>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>>>> >                 across workers
>>>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>>>> >                 across workers, but huge number of files
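The trade-off between those two options can be illustrated with a small standalone simulation (plain Java; modeling shard-to-worker placement as a simple modulo is an assumption for illustration — real runners place keys differently):

```java
public class ShardSpreadDemo {

  // Count how many distinct workers receive at least one shard when
  // numShards shards are placed onto numWorkers workers by shard % numWorkers.
  static int busyWorkers(int numShards, int numWorkers) {
    boolean[] busy = new boolean[numWorkers];
    for (int shard = 0; shard < numShards; shard++) {
      busy[shard % numWorkers] = true;
    }
    int count = 0;
    for (boolean b : busy) {
      if (b) count++;
    }
    return count;
  }

  public static void main(String[] args) {
    // (1) num_shards below num_workers: some workers are guaranteed idle.
    System.out.println(busyWorkers(4, 8));  // prints 4 -> half the workers idle
    // (2) num_shards >> num_workers: every worker gets work,
    // at the cost of 64 output files.
    System.out.println(busyWorkers(64, 8)); // prints 8 -> all workers busy
  }
}
```

A custom sharding key lets the runner get case (2)'s spread without inflating the file count, by steering a modest number of shards onto distinct workers.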
>>>>>>>> >
>>>>>>>> >             Your approach would give users control over the sharding
>>>>>>>> >             keys such that they could be adjusted to spread load more
>>>>>>>> >             evenly.
>>>>>>>> >
>>>>>>>> >             I'd like to hear from Beam IO experts if that would
>>>>>>>> make sense.
>>>>>>>> >
>>>>>>>> >             Thanks,
>>>>>>>> >             Max
>>>>>>>> >
>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>> >              > Hello,
>>>>>>>> >              >
>>>>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>>>>> >              > there is only one option, which is random (round-robin)
>>>>>>>> >              > shard assignment per element, and it always uses
>>>>>>>> >              > ShardedKey<Integer> as the key for the GBK which follows.
>>>>>>>> >              >
>>>>>>>> >              > I would like to generalize this and have the
>>>>>>>> >              > possibility to provide a
>>>>>>>> >              > ShardingFn<UserT, DestinationT, ShardKeyT> via FileIO.
>>>>>>>> >              > What I am mainly after is the possibility to provide an
>>>>>>>> >              > optimisation for the Flink runtime and pass in a special
>>>>>>>> >              > function which generates shard keys in a way that they
>>>>>>>> >              > are evenly spread among workers (BEAM-5865).
>>>>>>>> >              >
>>>>>>>> >              > Would such an extension for FileIO make sense? If yes,
>>>>>>>> >              > I will create a ticket for it and try to draft a PR.
>>>>>>>> >              >
>>>>>>>> >              > Best,
>>>>>>>> >              > Jozef
>>>>>>>> >
>>>>>>>>
>>>>>>>
