Hi Reuven,

It would be better to be able to set parallelism per operator. As I mentioned before, there may be multiple groupby and join operators in one pipeline, and their parallelism can differ due to different input data sizes.
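To make the request concrete, here is a minimal sketch of the kind of per-operator control being asked for, analogous to Flink's operator-level setParallelism(). None of this is real Beam API; the operator names, the default value, and the `Operator` class are all invented for illustration:

```python
# Hypothetical model (NOT real Beam API): each operator carries its own
# parallelism hint, falling back to a pipeline-wide default when unset.
PIPELINE_DEFAULT_PARALLELISM = 8

class Operator:
    def __init__(self, name, parallelism=None):
        self.name = name
        # None means "use the pipeline-wide default".
        self.parallelism = parallelism

    def effective_parallelism(self):
        if self.parallelism is not None:
            return self.parallelism
        return PIPELINE_DEFAULT_PARALLELISM

# Two shuffle-heavy operators in the same pipeline with different
# parallelism, which a single global setting cannot express.
pipeline = [
    Operator("ReadEvents"),                         # source: default is fine
    Operator("GroupByUser", parallelism=64),        # large shuffle
    Operator("JoinWithProfiles", parallelism=16),   # smaller join
]

for op in pipeline:
    print(op.name, op.effective_parallelism())
```

The point of the sketch is only that the hint is attached per operator, which is why a global `parallelism` pipeline option is not sufficient here.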
On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:

> Jeff - does setting the global default work for you, or do you need
> per-operator control? Seems like it would be natural to add this to
> ResourceHints.
>
> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com> wrote:
>
>> Yeah, I don't think we have a good per-operator API for this. If we were
>> to add it, it probably belongs in ResourceHints.
>>
>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>
>>> Looking at FlinkPipelineOptions, there is a parallelism option you can
>>> set. I believe this sets the default parallelism for all Flink operators.
>>>
>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> Thanks Holden, this would work for Spark, but Flink doesn't have such a
>>>> mechanism, so I am looking for a general solution on the Beam side.
>>>>
>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca> wrote:
>>>>
>>>>> To a (small) degree Spark's "new" AQE might be able to help, depending
>>>>> on what kind of operations Beam is compiling it down to.
>>>>>
>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>
>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <user@beam.apache.org> wrote:
>>>>>
>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>> GBK with the Spark or Flink runners?
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>
>>>>>>> No, I don't use Dataflow; I use Spark & Flink.
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com> wrote:
>>>>>>>
>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>> operator runs, so there is no need to have such controls.
>>>>>>>> In fact, these specific controls wouldn't make much sense for the
>>>>>>>> way Dataflow implements these operators.
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <user@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>
>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the
>>>>>>>>>>> operator level. The input size of an operator is unknown at
>>>>>>>>>>> compile time if it is not a source operator.
>>>>>>>>>>>
>>>>>>>>>>> Here's an example for Flink:
>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>> Spark also supports setting operator-level parallelism (see
>>>>>>>>>>> groupByKey and reduceByKey):
>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> The maximum parallelism is always determined by the parallelism
>>>>>>>>>>>> of your data. If you do a GroupByKey, for example, the number
>>>>>>>>>>>> of keys in your data determines the maximum parallelism.
>>>>>>>>>>>>
>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is
>>>>>>>>>>>> designed to automatically determine the parallelism (e.g. work
>>>>>>>>>>>> will be dynamically split and moved around between workers, the
>>>>>>>>>>>> number of workers will autoscale, etc.), so there's no need to
>>>>>>>>>>>> explicitly set the parallelism of the execution.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any way
>>>>>>>>>>>>> to set parallelism for individual operators like group by and
>>>>>>>>>>>>> join? I understand the parallelism setting depends on the
>>>>>>>>>>>>> underlying execution engine, but it is very common to set the
>>>>>>>>>>>>> parallelism of operations like group by and join in both
>>>>>>>>>>>>> Spark & Flink.
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jeff Zhang

>>>>> --
>>>>> Twitter: https://twitter.com/holdenkarau
>>>>> Books (Learning Spark, High Performance Spark, etc.): https://amzn.to/2MaRAG9
>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau

--
Best Regards

Jeff Zhang
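Reuven's point that the number of keys bounds a GroupByKey's maximum parallelism can be shown with a toy sketch. This is a plain-Python illustration of the principle, not runner code; the data values are invented:

```python
# Toy GroupByKey: each distinct key produces exactly one group, and a
# group must be processed by a single worker, so the distinct-key count
# is an upper bound on useful parallelism regardless of configuration.
from collections import defaultdict

def group_by_key(pairs):
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

pairs = [("user1", 3), ("user2", 5), ("user1", 7)]
groups = group_by_key(pairs)

# Two distinct keys -> at most two workers can do useful grouping work,
# even if the operator is configured with parallelism 100.
max_useful_parallelism = len(groups)
```

This is why, as the thread notes, data shape constrains parallelism before any engine-level setting does.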