Re: Custom shardingFn for FileIO

2019-05-09 Thread Jozef Vilcek
Yes, I was able to use it in Flink, and I do see a performance gain. I also
see, which is important for me, more predictable and uniform memory usage
among workers.

On Wed, May 8, 2019 at 7:19 AM Reuven Lax wrote:

> So you were able to use this in Flink? Did you see performance gains?

Re: Custom shardingFn for FileIO

2019-05-07 Thread Reuven Lax
So you were able to use this in Flink? Did you see performance gains?

On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek wrote:

> Sorry, it took a while. I wanted to actually use this extension for
> WriteFiles in Flink and see that it works, and that proved to be a bit
> bumpy. PR is at https://github.com/apache/beam/pull/8499

Re: Custom shardingFn for FileIO

2019-05-05 Thread Jozef Vilcek
Sorry, it took a while. I wanted to actually use this extension for
WriteFiles in Flink and see that it works, and that proved to be a bit bumpy.
PR is at https://github.com/apache/beam/pull/8499

On Thu, May 2, 2019 at 3:22 PM Reuven Lax wrote:

> Great, let me know when to take another look at the PR!
>
> Reuven

Re: Custom shardingFn for FileIO

2019-05-02 Thread Reuven Lax
Great, let me know when to take another look at the PR!

Reuven

On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek wrote:

> That coder is added as an extra re-map stage from the "original" key to the
> new ShardAwareKey ... But the pipeline might get broken, I guess.
> Very fair point. I am taking a second pass over this and will try to
> simplify it much more.

Re: Custom shardingFn for FileIO

2019-05-01 Thread Jozef Vilcek
That coder is added as an extra re-map stage from the "original" key to the
new ShardAwareKey ... But the pipeline might get broken, I guess.
Very fair point. I am taking a second pass over this and will try to
simplify it much more.

On Wed, May 1, 2019 at 2:12 PM Reuven Lax wrote:

> I haven't looked at the PR in depth yet, but it appears that someone
> running a pipeline today who then tries to update after this PR will see
> the coder change to DefaultShardKeyCoder, even if they haven't picked any
> custom function. Is that correct, or am I misreading things?
>
> Reuven

Re: Custom shardingFn for FileIO

2019-05-01 Thread Reuven Lax
I haven't looked at the PR in depth yet, but it appears that someone
running a pipeline today who then tries to update after this PR will see
the coder change to DefaultShardKeyCoder, even if they haven't picked any
custom function. Is that correct, or am I misreading things?

Reuven

On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek wrote:

> Hm, what would be the scenario? Have version A running with the original
> random sharding and then start version B, where I change sharding to some
> custom function?
> So I would have to enable the pipeline to digest old keys from the restored
> GBK state and also work with new keys produced to the GBK going forward?

Re: Custom shardingFn for FileIO

2019-04-30 Thread Jozef Vilcek
Hm, what would be the scenario? Have version A running with the original
random sharding and then start version B, where I change sharding to some
custom function?
So I would have to enable the pipeline to digest old keys from the restored
GBK state and also work with new keys produced to the GBK going forward?

On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax wrote:

> Initial thought on PR: we usually try to limit changing coders in these
> types of transforms to better support runners that allow in-place updates
> of pipelines. Can this be done without changing the coder?

Re: Custom shardingFn for FileIO

2019-04-30 Thread Reuven Lax
Initial thought on PR: we usually try to limit changing coders in these
types of transforms to better support runners that allow in-place updates
of pipelines. Can this be done without changing the coder?
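
Reuven's point about coders can be illustrated with a small, hypothetical Java sketch: a running job persists keys (for example in GBK state) using one byte layout, and an updated job reads them back with a different layout. The `Key` class and both layouts are invented for this illustration; they are not Beam's `ShardedKey` or the PR's `DefaultShardKeyCoder` formats, and the sketch does not model how any runner actually restores state.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Objects;

public class CoderCompatSketch {

  /** Hypothetical (destination, shard) key; not Beam's ShardedKey. */
  static final class Key {
    final String destination;
    final int shard;

    Key(String destination, int shard) {
      this.destination = destination;
      this.shard = shard;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof Key)) return false;
      Key other = (Key) o;
      return shard == other.shard && destination.equals(other.destination);
    }

    @Override
    public int hashCode() {
      return Objects.hash(destination, shard);
    }

    @Override
    public String toString() {
      return destination + "#" + shard;
    }
  }

  // Invented "v1" layout, standing in for what the old job persisted: shard, then destination.
  static byte[] encodeV1(Key key) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      out.writeInt(key.shard);
      out.writeUTF(key.destination);
    }
    return bytes.toByteArray();
  }

  static Key decodeV1(byte[] encoded) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      int shard = in.readInt();
      return new Key(in.readUTF(), shard);
    }
  }

  // Invented "v2" layout used by the updated job: destination, then shard.
  static Key decodeV2(byte[] encoded) throws IOException {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      String destination = in.readUTF();
      return new Key(destination, in.readInt());
    }
  }

  public static void main(String[] args) throws IOException {
    Key original = new Key("events/", 3);
    byte[] persisted = encodeV1(original); // bytes held in state by the old job
    System.out.println("read with v1: " + decodeV1(persisted)); // read with v1: events/#3
    System.out.println("read with v2: " + decodeV2(persisted)); // read with v2: #196615
  }
}
```

With these particular layouts the mismatch does not throw; it silently yields an empty destination and a bogus shard number. Other layout changes may fail with an exception instead. Either way the restored state cannot be interpreted correctly, which is the motivation for keeping a transform's coder unchanged when possible.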

On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek wrote:

> I have created a PR that enhances WriteFiles with a custom sharding function.
> https://github.com/apache/beam/pull/8438
>
> If this sort of change looks good, the next step would be to use it in a
> Flink runner transform override. Let me know what you think.
>
> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek wrote:
>
>> I guess it is fine to enable shardingFn control only at the WriteFiles
>> level rather than on FileIO. On WriteFiles, the runner can manipulate it
>> in a PTransformOverride.
>>
>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax wrote:
>>
>>> Yes, a hook would have to be added to allow specifying a different
>>> function for choosing the shard number (I assume the problem is that there
>>> are cases where the current random assignment is not good?). However, this
>>> can be set using PTransformOverride; ideally we shouldn't force the user to
>>> know details of the runner when writing their code.
>>>
>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels wrote:
>>>
 Reuven is talking about PTransformOverride, e.g.
 FlinkTransformOverrides. We already use this to determine the number of
 shards in the case of runner-determined sharding.

 Not sure if that would work for Jozef's case because setting the number
 of shards is not enough. We want to set the shard key directly and that
 logic is buried inside WriteFiles.

 -Max

 On 25.04.19 16:30, Reuven Lax wrote:
 > Actually the runner is free to perform surgery on the graph. The
 > FlinkRunner can insert a custom function to determine the sharding
 > keys.
 >
 > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek >>> > > wrote:
 >
 > Right now, sharding can be specified only via target `shardCount`,
 > be it user or runner. Next to configurable shardCount, I am
 > proposing to be able to pass also a function which will allow to
 the
 > user (or runner) control how is shard determined and what key will
 > be used to represent it
 >
 > interface ShardingFunction[UserT, DestinationT, ShardKeyT]
 extends
 > Serializable {
 > ShardKeyT assign(DestinationT destination, UserT element,
 > shardCount: Integer);
 > }
 >
 > Default implementation can be what is right now =>  random shard
 > encapsulated as ShardedKey.
 >
 > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax >>> > > wrote:
 >
 > If sharding is not specified, then the semantics are
 > "runner-determined sharding." The DataflowRunner already takes
 > advantage of this to impose its own sharding if the user
 hasn't
 > specified an explicit one. Could the Flink runner do the same
 > instead of pushing this to the users?
 >
 > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels
 > mailto:m...@apache.org>> wrote:
 >
 > Hi Jozef,
 >
 > For sharding in FileIO there are basically two options:
 >
 > (1) num_shards ~= num_workers => bad spread of the load
 > across workers
 > (2) num_shards >> num_workers => good spread of the load
 > across workers,
 > but huge number of files
 >
 > Your approach would give users control over the sharding
 > keys such that
 > they could be adjusted to spread load more evenly.
 >
 > I'd like to hear from Beam IO experts if that would make
 sense.
 >
 > Thanks,
 > Max
 >
 > On 25.04.19 08:52, Jozef Vilcek wrote:
 >  > Hello,
 >  >
 >  > Right now, if someone needs sharded files via FileIO,
 > there is only one
 >  > option which is random (round robin) shard assignment
 per
 > element and it
 >  > always use ShardedKey as a key for the GBK
 which
 > follows.
 >  >
 >  > I would like to generalize this and have a possibility
 to
 > provide some
 >  > ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
 >  > What I am mainly after is, to have a possibility to
 > provide optimisation
 >  > for Flink runtime and pass in a special function which
 > generates shard
 >  > keys in a way that they are evenly spread among workers
 > (BEAM-5865).
 >  

Re: Custom shardingFn for FileIO

2019-04-30 Thread Jozef Vilcek
I have created a PR that enhances WriteFiles to support a custom sharding function.
https://github.com/apache/beam/pull/8438

If this sort of change looks good, the next step would be to use it in a
Flink runner transform override. Let me know what you think.

On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek  wrote:

> I guess it is fine to enable shardingFn control only at the WriteFiles level
> rather than in FileIO. At the WriteFiles level, a runner can manipulate it
> in a PTransformOverride.


Re: Custom shardingFn for FileIO

2019-04-25 Thread Reuven Lax
Yes, a hook would have to be added to allow specifying a different function
for choosing the shard number (I assume the problem is that there are cases
where the current random assignment is not good?). However, this can be set
using a PTransformOverride; ideally, we shouldn't force the user to know
details of the runner when writing their code.

On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels  wrote:

> Reuven is talking about PTransformOverride, e.g.
> FlinkTransformOverrides. We already use this to determine the number of
> shards in case of Runner-determined sharding.
>
> Not sure if that would work for Jozef's case because setting the number
> of shards is not enough. We want to set the shard key directly and that
> logic is buried inside WriteFiles.
>
> -Max


Re: Custom shardingFn for FileIO

2019-04-25 Thread Maximilian Michels
Reuven is talking about PTransformOverride, e.g.
FlinkTransformOverrides. We already use this to determine the number of
shards in case of Runner-determined sharding.

Not sure if that would work for Jozef's case because setting the number
of shards is not enough. We want to set the shard key directly and that
logic is buried inside WriteFiles.

-Max
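A minimal sketch of what setting the shard key directly could buy, under stated assumptions: the runner is assumed to route a key to worker `mix(key) mod workers` (a stand-in bit mixer; Flink actually routes through key groups), and a hypothetical function picks, for shard i, the next key that routes to worker `i mod workers`. `WorkerAwareShardKeys` and its methods are illustrative names, not Beam or Flink API.

```java
import java.util.Arrays;

// Hypothetical sketch; not Beam or Flink API.
class WorkerAwareShardKeys {
  // Stand-in for the runner's key -> worker routing (assumption):
  // MurmurHash3's 32-bit finalizer followed by a non-negative modulo.
  static int workerFor(int key, int workers) {
    int h = key;
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    h *= 0xc2b2ae35;
    h ^= h >>> 16;
    return Math.floorMod(h, workers);
  }

  // For shard i, pick the next unused key that routes to worker (i % workers),
  // so shards are spread round-robin over workers by construction.
  static int[] keysFor(int shardCount, int workers) {
    int[] keys = new int[shardCount];
    int candidate = 0;
    for (int i = 0; i < shardCount; i++) {
      int target = i % workers;
      while (workerFor(candidate, workers) != target) {
        candidate++;
      }
      keys[i] = candidate++;
    }
    return keys;
  }

  // Number of shard keys routed to each worker.
  static int[] load(int[] keys, int workers) {
    int[] perWorker = new int[workers];
    for (int key : keys) {
      perWorker[workerFor(key, workers)]++;
    }
    return perWorker;
  }

  public static void main(String[] args) {
    int[] keys = keysFor(8, 4);
    System.out.println(Arrays.toString(load(keys, 4))); // [2, 2, 2, 2]
  }
}
```

Only the shard count is fixed by the user here; the even spread comes entirely from choosing the keys, which is the part that currently lives inside WriteFiles.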

On 25.04.19 16:30, Reuven Lax wrote:
> Actually the runner is free to perform surgery on the graph. The
> FlinkRunner can insert a custom function to determine the sharding keys.


Re: Custom shardingFn for FileIO

2019-04-25 Thread Reuven Lax
Actually the runner is free to perform surgery on the graph. The
FlinkRunner can insert a custom function to determine the sharding keys.

On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek  wrote:

> Right now, sharding can be specified only via a target `shardCount`, whether
> by the user or the runner. In addition to a configurable shardCount, I am
> proposing to also allow passing a function that lets the user (or runner)
> control how the shard is determined and what key is used to represent it:
>
> interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
>    ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
> }
>
> The default implementation can be what exists right now => a random shard
> encapsulated as ShardedKey.


Re: Custom shardingFn for FileIO

2019-04-25 Thread Jozef Vilcek
Right now, sharding can be specified only via a target `shardCount`, whether
by the user or the runner. In addition to a configurable shardCount, I am
proposing to also allow passing a function that lets the user (or runner)
control how the shard is determined and what key is used to represent it:

interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
   ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

The default implementation can be what exists right now => a random shard
encapsulated as ShardedKey.
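A sketch of the proposed hook as a compilable Java interface, together with the default behaviour the thread describes as "random (round robin)": a random first shard, then round robin. `ShardKey` is a hypothetical stand-in for Beam's `ShardedKey`, and `RoundRobinShardingFunction` is an illustrative name, not an existing class.

```java
import java.io.Serializable;
import java.util.Objects;
import java.util.concurrent.ThreadLocalRandom;

// Hypothetical stand-in for Beam's ShardedKey: destination plus shard number.
final class ShardKey<DestinationT> implements Serializable {
  final DestinationT destination;
  final int shard;

  ShardKey(DestinationT destination, int shard) {
    this.destination = destination;
    this.shard = shard;
  }

  @Override
  public boolean equals(Object o) {
    if (!(o instanceof ShardKey)) return false;
    ShardKey<?> other = (ShardKey<?>) o;
    return shard == other.shard && Objects.equals(destination, other.destination);
  }

  @Override
  public int hashCode() {
    return Objects.hash(destination, shard);
  }
}

// The proposed hook, as a Java generic interface.
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

// Sketch of the default: random first shard, then round robin per call.
class RoundRobinShardingFunction<UserT, DestinationT>
    implements ShardingFunction<UserT, DestinationT, ShardKey<DestinationT>> {
  private int shard = -1; // -1 means "no shard chosen yet"

  @Override
  public ShardKey<DestinationT> assign(DestinationT destination, UserT element, Integer shardCount) {
    shard = (shard < 0)
        ? ThreadLocalRandom.current().nextInt(shardCount)
        : (shard + 1) % shardCount;
    return new ShardKey<>(destination, shard);
  }
}
```

A runner-specific implementation, such as one choosing keys that Flink spreads evenly (BEAM-5865), would only need to return a different key from `assign`.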


On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax  wrote:

> If sharding is not specified, then the semantics are "runner-determined
> sharding." The DataflowRunner already takes advantage of this to impose its
> own sharding if the user hasn't specified an explicit one. Could the Flink
> runner do the same instead of pushing this to the users?


Re: Custom shardingFn for FileIO

2019-04-25 Thread Reuven Lax
If sharding is not specified, then the semantics are "runner-determined
sharding." The DataflowRunner already takes advantage of this to impose its
own sharding if the user hasn't specified an explicit one. Could the Flink
runner do the same instead of pushing this to the users?
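A minimal sketch of the "runner-determined sharding" semantics described above, assuming that an unspecified shard count is represented as null or 0 and that the runner's own choice is one shard per parallel worker. Both representations and the class name `ShardCountResolver` are hypothetical, not Beam API.

```java
// Hypothetical sketch of runner-determined sharding; not Beam API.
class ShardCountResolver {
  // A positive user-specified count is honoured; otherwise ("not specified")
  // the runner substitutes its own choice, assumed here to be its parallelism.
  static int resolve(Integer userShardCount, int runnerParallelism) {
    if (userShardCount != null && userShardCount > 0) {
      return userShardCount;
    }
    return runnerParallelism;
  }

  public static void main(String[] args) {
    System.out.println(resolve(10, 4));   // 10: the user's choice wins
    System.out.println(resolve(null, 4)); // 4: the runner decides
  }
}
```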

On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels  wrote:

> Hi Jozef,
>
> For sharding in FileIO there are basically two options:
>
> (1) num_shards ~= num_workers => bad spread of the load across workers
> (2) num_shards >> num_workers => good spread of the load across workers,
> but a huge number of files
>
> Your approach would give users control over the sharding keys such that
> they could be adjusted to spread load more evenly.
>
> I'd like to hear from Beam IO experts if that would make sense.
>
> Thanks,
> Max


Re: Custom shardingFn for FileIO

2019-04-25 Thread Maximilian Michels

Hi Jozef,

For sharding in FileIO there are basically two options:

(1) num_shards ~= num_workers => bad spread of the load across workers
(2) num_shards >> num_workers => good spread of the load across workers,
but a huge number of files

Your approach would give users control over the sharding keys such that
they could be adjusted to spread load more evenly.

I'd like to hear from Beam IO experts if that would make sense.

Thanks,
Max
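A rough simulation of the two options, under a stated assumption: shard number s is routed to worker `mix(s) mod workers`, where the mixer is a stand-in and not Flink's actual key-group routing. With num_shards close to num_workers, some workers are likely to get several shards and others none; with many more shards, the busiest worker's load tends toward the even share, at the cost of one output file per shard. `ShardSpreadSimulation` is a hypothetical name.

```java
import java.util.Arrays;

// Hypothetical simulation; the routing below is a stand-in, not Flink's.
class ShardSpreadSimulation {
  // Assumed routing of a shard number to a worker: mix bits, then modulo.
  static int workerFor(int shard, int workers) {
    int h = shard;
    h ^= h >>> 16;
    h *= 0x85ebca6b;
    h ^= h >>> 13;
    h *= 0xc2b2ae35;
    h ^= h >>> 16;
    return Math.floorMod(h, workers);
  }

  // How many shards (each written to its own file) each worker handles.
  static int[] shardsPerWorker(int numShards, int workers) {
    int[] counts = new int[workers];
    for (int shard = 0; shard < numShards; shard++) {
      counts[workerFor(shard, workers)]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    int workers = 4;
    for (int numShards : new int[] {4, 400}) {
      int[] counts = shardsPerWorker(numShards, workers);
      int max = Arrays.stream(counts).max().getAsInt();
      double even = (double) numShards / workers;
      // Busiest worker's load relative to a perfectly even spread.
      System.out.printf("numShards=%d counts=%s max/even=%.2f%n",
          numShards, Arrays.toString(counts), max / even);
    }
  }
}
```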

On 25.04.19 08:52, Jozef Vilcek wrote:
> Hello,
>
> Right now, if someone needs sharded files via FileIO, there is only one
> option, which is random (round robin) shard assignment per element, and it
> always uses ShardedKey as the key for the GBK that follows.
>
> I would like to generalize this and make it possible to provide some
> ShardingFn[UserT, DestinationT, ShardKeyT] via FileIO.
> What I am mainly after is the ability to provide an optimisation for the
> Flink runtime by passing in a special function which generates shard keys
> in a way that spreads them evenly among workers (BEAM-5865).
>
> Would such an extension for FileIO make sense? If yes, I would create a
> ticket for it and try to draft a PR.
>
> Best,
> Jozef