So you were able to use this in Flink? Did you see performance gains?

On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:

> Sorry, it took a while. I wanted to actually use this extension for
> WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
> PR is at https://github.com/apache/beam/pull/8499
>
> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>
>> Great, let me know when to take another look at the PR!
>>
>> Reuven
>>
>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com>
>> wrote:
>>
>>> That coder is added extra as a re-map stage from "original" key to new
>>> ShardAwareKey ... But the pipeline might get broken, I guess.
>>> Very fair point. I am taking a second pass over this and will
>>> try to simplify it much more.
>>>
>>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I haven't looked at the PR in depth yet, but it appears that someone
>>>> running a pipeline today who then tries to update post this PR will have
>>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>>> custom function. Is that correct, or am I misreading things?
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hm, what would be the scenario? Have version A running with original
>>>>> random sharding and then start version B where I change sharding to some
>>>>> custom function?
>>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>>> state and also work with new keys produced to GBK going forward?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>>> these types of transforms to better support runners that allow in-place
>>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>>>> function.
>>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>>
>>>>>>> If this sort of change looks good, then the next step would be to use
>>>>>>> it in the Flink runner transform override. Let me know what you think.
>>>>>>>
>>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>>>> PTransformOverride by runner.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Yes, a hook would have to be added to allow specifying a different
>>>>>>>>> function for choosing the shard number (I assume the problem is that
>>>>>>>>> there are cases where the current random assignment is not good?).
>>>>>>>>> However, this can be set using PTransformOverride; we ideally
>>>>>>>>> shouldn't force the user to know details of the runner when writing
>>>>>>>>> their code.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>>> FlinkTransformOverrides. We already use this to determine the number
>>>>>>>>>> of shards in case of Runner-determined sharding.
>>>>>>>>>>
>>>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>>>> number of shards is not enough. We want to set the shard key directly
>>>>>>>>>> and that logic is buried inside WriteFiles.
>>>>>>>>>>
>>>>>>>>>> -Max
>>>>>>>>>>
>>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>>> > Actually the runner is free to perform surgery on the graph. The
>>>>>>>>>> > FlinkRunner can insert a custom function to determine the sharding
>>>>>>>>>> > keys.
>>>>>>>>>> >
>>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>>>>> > wrote:
>>>>>>>>>> >
>>>>>>>>>> >     Right now, sharding can be specified only via a target
>>>>>>>>>> >     `shardCount`, be it by the user or the runner. Next to the
>>>>>>>>>> >     configurable shardCount, I am proposing to also be able to pass
>>>>>>>>>> >     a function which will allow the user (or runner) to control how
>>>>>>>>>> >     the shard is determined and what key will be used to represent it
>>>>>>>>>> >
>>>>>>>>>> >     import java.io.Serializable;
>>>>>>>>>> >
>>>>>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>>>> >             extends Serializable {
>>>>>>>>>> >         // Returns the key representing the shard for this element.
>>>>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>>>> >                 Integer shardCount);
>>>>>>>>>> >     }
>>>>>>>>>> >
>>>>>>>>>> >     Default implementation can be what is right now => random shard
>>>>>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>>>>>> >
>>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com>
>>>>>>>>>> >     wrote:
>>>>>>>>>> >
>>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>>> >         "runner-determined sharding." The DataflowRunner already
>>>>>>>>>> >         takes advantage of this to impose its own sharding if the
>>>>>>>>>> >         user hasn't specified an explicit one. Could the Flink
>>>>>>>>>> >         runner do the same instead of pushing this to the users?
>>>>>>>>>> >
>>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>>>> >         <m...@apache.org> wrote:
>>>>>>>>>> >
>>>>>>>>>> >             Hi Jozef,
>>>>>>>>>> >
>>>>>>>>>> >             For sharding in FileIO there are basically two options:
>>>>>>>>>> >
>>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the load
>>>>>>>>>> >                 across workers
>>>>>>>>>> >             (2) num_shards >> num_workers => good spread of the load
>>>>>>>>>> >                 across workers, but huge number of files
>>>>>>>>>> >
>>>>>>>>>> >             Your approach would give users control over the sharding
>>>>>>>>>> >             keys such that they could be adjusted to spread load more
>>>>>>>>>> >             evenly.
>>>>>>>>>> >
>>>>>>>>>> >             I'd like to hear from Beam IO experts if that would make
>>>>>>>>>> >             sense.
>>>>>>>>>> >
>>>>>>>>>> >             Thanks,
>>>>>>>>>> >             Max
>>>>>>>>>> >
>>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>> >              > Hello,
>>>>>>>>>> >              >
>>>>>>>>>> >              > Right now, if someone needs sharded files via FileIO,
>>>>>>>>>> >              > there is only one option, which is random (round robin)
>>>>>>>>>> >              > shard assignment per element, and it always uses
>>>>>>>>>> >              > ShardedKey<Integer> as a key for the GBK which follows.
>>>>>>>>>> >              >
>>>>>>>>>> >              > I would like to generalize this and have a possibility
>>>>>>>>>> >              > to provide some ShardingFn[UserT, DestinationT,
>>>>>>>>>> >              > ShardKeyT] via FileIO.
>>>>>>>>>> >              > What I am mainly after is to have a possibility to
>>>>>>>>>> >              > provide an optimisation for the Flink runtime and pass
>>>>>>>>>> >              > in a special function which generates shard keys in a
>>>>>>>>>> >              > way that they are evenly spread among workers
>>>>>>>>>> >              > (BEAM-5865).
>>>>>>>>>> >              >
>>>>>>>>>> >              > Would such an extension for FileIO make sense? If yes, I
>>>>>>>>>> >              > would create a ticket for it and try to draft a PR.
>>>>>>>>>> >              >
>>>>>>>>>> >              > Best,
>>>>>>>>>> >              > Jozef
>>>>>>>>>> >
>>>>>>>>>>
>>>>>>>>>
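
For readers following the thread, here is a minimal sketch of the ShardingFunction idea proposed above, written as plain Java with no Beam dependency. It is an illustration under assumptions, not the code from either PR: the interface only mirrors the shape given in the thread, the shard key is a bare Integer rather than ShardedKey<Integer>, shardCount is taken as a primitive int, and the class names RandomSharding and HashSharding are hypothetical. HashSharding only shows how a different function could be plugged in; it is not the Flink-aware function from BEAM-5865, which would additionally need to know how the runner maps keys to workers.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical interface following the shape proposed in the thread; not Beam's actual API. */
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
    /** Returns the key that represents the shard this element is written to. */
    ShardKeyT assign(DestinationT destination, UserT element, int shardCount);
}

/** Mirrors the behaviour described in the thread: a random shard in [0, shardCount) per element. */
final class RandomSharding<UserT, DestinationT>
        implements ShardingFunction<UserT, DestinationT, Integer> {
    @Override
    public Integer assign(DestinationT destination, UserT element, int shardCount) {
        return ThreadLocalRandom.current().nextInt(shardCount);
    }
}

/** Illustrative custom function: a deterministic shard derived from the element's hash code. */
final class HashSharding<UserT, DestinationT>
        implements ShardingFunction<UserT, DestinationT, Integer> {
    @Override
    public Integer assign(DestinationT destination, UserT element, int shardCount) {
        // floorMod keeps the result non-negative even for negative hash codes.
        return Math.floorMod(Objects.hashCode(element), shardCount);
    }
}
```

Either implementation can be passed wherever a ShardingFunction<UserT, DestinationT, Integer> is expected, which is the point of the proposal: the assignment logic becomes swappable while the default stays random.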
