So you were able to use this in Flink? Did you see performance gains?

On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
> Sorry, it took a while. I wanted to actually use this extension for
> WriteFiles in Flink and see that it works, and that proved to be a bit
> bumpy. PR is at https://github.com/apache/beam/pull/8499
>
> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>
>> Great, let me know when to take another look at the PR!
>>
>> Reuven
>>
>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>
>>> That coder is added as an extra re-map stage from the "original" key to
>>> the new ShardAwareKey ... But the pipeline might get broken, I guess.
>>> Very fair point. I am taking a second pass over this and will try to
>>> simplify it much more.
>>>
>>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> I haven't looked at the PR in depth yet, but it appears that someone
>>>> running a pipeline today who then tries to update post this PR will
>>>> have the coder change to DefaultShardKeyCoder, even if they haven't
>>>> picked any custom function. Is that correct, or am I misreading things?
>>>>
>>>> Reuven
>>>>
>>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>
>>>>> Hm, what would be the scenario? Have version A running with the
>>>>> original random sharding, and then start version B where I change
>>>>> sharding to some custom function?
>>>>> So I have to enable the pipeline to digest old keys from GBK restored
>>>>> state and also work with new keys produced to GBK going forward?
>>>>>
>>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>
>>>>>> Initial thought on the PR: we usually try to limit changing coders
>>>>>> in these types of transforms, to better support runners that allow
>>>>>> in-place updates of pipelines. Can this be done without changing the
>>>>>> coder?
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>
>>>>>>> I have created a PR for enhancing WriteFiles with a custom sharding
>>>>>>> function:
>>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>>
>>>>>>> If this sort of change looks good, then the next step would be to
>>>>>>> use it in a Flink runner transform override. Let me know what you
>>>>>>> think.
>>>>>>>
>>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I guess it is fine to enable shardingFn control only at the
>>>>>>>> WriteFiles level rather than in FileIO. On WriteFiles it can be
>>>>>>>> manipulated in a PTransformOverride by the runner.
>>>>>>>>
>>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Yes, a hook would have to be added to allow specifying a
>>>>>>>>> different function for choosing the shard number (I assume the
>>>>>>>>> problem is that there are cases where the current random
>>>>>>>>> assignment is not good?). However, this can be set using a
>>>>>>>>> PTransformOverride; we ideally shouldn't force the user to know
>>>>>>>>> details of the runner when writing their code.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>>> number of shards in case of runner-determined sharding.
>>>>>>>>>>
>>>>>>>>>> Not sure if that would work for Jozef's case, because setting
>>>>>>>>>> the number of shards is not enough.
>>>>>>>>>> We want to set the shard key directly, and that logic is buried
>>>>>>>>>> inside WriteFiles.
>>>>>>>>>>
>>>>>>>>>> -Max
>>>>>>>>>>
>>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>>> > Actually, the runner is free to perform surgery on the graph. The
>>>>>>>>>> > FlinkRunner can insert a custom function to determine the
>>>>>>>>>> > sharding keys.
>>>>>>>>>> >
>>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>>> >
>>>>>>>>>> > > Right now, sharding can be specified only via a target
>>>>>>>>>> > > `shardCount`, be it by the user or the runner. Next to a
>>>>>>>>>> > > configurable shardCount, I am proposing to also be able to pass
>>>>>>>>>> > > in a function which will allow the user (or runner) to control
>>>>>>>>>> > > how a shard is determined and what key will be used to
>>>>>>>>>> > > represent it:
>>>>>>>>>> > >
>>>>>>>>>> > > interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
>>>>>>>>>> > >   ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
>>>>>>>>>> > > }
>>>>>>>>>> > >
>>>>>>>>>> > > The default implementation can be what exists right now => a
>>>>>>>>>> > > random shard encapsulated as ShardedKey<Integer>.
>>>>>>>>>> > >
>>>>>>>>>> > > On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>> > >
>>>>>>>>>> > > > If sharding is not specified, then the semantics are
>>>>>>>>>> > > > "runner-determined sharding." The DataflowRunner already
>>>>>>>>>> > > > takes advantage of this to impose its own sharding if the
>>>>>>>>>> > > > user hasn't specified an explicit one. Could the Flink
>>>>>>>>>> > > > runner do the same instead of pushing this to the users?
>>>>>>>>>> > > >
>>>>>>>>>> > > > On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>> > > >
>>>>>>>>>> > > > > Hi Jozef,
>>>>>>>>>> > > > >
>>>>>>>>>> > > > > For sharding in FileIO there are basically two options:
>>>>>>>>>> > > > >
>>>>>>>>>> > > > > (1) num_shards ~= num_workers => bad spread of the load
>>>>>>>>>> > > > > across workers
>>>>>>>>>> > > > > (2) num_shards >> num_workers => good spread of the load
>>>>>>>>>> > > > > across workers, but a huge number of files
>>>>>>>>>> > > > >
>>>>>>>>>> > > > > Your approach would give users control over the sharding
>>>>>>>>>> > > > > keys such that they could be adjusted to spread the load
>>>>>>>>>> > > > > more evenly.
>>>>>>>>>> > > > >
>>>>>>>>>> > > > > I'd like to hear from Beam IO experts whether that would
>>>>>>>>>> > > > > make sense.
>>>>>>>>>> > > > >
>>>>>>>>>> > > > > Thanks,
>>>>>>>>>> > > > > Max
>>>>>>>>>> > > > >
>>>>>>>>>> > > > > On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>> > > > > > Hello,
>>>>>>>>>> > > > > >
>>>>>>>>>> > > > > > Right now, if someone needs sharded files via FileIO,
>>>>>>>>>> > > > > > there is only one option, which is random (round-robin)
>>>>>>>>>> > > > > > shard assignment per element, and it always uses
>>>>>>>>>> > > > > > ShardedKey<Integer> as the key for the GBK which follows.
>>>>>>>>>> > > > > >
>>>>>>>>>> > > > > > I would like to generalize this and have a possibility
>>>>>>>>>> > > > > > to provide some ShardingFn<UserT, DestinationT, ShardKeyT>
>>>>>>>>>> > > > > > via FileIO.
>>>>>>>>>> > > > > >
>>>>>>>>>> > > > > > What I am mainly after is to have a possibility to
>>>>>>>>>> > > > > > provide an optimisation for the Flink runtime and pass in
>>>>>>>>>> > > > > > a special function which generates shard keys in a way
>>>>>>>>>> > > > > > that they are evenly spread among workers (BEAM-5865).
>>>>>>>>>> > > > > >
>>>>>>>>>> > > > > > Would such an extension for FileIO make sense? If yes, I
>>>>>>>>>> > > > > > would create a ticket for it and try to draft a PR.
>>>>>>>>>> > > > > >
>>>>>>>>>> > > > > > Best,
>>>>>>>>>> > > > > > Jozef
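For readers following the proposal: below is a minimal sketch of a custom implementation of the ShardingFunction interface as drafted in Jozef's message above. The interface shape is the one sketched in this thread, and the class name RoundRobinShardingFunction is made up for illustration; neither is necessarily what was finally merged in the linked PRs.

    import java.util.concurrent.ThreadLocalRandom;

    // Assumes the ShardingFunction<UserT, DestinationT, ShardKeyT> interface
    // sketched in the thread above. Assigns shards deterministically in a
    // round-robin fashion, starting from a random offset so that parallel
    // instances do not all emit shard 0 first. Uses a plain Integer as the
    // shard key for simplicity; the current WriteFiles behaviour would wrap
    // it as ShardedKey<Integer>.
    public class RoundRobinShardingFunction<UserT, DestinationT>
        implements ShardingFunction<UserT, DestinationT, Integer> {

      private transient int nextShard = -1;

      @Override
      public Integer assign(DestinationT destination, UserT element, Integer shardCount) {
        if (nextShard < 0) {
          // Lazily pick a random starting shard on first use per instance.
          nextShard = ThreadLocalRandom.current().nextInt(shardCount);
        }
        nextShard = (nextShard + 1) % shardCount;
        return nextShard;
      }
    }

As discussed above, a runner could install such a function through a PTransformOverride on WriteFiles, so the user's pipeline code would not need to change.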
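As for the even spread targeted by BEAM-5865: Flink routes a key to a worker via key groups, so a Flink-aware sharding function has to pick shard keys whose key-group assignment lands on distinct subtasks. Here is a rough sketch of that search using Flink's KeyGroupRangeAssignment utility; the class and helper names are made up, and a real implementation would have to hash the encoded key bytes rather than raw Integers, since that is what Flink ultimately sees.

    import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

    public class SubtaskTargetedKeys {
      // Finds one Integer key per subtask such that Flink's key-group
      // assignment routes that key to exactly that subtask. A sharding
      // function handing out these keys round-robin would spread shards
      // evenly across workers instead of relying on random key hashing.
      public static int[] keysTargetingEachSubtask(int parallelism, int maxParallelism) {
        int[] keys = new int[parallelism];
        boolean[] found = new boolean[parallelism];
        int remaining = parallelism;
        for (int candidate = 0; remaining > 0; candidate++) {
          int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(
              candidate, maxParallelism, parallelism);
          if (!found[subtask]) {
            found[subtask] = true;
            keys[subtask] = candidate;
            remaining--;
          }
        }
        return keys;
      }
    }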