+1 to not requiring details like this in the Beam model. There is, however, the question of how to pass such implementation-specific hints to a runner that requires them. Generally that's done via ResourceHints or annotations; while the former seems a good fit, it's primarily focused on setting up the right context for user code (which GBK is not).
A complete hack would be to add an experiment like flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something cleaner.

On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <u...@beam.apache.org> wrote:

Hi Jan,

To generalize the per-stage parallelism configuration, we should have an FR proposing the capability to explicitly set an autoscaling policy (in this case, a fixed size per stage) in Beam pipelines.

Per-step or per-stage parallelism, and fusion/optimization, are not part of the Beam model. They are [Flink] runner implementation details and should be configured for each runner.

Also, when building the pipeline, it's not clear what the fusion looks like until the pipeline is submitted to a runner, which makes configuring the parallelism or workers per stage not straightforward. Flink's parallelism settings can be found here <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>; it's still something of a black box, since you don't really know how many tasks are actually spawned until you run a pipeline.

That being said, if we had a general interface controlling how a pipeline scales, each runner could adapt [auto]scaling in its own way. For example, in a Flink job, each operator/stage's task slots could be prorated by key counts, with the maximum parallelism throttled by task-slot utilization. As another example, in a Dataflow job, each stage scales horizontally by CPU utilization and vertically by memory/disk utilization.

+dev@beam.apache.org <dev@beam.apache.org>
Let's use this thread to discuss how to configure a pipeline for runners so that they can scale workers appropriately without exposing runner-specific details to the Beam model.

Ning.

On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:

Hi Ning,

I might have missed that in the discussion, but we are talking about batch execution, am I right?
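[Editor's note: a parser for the experiment-style workaround floated above might look like the following. This is purely a hypothetical sketch: the flink_parallelism_for_stage experiment does not exist in Beam; the flag name and format are only a proposal in this thread.]

```python
def parse_stage_parallelism(experiments):
    """Parse hypothetical 'flink_parallelism_for_stage=STAGE_NAME:value'
    experiment strings into a {stage_name: parallelism} mapping."""
    hints = {}
    for exp in experiments:
        key, sep, rest = exp.partition("=")
        if key != "flink_parallelism_for_stage" or not sep:
            continue  # some unrelated experiment flag; ignore it
        stage, sep, value = rest.rpartition(":")
        if not sep:
            raise ValueError(f"malformed stage-parallelism experiment: {exp!r}")
        hints[stage] = int(value)
    return hints

# Example: two stage hints mixed with an unrelated experiment flag.
print(parse_stage_parallelism([
    "flink_parallelism_for_stage=ReadFromKafka:4",
    "flink_parallelism_for_stage=MyGBK:16",
    "use_runner_v2",
]))  # {'ReadFromKafka': 4, 'MyGBK': 16}
```

A runner shim could consult such a mapping when translating each fused stage, which is exactly the kind of runner-side plumbing the thread argues should stay out of the Beam model.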
In streaming, all operators (PTransforms) of a Pipeline run in the same slots, so the downsides are limited. You can enforce streaming mode using the --streaming command-line argument. But yes, this might have other implications. For batch only, it obviously makes sense to limit the parallelism of a (fused) 'stage', which is not a transform-level concept but rather a more complex union of transforms divided by shuffle barriers. Would you be willing to start a follow-up thread on the @dev mailing list for deeper discussion?

Jan

On 4/20/23 19:18, Ning Kang via user wrote:

Hi Jan,

The approach works when your pipeline doesn't have too many operators, and the operator that needs the highest parallelism can use at most #total_task_slots / #operators of the resources available in the cluster.

Another downside is wasted resources for the other, smaller operators, which cannot make full use of the task slots assigned to them. You might see only 1/10 of tasks running while the other 9/10 sit idle for an operator with parallelism 10, especially when it's doing an aggregation like a SUM.

One redeeming method is that, for operators following another operator with high fanout, we can explicitly add a Reshuffle to allow higher parallelism. But this circles back to the first downside: if your pipeline has exponentially high fanout through it, setting a single parallelism for the whole pipeline is not ideal, because it significantly limits the scalability of your pipeline.

Ning.

On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

Hi,

this topic was discussed many years ago, and the conclusion was that setting the parallelism of individual operators via FlinkPipelineOptions (or ResourceHints) would be possible, but somewhat cumbersome.
Although I understand that it "feels" weird to have high parallelism for operators with small inputs, does this actually have any relevant performance impact? I always set parallelism based on the largest operator in the Pipeline, and this seems to work just fine. Is there any particular need for, or measurable impact of, such an approach?

Jan

On 4/19/23 17:23, Nimalan Mahendran wrote:

Same need here, using the Flink runner. We are processing a PCollection (extracting features per element), then combining these into groups of features and running the next operator on those groups.

Each group contains ~50 elements, so the parallelism of the operator upstream of the groupby should be higher, to be balanced with the downstream operator.

On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:

Hi Reuven,

It would be better to set parallelism per operator. As I mentioned before, there may be multiple groupby and join operators in one pipeline, and their parallelism can differ due to different input data sizes.

On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:

Jeff - does setting the global default work for you, or do you need per-operator control? Seems like the way to do that would be to add this to ResourceHints.

On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com> wrote:

Yeah, I don't think we have a good per-operator API for this. If we were to add it, it probably belongs in ResourceHints.

On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:

Looking at FlinkPipelineOptions, there is a parallelism option you can set. I believe this sets the default parallelism for all Flink operators.
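[Editor's note: Ning's slot-accounting argument can be made concrete with a small helper. This is an illustrative back-of-envelope sketch, not Beam or Flink API, and it assumes batch-style execution with no slot sharing (Jan notes streaming behaves differently): when every operator is forced to the same fixed parallelism and all of them must fit into the cluster at once, even the hottest operator is capped.]

```python
def max_usable_parallelism(total_task_slots: int, num_operators: int) -> int:
    """Upper bound on any single operator's parallelism when all operators
    share one fixed parallelism and must fit in the cluster's task slots
    simultaneously (no slot sharing)."""
    if num_operators <= 0:
        raise ValueError("need at least one operator")
    return total_task_slots // num_operators

# A 40-slot cluster running a 4-operator pipeline: the operator that
# needs the most parallelism is still capped at 10 slots.
print(max_usable_parallelism(40, 4))  # 10
```

This is the "#total_task_slots / #operators" ceiling Ning describes, and why a single pipeline-wide parallelism wastes slots on the small operators.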
On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:

Thanks Holden, this would work for Spark, but Flink doesn't have such a mechanism, so I am looking for a general solution on the Beam side.

On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca> wrote:

To a (small) degree Spark's "new" AQE might be able to help, depending on what kind of operations Beam is compiling it down to.

Have you tried setting spark.sql.adaptive.enabled & spark.sql.adaptive.coalescePartitions.enabled?

On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <u...@beam.apache.org> wrote:

I see. Robert - what is the story for parallelism controls on GBK with the Spark or Flink runners?

On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com> wrote:

No, I don't use Dataflow; I use Spark & Flink.

On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com> wrote:

Are you running on the Dataflow runner? If so, Dataflow - unlike Spark and Flink - dynamically modifies the parallelism as the operator runs, so there is no need for such controls. In fact, these specific controls wouldn't make much sense for the way Dataflow implements these operators.

On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com> wrote:

Just for performance tuning, like in Spark and Flink.
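[Editor's note: the two AQE settings Holden mentions are standard Spark SQL configuration keys, available since Spark 3.0; they can go in spark-defaults.conf or be passed via --conf key=value:]

```properties
# Enable Adaptive Query Execution, which re-optimizes the physical plan
# at runtime using shuffle statistics.
spark.sql.adaptive.enabled=true
# Let AQE coalesce small post-shuffle partitions into fewer, larger ones.
spark.sql.adaptive.coalescePartitions.enabled=true
```

As Jeff notes, though, this only helps when the Spark runner compiles the Beam pipeline down to operations AQE can re-plan; it has no Flink equivalent.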
On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <u...@beam.apache.org> wrote:

What are you trying to achieve by setting the parallelism?

On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com> wrote:

Thanks Reuven. What I mean is to set the parallelism at the operator level, and the input size of an operator is unknown at compile time if it is not a source operator.

Here's an example for Flink:
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
Spark also supports setting operator-level parallelism (see groupByKey and reduceByKey):
https://spark.apache.org/docs/latest/rdd-programming-guide.html

On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <u...@beam.apache.org> wrote:

The maximum parallelism is always determined by the parallelism of your data. If you do a GroupByKey, for example, the number of keys in your data determines the maximum parallelism.

Beyond the limitations in your data, it depends on your execution engine. If you're using Dataflow, Dataflow is designed to automatically determine the parallelism (e.g. work will be dynamically split and moved around between workers, the number of workers will autoscale, etc.), so there's no need to explicitly set the parallelism of the execution.
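[Editor's note: Reuven's point that the data itself bounds a GroupByKey's parallelism can be seen with plain Python. This is an illustration of the concept, not runner code: however many workers you add, a keyed aggregation has at most one unit of work per distinct key.]

```python
from collections import defaultdict

records = [("user1", 3), ("user2", 5), ("user1", 7), ("user3", 1), ("user2", 2)]

# A GroupByKey can be split across at most one "shard" of work per
# distinct key, so the number of distinct keys caps useful parallelism.
groups = defaultdict(list)
for key, value in records:
    groups[key].append(value)

max_useful_parallelism = len(groups)
print(max_useful_parallelism)  # 3
print(dict(groups))  # {'user1': [3, 7], 'user2': [5, 2], 'user3': [1]}
```

With three distinct keys, a parallelism of 100 on the grouping operator buys nothing: 97 of those subtasks would receive no keys at all.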
On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com> wrote:

Besides the global parallelism of a Beam job, is there any way to set parallelism for individual operators like group by and join? I understand the parallelism setting depends on the underlying execution engine, but it is very common to set parallelism for group by and join in both Spark & Flink.

--
Best Regards

Jeff Zhang

--
Twitter: https://twitter.com/holdenkarau
Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
YouTube Live Streams: https://www.youtube.com/user/holdenkarau