Hi,

There is a Flink pipeline option, `parallelism`, that you can set:
https://github.com/apache/beam/blob/master/sdks/python/apache_beam/options/pipeline_options.py#L1504-L1510
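A minimal sketch of passing that option from Python, assuming the `--parallelism` and `--max_parallelism` flags defined in the linked `pipeline_options.py`. The helper name `flink_parallelism_flags` is hypothetical and the values are placeholders, not recommendations:

```python
from typing import List, Optional


def flink_parallelism_flags(parallelism: int,
                            max_parallelism: Optional[int] = None) -> List[str]:
    """Build command-line flags for a Beam Python pipeline on the Flink runner.

    Hypothetical helper for illustration; it only assembles flag strings.
    """
    if parallelism < 1:
        raise ValueError("parallelism must be a positive integer")
    # --parallelism is the job-wide setting; there is no per-step flag.
    flags = ["--runner=FlinkRunner", f"--parallelism={parallelism}"]
    if max_parallelism is not None:
        # --max_parallelism is the second, pipeline-wide flag in the same file.
        flags.append(f"--max_parallelism={max_parallelism}")
    return flags


flags = flink_parallelism_flags(parallelism=10, max_parallelism=128)
print(flags)
```

With Beam installed, the resulting list can be handed to `PipelineOptions(flags)` from `apache_beam.options.pipeline_options` when constructing the pipeline.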
This parallelism is applied to every step; there is no API to configure a different value per step. So if you have 10 steps and set the parallelism to 10, 100 tasks will be created. You can use `max_parallelism` to limit the pipeline-wide parallelism. The reason to limit it is that a Flink cluster can run into network issues when too many tasks run in parallel.

If you control how the cluster is created, you can also configure the Flink cluster (an example: <https://github.com/apache/beam/blob/master/sdks/python/apache_beam/runners/interactive/dataproc/dataproc_cluster_manager.py#L56-L74>) to allocate more resources to the task manager and so increase its capacity for concurrent tasks. This is specific to Flink; you can find more guidance in Flink's documentation:
https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/config/

Ning.

On Wed, Apr 19, 2023 at 8:23 AM Nimalan Mahendran <[email protected]> wrote:

> Same need here, using Flink runner. We are processing a pcollection
> (extracting features per element) then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the groupby should be higher, to be balanced with the
> downstream operator.
>
> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <[email protected]> wrote:
>
>> Hi Reuven,
>>
>> It would be better to set parallelism for operators, as I mentioned
>> before, there may be multiple groupby, join operators in one pipeline, and
>> their parallelism can be different due to different input data sizes.
>>
>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <[email protected]> wrote:
>>
>>> Jeff - does setting the global default work for you, or do you need
>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>
>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <[email protected]> wrote:
>>>
>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>> were to add it, it probably belongs in ResourceHints.
>>>>
>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <[email protected]> wrote:
>>>>
>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you can
>>>>> set. I believe this sets the default parallelism for all Flink operators.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <[email protected]> wrote:
>>>>>
>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have such
>>>>>> kind of mechanism, so I am looking for a general solution on the beam
>>>>>> side.
>>>>>>
>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <[email protected]> wrote:
>>>>>>
>>>>>>> To a (small) degree Sparks “new” AQE might be able to help depending
>>>>>>> on what kind of operations Beam is compiling it down to.
>>>>>>>
>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>> spark.sql.adaptive.coalescePartitions.enabled
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <[email protected]> wrote:
>>>>>>>
>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <[email protected]> wrote:
>>>>>>>>
>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <[email protected]> wrote:
>>>>>>>>>
>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>>>> operator runs, so there is no need to have such controls. In fact
>>>>>>>>>> these specific controls wouldn't make much sense for the way
>>>>>>>>>> Dataflow implements these operators.
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <[email protected]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <[email protected]> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <[email protected]> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism in
>>>>>>>>>>>>> operator level. And the input size of the operator is unknown at
>>>>>>>>>>>>> compiling stage if it is not a source operator,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here's an example of flink
>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>> Spark also support to set operator level parallelism (see
>>>>>>>>>>>>> groupByKey and reduceByKey):
>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <[email protected]> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example,
>>>>>>>>>>>>>> the number of keys in your data determines the maximum
>>>>>>>>>>>>>> parallelism.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed
>>>>>>>>>>>>>> to automatically determine the parallelism (e.g. work will be
>>>>>>>>>>>>>> dynamically split and moved around between workers, the number
>>>>>>>>>>>>>> of workers will autoscale, etc.), so there's no need to
>>>>>>>>>>>>>> explicitly set the parallelism of the execution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <[email protected]> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Besides the global parallelism of beam job, is there any way
>>>>>>>>>>>>>>> to set parallelism for individual operators like group by and
>>>>>>>>>>>>>>> join? I understand the parallelism setting depends on the
>>>>>>>>>>>>>>> underlying execution engine, but it is very common to set
>>>>>>>>>>>>>>> parallelism like group by and join in both spark & flink.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>
>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>> https://amzn.to/2MaRAG9
>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
