Hi Jan,

The approach works when your pipeline doesn't have too many operators. With a single pipeline-wide setting, the operator that needs the highest parallelism can use at most #total_task_slots / #operators of the resources available in the cluster.
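To make that bound concrete, here is a small plain-Python sketch (the slot and operator counts are made-up illustrative numbers, not from any real cluster):

```python
# Illustrative only: with one pipeline-wide parallelism and task slots
# divided evenly among operators, the hottest operator is capped at
# total_task_slots // num_operators.
def max_parallelism_per_operator(total_task_slots: int, num_operators: int) -> int:
    return total_task_slots // num_operators

# A cluster with 100 slots running a 10-operator pipeline: the operator
# that needs the most parallelism still gets at most 10 slots.
print(max_parallelism_per_operator(100, 10))  # -> 10
```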
Another downside is wasted resources for the smaller operators, which cannot make full use of the task slots assigned to them. For an operator with parallelism 10, you might see only 1 of 10 tasks running while the other 9 sit idle, especially when it is doing an aggregation such as a SUM. One mitigation is to explicitly add a Reshuffle after an operator with high fanout, which allows the following operators to run at a higher parallelism. But this circles back to the first downside: if your pipeline has high fanout throughout, setting a single parallelism for the whole pipeline significantly limits its scalability.

Ning.

On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> this topic was discussed many years ago and the conclusion there was that
> setting the parallelism of individual operators via FlinkPipelineOptions
> (or ResourceHints) would be possible, but somewhat cumbersome. Although I
> understand that it "feels" weird to have high parallelism for operators
> with small inputs, does this actually have any relevant performance
> impact? I always use parallelism based on the largest operator in the
> Pipeline and this seems to work just fine. Is there any particular need
> or measurable impact of such an approach?
>
> Jan
>
> On 4/19/23 17:23, Nimalan Mahendran wrote:
>
> Same need here, using the Flink runner. We are processing a PCollection
> (extracting features per element), then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the groupby should be higher, to be balanced with the
> downstream operator.
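As an aside, the effect of the Reshuffle mitigation Ning describes can be sketched runner-independently in plain Python. This is only a conceptual simulation (the fanout and worker counts are invented, and real runners redistribute by hashing or rebalancing rather than round-robin):

```python
import itertools

# Two upstream tasks each expand one element into many (high fanout).
upstream_outputs = {
    "task-0": list(range(100)),  # 100 elements produced by task 0
    "task-1": list(range(100)),  # 100 elements produced by task 1
}

# Without a reshuffle, a fused downstream operator processes each element
# on the task that produced it: only 2 workers are busy, however many exist.
busy_without = {t for t, elems in upstream_outputs.items() if elems}
assert len(busy_without) == 2

# A reshuffle breaks fusion and redistributes elements across all
# downstream workers (round-robin here, for illustration).
num_workers = 10
workers = [[] for _ in range(num_workers)]
for i, elem in enumerate(itertools.chain(*upstream_outputs.values())):
    workers[i % num_workers].append(elem)

# Now every worker holds an equal share of the 200 fanned-out elements.
print([len(w) for w in workers])  # -> [20, 20, 20, 20, 20, 20, 20, 20, 20, 20]
```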
>
> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> It would be better to set parallelism per operator. As I mentioned
>> before, there may be multiple groupby and join operators in one
>> pipeline, and their parallelism can differ due to different input data
>> sizes.
>>
>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Jeff - does setting the global default work for you, or do you need
>>> per-operator control? Seems like it would be natural to add this to
>>> ResourceHints.
>>>
>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>> were to add it, it probably belongs in ResourceHints.
>>>>
>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>> operators.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>> such a mechanism, so I am looking for a general solution on the
>>>>>> Beam side.
>>>>>>
>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> To a (small) degree Spark's "new" AQE might be able to help,
>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>
>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>> user@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>> GBK with the Spark or Flink runners?
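For reference, the two Spark settings Holden mentions would typically go into spark-defaults.conf; this is an illustrative fragment (note the coalescing option only takes effect when AQE itself is enabled):

```properties
# spark-defaults.conf: enable Adaptive Query Execution so Spark can
# coalesce shuffle partitions at runtime based on actual data sizes.
spark.sql.adaptive.enabled                      true
spark.sql.adaptive.coalescePartitions.enabled   true
```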
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> No, I don't use Dataflow, I use Spark & Flink.
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>>>> operator runs, so there is no need for such controls. In fact,
>>>>>>>>>> these specific controls wouldn't make much sense for the way
>>>>>>>>>> Dataflow implements these operators.
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just for performance tuning, like in Spark and Flink.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the
>>>>>>>>>>>>> operator level.
>>>>>>>>>>>>> The input size of the operator is unknown at compile time if
>>>>>>>>>>>>> it is not a source operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here's an example for Flink:
>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see
>>>>>>>>>>>>> groupByKey and reduceByKey):
>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey, for
>>>>>>>>>>>>>> example, the number of keys in your data determines the
>>>>>>>>>>>>>> maximum parallelism.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is
>>>>>>>>>>>>>> designed to automatically determine the parallelism (e.g.
>>>>>>>>>>>>>> work will be dynamically split and moved around between
>>>>>>>>>>>>>> workers, the number of workers will autoscale, etc.), so
>>>>>>>>>>>>>> there's no need to explicitly set the parallelism of the
>>>>>>>>>>>>>> execution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any
>>>>>>>>>>>>>>> way to set parallelism for individual operators like group
>>>>>>>>>>>>>>> by and join? I understand the parallelism setting depends
>>>>>>>>>>>>>>> on the underlying execution engine, but it is very common
>>>>>>>>>>>>>>> to set parallelism for group by and join in both Spark &
>>>>>>>>>>>>>>> Flink.
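Reuven's point that the number of distinct keys bounds the useful parallelism of a GroupByKey can be illustrated with a plain-Python hash-partition sketch (the key set and partition count are made-up numbers):

```python
from collections import defaultdict

def partition_by_key(records, num_partitions):
    """Hash-partition (key, value) records the way a shuffle would."""
    partitions = defaultdict(list)
    for key, value in records:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

# 1000 records but only 3 distinct keys: no matter how high the partition
# count is set, at most 3 partitions can receive any data, so at most 3
# workers do useful work in the grouped stage.
records = ([("a", i) for i in range(500)] +
           [("b", i) for i in range(300)] +
           [("c", i) for i in range(200)])
parts = partition_by_key(records, num_partitions=100)
print(len(parts))  # number of non-empty partitions: at most 3
```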
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>
>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>> https://amzn.to/2MaRAG9
>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau