Yes, I was able to use it in Flink and I do see a performance gain. I also see, which is important for me, more predictable and uniform memory usage among the workers.
On Wed, May 8, 2019 at 7:19 AM Reuven Lax <re...@google.com> wrote:

> So you were able to use this in Flink? Did you see performance gains?
>
> On Sun, May 5, 2019 at 5:25 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>
>> Sorry, it took a while. I wanted to actually use this extension for
>> WriteFiles in Flink and see that it works, and that proved to be a bit
>> bumpy. PR is at https://github.com/apache/beam/pull/8499
>>
>> On Thu, May 2, 2019 at 3:22 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Great, let me know when to take another look at the PR!
>>>
>>> Reuven
>>>
>>> On Wed, May 1, 2019 at 6:47 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>
>>>> That coder is added as an extra re-map stage from the "original" key
>>>> to the new ShardAwareKey ... But the pipeline might get broken, I
>>>> guess. Very fair point. I am taking a second pass over this and will
>>>> try to simplify it much more.
>>>>
>>>> On Wed, May 1, 2019 at 2:12 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> I haven't looked at the PR in depth yet, but it appears that someone
>>>>> running a pipeline today who then tries to update post this PR will
>>>>> have the coder change to DefaultShardKeyCoder, even if they haven't
>>>>> picked any custom function. Is that correct, or am I misreading
>>>>> things?
>>>>>
>>>>> Reuven
>>>>>
>>>>> On Tue, Apr 30, 2019 at 8:42 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>
>>>>>> Hm, what would be the scenario? Have version A running with the
>>>>>> original random sharding, and then start version B where I change
>>>>>> sharding to some custom function?
>>>>>> So I would have to enable the pipeline to digest old keys from GBK
>>>>>> restored state and also work with new keys produced to GBK going
>>>>>> forward?
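[Editor's note: the update-compatibility concern discussed here can be illustrated with a small, self-contained toy. These are not Beam's actual coders, and `DefaultShardKeyCoder` below is only a stand-in name from the thread: the point is that when a transform's key coder changes between pipeline versions, the bytes checkpointed into GBK state by version A are no longer what version B's coder produces or expects.]

```java
import java.util.Arrays;

// Toy illustration of the in-place-update concern: two "coders" for the
// same logical shard key that produce different wire formats. NOT Beam
// code; the encodings are invented for the illustration.
public class CoderCompat {

  // "Version A": encodes an integer shard key as 4 big-endian bytes.
  static byte[] encodeV1(int shard) {
    return new byte[] {
      (byte) (shard >>> 24), (byte) (shard >>> 16),
      (byte) (shard >>> 8), (byte) shard
    };
  }

  // "Version B" (a hypothetical DefaultShardKeyCoder stand-in): prefixes
  // a tag byte so the coder could also carry custom key types. Same
  // logical key, different byte representation.
  static byte[] encodeV2(int shard) {
    byte[] v1 = encodeV1(shard);
    byte[] out = new byte[v1.length + 1];
    out[0] = 0x01; // tag: "integer shard key"
    System.arraycopy(v1, 0, out, 1, v1.length);
    return out;
  }

  public static void main(String[] args) {
    byte[] oldState = encodeV1(3); // written by the running pipeline
    byte[] newForm = encodeV2(3);  // what the updated pipeline would write
    // Incompatible representations of the same key: an in-place update
    // would try to read oldState with the new coder and mis-decode it.
    System.out.println(Arrays.equals(oldState, newForm)); // false
  }
}
```

This is why runners that support in-place updates (Dataflow's update, Flink savepoint restore) push back on coder changes inside existing transforms.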
>>>>>>
>>>>>> On Tue, Apr 30, 2019 at 5:32 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Initial thought on the PR: we usually try to limit changing coders
>>>>>>> in these types of transforms, to better support runners that allow
>>>>>>> in-place updates of pipelines. Can this be done without changing
>>>>>>> the coder?
>>>>>>>
>>>>>>> On Tue, Apr 30, 2019 at 8:21 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>
>>>>>>>> I have created a PR for enhancing WriteFiles with a custom
>>>>>>>> sharding function:
>>>>>>>> https://github.com/apache/beam/pull/8438
>>>>>>>>
>>>>>>>> If this sort of change looks good, then the next step would be to
>>>>>>>> use it in a Flink runner transform override. Let me know what you
>>>>>>>> think.
>>>>>>>>
>>>>>>>> On Fri, Apr 26, 2019 at 9:24 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> I guess it is fine to enable shardingFn control only at the
>>>>>>>>> WriteFiles level rather than FileIO. On WriteFiles it can be
>>>>>>>>> manipulated in a PTransformOverride by the runner.
>>>>>>>>>
>>>>>>>>> On Thu, Apr 25, 2019 at 6:17 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>
>>>>>>>>>> Yes, a hook would have to be added to allow specifying a
>>>>>>>>>> different function for choosing the shard number (I assume the
>>>>>>>>>> problem is that there are cases where the current random
>>>>>>>>>> assignment is not good?). However, since this can be set using a
>>>>>>>>>> PTransformOverride, we ideally shouldn't force the user to know
>>>>>>>>>> details of the runner when writing their code.
>>>>>>>>>>
>>>>>>>>>> On Thu, Apr 25, 2019 at 7:52 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Reuven is talking about PTransformOverride, e.g.
>>>>>>>>>>> FlinkTransformOverrides. We already use this to determine the
>>>>>>>>>>> number of shards in case of runner-determined sharding.
>>>>>>>>>>>
>>>>>>>>>>> Not sure if that would work for Jozef's case, because setting
>>>>>>>>>>> the number of shards is not enough. We want to set the shard
>>>>>>>>>>> key directly, and that logic is buried inside WriteFiles.
>>>>>>>>>>>
>>>>>>>>>>> -Max
>>>>>>>>>>>
>>>>>>>>>>> On 25.04.19 16:30, Reuven Lax wrote:
>>>>>>>>>>> > Actually, the runner is free to perform surgery on the graph.
>>>>>>>>>>> > The FlinkRunner can insert a custom function to determine the
>>>>>>>>>>> > sharding keys.
>>>>>>>>>>> >
>>>>>>>>>>> > On Thu, Apr 25, 2019 at 7:28 AM Jozef Vilcek <jozo.vil...@gmail.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >     Right now, sharding can be specified only via a target
>>>>>>>>>>> >     `shardCount`, be it by the user or the runner. Next to a
>>>>>>>>>>> >     configurable shardCount, I am proposing to also be able
>>>>>>>>>>> >     to pass a function which will allow the user (or runner)
>>>>>>>>>>> >     to control how the shard is determined and what key will
>>>>>>>>>>> >     be used to represent it:
>>>>>>>>>>> >
>>>>>>>>>>> >     interface ShardingFunction<UserT, DestinationT, ShardKeyT>
>>>>>>>>>>> >         extends Serializable {
>>>>>>>>>>> >       ShardKeyT assign(DestinationT destination, UserT element,
>>>>>>>>>>> >           Integer shardCount);
>>>>>>>>>>> >     }
>>>>>>>>>>> >
>>>>>>>>>>> >     The default implementation can be what exists right now
>>>>>>>>>>> >     => a random shard encapsulated as ShardedKey<Integer>.
>>>>>>>>>>> >
>>>>>>>>>>> >     On Thu, Apr 25, 2019 at 4:07 PM Reuven Lax <re...@google.com> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >         If sharding is not specified, then the semantics are
>>>>>>>>>>> >         "runner-determined sharding." The DataflowRunner
>>>>>>>>>>> >         already takes advantage of this to impose its own
>>>>>>>>>>> >         sharding if the user hasn't specified an explicit one.
>>>>>>>>>>> >         Could the Flink runner do the same, instead of
>>>>>>>>>>> >         pushing this to the users?
>>>>>>>>>>> >
>>>>>>>>>>> >         On Thu, Apr 25, 2019 at 6:23 AM Maximilian Michels <m...@apache.org> wrote:
>>>>>>>>>>> >
>>>>>>>>>>> >             Hi Jozef,
>>>>>>>>>>> >
>>>>>>>>>>> >             For sharding in FileIO there are basically two
>>>>>>>>>>> >             options:
>>>>>>>>>>> >
>>>>>>>>>>> >             (1) num_shards ~= num_workers => bad spread of
>>>>>>>>>>> >                 the load across workers
>>>>>>>>>>> >             (2) num_shards >> num_workers => good spread of
>>>>>>>>>>> >                 the load across workers, but a huge number
>>>>>>>>>>> >                 of files
>>>>>>>>>>> >
>>>>>>>>>>> >             Your approach would give users control over the
>>>>>>>>>>> >             sharding keys, such that they could be adjusted
>>>>>>>>>>> >             to spread the load more evenly.
>>>>>>>>>>> >
>>>>>>>>>>> >             I'd like to hear from Beam IO experts whether
>>>>>>>>>>> >             that would make sense.
>>>>>>>>>>> >
>>>>>>>>>>> >             Thanks,
>>>>>>>>>>> >             Max
>>>>>>>>>>> >
>>>>>>>>>>> >             On 25.04.19 08:52, Jozef Vilcek wrote:
>>>>>>>>>>> >             > Hello,
>>>>>>>>>>> >             >
>>>>>>>>>>> >             > Right now, if someone needs sharded files via
>>>>>>>>>>> >             > FileIO, there is only one option, which is
>>>>>>>>>>> >             > random (round-robin) shard assignment per
>>>>>>>>>>> >             > element, and it always uses ShardedKey<Integer>
>>>>>>>>>>> >             > as the key for the GBK which follows.
>>>>>>>>>>> >             >
>>>>>>>>>>> >             > I would like to generalize this and have the
>>>>>>>>>>> >             > possibility to provide some
>>>>>>>>>>> >             > ShardingFn<UserT, DestinationT, ShardKeyT> via
>>>>>>>>>>> >             > FileIO.
>>>>>>>>>>> >             > What I am mainly after is the possibility to
>>>>>>>>>>> >             > provide an optimisation for the Flink runtime
>>>>>>>>>>> >             > and pass in a special function which generates
>>>>>>>>>>> >             > shard keys in a way that they are evenly spread
>>>>>>>>>>> >             > among workers (BEAM-5865).
>>>>>>>>>>> >             >
>>>>>>>>>>> >             > Would such an extension to FileIO make sense?
>>>>>>>>>>> >             > If yes, I would create a ticket for it and try
>>>>>>>>>>> >             > to draft a PR.
>>>>>>>>>>> >             >
>>>>>>>>>>> >             > Best,
>>>>>>>>>>> >             > Jozef
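[Editor's note: for reference, a minimal, self-contained sketch of the interface proposed in this thread, with two possible implementations: today's random behaviour and a deterministic hash-based variant. This stands entirely outside the Beam SDK — there is no `ShardedKey` class here, a plain `Integer` stands in for the key type, and the names are illustrative only.]

```java
import java.io.Serializable;
import java.util.concurrent.ThreadLocalRandom;

// Sketch of the proposed hook (not the Beam SDK version). The generic
// parameters mirror the proposal: UserT is the element type, DestinationT
// the dynamic destination, ShardKeyT whatever key type the GBK should use.
interface ShardingFunction<UserT, DestinationT, ShardKeyT> extends Serializable {
  ShardKeyT assign(DestinationT destination, UserT element, Integer shardCount);
}

public class ShardingSketch {

  // Today's behaviour: a random shard per element. This spreads elements
  // across shards, but the resulting keys may still hash unevenly onto
  // Flink workers.
  static final ShardingFunction<String, Void, Integer> RANDOM =
      (dest, element, shardCount) -> ThreadLocalRandom.current().nextInt(shardCount);

  // A deterministic alternative: derive the shard from the element, so the
  // same element always lands on the same shard. A Flink-aware variant
  // could instead pick key values whose key-group assignment spreads
  // evenly across workers -- the idea behind BEAM-5865.
  static final ShardingFunction<String, Void, Integer> BY_ELEMENT_HASH =
      (dest, element, shardCount) -> Math.floorMod(element.hashCode(), shardCount);

  public static void main(String[] args) {
    int shards = 5;
    int a = BY_ELEMENT_HASH.assign(null, "some-record", shards);
    int b = BY_ELEMENT_HASH.assign(null, "some-record", shards);
    // Deterministic and in range for the same input.
    System.out.println(a == b && a >= 0 && a < shards); // true
  }
}
```

A runner could supply such a function through a PTransformOverride, keeping the user's pipeline code unaware of runner details, as discussed above.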