Yes, I was able to use it in Flink and I do see performance gain. I also
see, which is important for me, more predictable and uniform memory usage
among workers

On Wed, May 8, 2019 at 7:19 AM Reuven Lax <re...@google.com> wrote:

> So you were able to use this in Flink? Did you see performance gains?
>
> On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> Sorry, it took a while. I wanted to actually use this extension for
>> WriteFiles in Flink and see it works and that proved too be a bit bumpy.
>> PR is at https://github.com/apache/beam/pull/8499
>>
>> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Great, let me know when to take another look at the PR!
>>>
>>> Reuven
>>>
>>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>> wrote:
>>>
>>>> That coder is added extra as a re-map stage from "original" key to new
>>>> ShardAwareKey ... But pipeline might get broken I guess.
>>>> Very fair point. I am having a second thought pass over this and will
>>>> try to simplify it much more
>>>>
>>>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I haven't looked at the PR in depth yet, but it appears that someone
>>>>> running a pipeline today who then tries to update post this PR will have
>>>>> the coder change to DefaultShardKeyCoder, even if they haven't picked any
>>>>> custom function. Is that correct, or am I misreading things?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hm, what would be the scenario? Have version A running with original
>>>>>> random sharding and then start version B where I change sharding to some
>>>>>> custom function?
>>>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>>>> state and also work with new keys produced to GBK going forward?
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Initial thought on PR: we usually try to limit changing coders in
>>>>>>> these types of transforms to better support runners that allow in-place
>>>>>>> updates of pipelines. Can this be done without changing the coder?
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I have created a PR for enhancing WriteFiles for custom sharding
>>>>>>>> function.
>>>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>>>
>>>>>>>> If this sort of change looks good, then next step would be to use
>>>>>>>> in in Flink runner transform override. Let me know what do you think
>>>>>>>>
>>>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I guess it is fine to enable shardingFn control only on WriteFiles
>>>>>>>>> level rather than FileIO. On WriteFiles it can be manipulated in
>>>>>>>>> PTransformOverride by runner.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, a hook would have to be added to allow specifying a
>>>>>>>>>> different function for choosing the shard number (I assume the 
>>>>>>>>>> problem is
>>>>>>>>>> that there are cases where the current random assignment is not 
>>>>>>>>>> good?).
>>>>>>>>>> However this can be set using PTransformOverride, we ideally 
>>>>>>>>>> shouldn't
>>>>>>>>>> force the user to know details of the runner when writing their code.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <
>>>>>>>>>> m...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>>>> number of
>>>>>>>>>>> shards in case of Runner-determined sharding.
>>>>>>>>>>>
>>>>>>>>>>> Not sure if that would work for Jozef's case because setting the
>>>>>>>>>>> number
>>>>>>>>>>> of shards is not enough. We want to set the shard key directly
>>>>>>>>>>> and that
>>>>>>>>>>> logic is buried inside WriteFiles.
>>>>>>>>>>>
>>>>>>>>>>> -Max
>>>>>>>>>>>
>>>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>>>> > Actually the runner is free to perform surgery on the graph.
>>>>>>>>>>> The
>>>>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>>>>> sharding keys.
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <
>>>>>>>>>>> jozo.vil...@gmail.com
>>>>>>>>>>> > <mailto:jozo.vil...@gmail.com>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >     Right now, sharding can be specified only via target
>>>>>>>>>>> `shardCount`,
>>>>>>>>>>> >     be it user or runner. Next to configurable shardCount, I am
>>>>>>>>>>> >     proposing to be able to pass also a function which will
>>>>>>>>>>> allow to the
>>>>>>>>>>> >     user (or runner) control how is shard determined and what
>>>>>>>>>>> key will
>>>>>>>>>>> >     be used to represent it
>>>>>>>>>>> >
>>>>>>>>>>> >     interface ShardingFunction[UserT, DestinationT,
>>>>>>>>>>> ShardKeyT]  extends
>>>>>>>>>>> >     Serializable {
>>>>>>>>>>> >         ShardKeyT assign(DestinationT destination, UserT
>>>>>>>>>>> element,
>>>>>>>>>>> >     shardCount: Integer);
>>>>>>>>>>> >     }
>>>>>>>>>>> >
>>>>>>>>>>> >     Default implementation can be what is right now =>  random
>>>>>>>>>>> shard
>>>>>>>>>>> >     encapsulated as ShardedKey<Integer>.
>>>>>>>>>>> >
>>>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <
>>>>>>>>>>> re...@google.com
>>>>>>>>>>> >     <mailto:re...@google.com>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>>>> >         "runner-determined sharding." The DataflowRunner
>>>>>>>>>>> already takes
>>>>>>>>>>> >         advantage of this to impose its own sharding if the
>>>>>>>>>>> user hasn't
>>>>>>>>>>> >         specified an explicit one. Could the Flink runner do
>>>>>>>>>>> the same
>>>>>>>>>>> >         instead of pushing this to the users?
>>>>>>>>>>> >
>>>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
>>>>>>>>>>> >         <m...@apache.org <mailto:m...@apache.org>> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >             Hi Jozef,
>>>>>>>>>>> >
>>>>>>>>>>> >             For sharding in FileIO there are basically two
>>>>>>>>>>> options:
>>>>>>>>>>> >
>>>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of the
>>>>>>>>>>> load
>>>>>>>>>>> >             across workers
>>>>>>>>>>> >             (2) num_shards >> num_workers => good spread of
>>>>>>>>>>> the load
>>>>>>>>>>> >             across workers,
>>>>>>>>>>> >             but huge number of files
>>>>>>>>>>> >
>>>>>>>>>>> >             Your approach would give users control over the
>>>>>>>>>>> sharding
>>>>>>>>>>> >             keys such that
>>>>>>>>>>> >             they could be adjusted to spread load more evenly.
>>>>>>>>>>> >
>>>>>>>>>>> >             I'd like to hear from Beam IO experts if that
>>>>>>>>>>> would make sense.
>>>>>>>>>>> >
>>>>>>>>>>> >             Thanks,
>>>>>>>>>>> >             Max
>>>>>>>>>>> >
>>>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>>> >              > Hello,
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > Right now, if someone needs sharded files via
>>>>>>>>>>> FileIO,
>>>>>>>>>>> >             there is only one
>>>>>>>>>>> >              > option which is random (round robin) shard
>>>>>>>>>>> assignment per
>>>>>>>>>>> >             element and it
>>>>>>>>>>> >              > always use ShardedKey<Integer> as a key for the
>>>>>>>>>>> GBK which
>>>>>>>>>>> >             follows.
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > I would like to generalize this and have a
>>>>>>>>>>> possibility to
>>>>>>>>>>> >             provide some
>>>>>>>>>>> >              > ShardingFn[UserT, DestinationT, ShardKeyT] via
>>>>>>>>>>> FileIO.
>>>>>>>>>>> >              > What I am mainly after is, to have a
>>>>>>>>>>> possibility to
>>>>>>>>>>> >             provide optimisation
>>>>>>>>>>> >              > for Flink runtime and pass in a special
>>>>>>>>>>> function which
>>>>>>>>>>> >             generates shard
>>>>>>>>>>> >              > keys in a way that they are evenly spread among
>>>>>>>>>>> workers
>>>>>>>>>>> >             (BEAM-5865).
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > Would such extension for FileIO make sense? If
>>>>>>>>>>> yes, I
>>>>>>>>>>> >             would create a
>>>>>>>>>>> >              > ticket for it and try to draft a PR.
>>>>>>>>>>> >              >
>>>>>>>>>>> >              > Best,
>>>>>>>>>>> >              > Jozef
>>>>>>>>>>> >
>>>>>>>>>>>
>>>>>>>>>>

Reply via email to