Same need here, using the Flink runner. We process a PCollection
(extracting features per element), then combine those features into
groups and run the next operator on the groups.

Each group contains ~50 elements, so the operator upstream of the
group-by should run with higher parallelism to stay balanced with the
downstream operator.
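
As a rough illustration of that balance, here is a minimal sketch of the
arithmetic. It is not Beam or Flink API: the function name and the
per-element and per-group costs are hypothetical, and it assumes each
stage's wall time is simply its total work divided by its parallelism.

```python
import math


def balanced_upstream_parallelism(downstream_parallelism, group_size,
                                  cost_per_element, cost_per_group):
    """Upstream parallelism at which both stages take about the same time.

    Per group, the upstream stage does group_size * cost_per_element work
    and the downstream stage does cost_per_group work, so the upstream
    stage needs that ratio times the downstream parallelism.
    """
    ratio = (group_size * cost_per_element) / cost_per_group
    return max(1, math.ceil(downstream_parallelism * ratio))


# Hypothetical numbers: ~50 elements per group, 2 ms to extract features
# from one element, 25 ms to process one group downstream.
print(balanced_upstream_parallelism(4, 50, 2.0, 25.0))  # prints 16
```

With these made-up costs, a downstream parallelism of 4 would call for an
upstream parallelism of 16, which a single global setting cannot express.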

On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:

> Hi Reuven,
>
> It would be better to set parallelism per operator. As I mentioned
> before, there may be multiple group-by and join operators in one pipeline,
> and their parallelism can differ because their input data sizes differ.
>
> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>
>> Jeff - does setting the global default work for you, or do you need
>> per-operator control? Seems like the way to do that would be to add this
>> to ResourceHints.
>>
>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com>
>> wrote:
>>
>>> Yeah, I don't think we have a good per-operator API for this. If we were
>>> to add it, it probably belongs in ResourceHints.
>>>
>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>
>>>> Looking at FlinkPipelineOptions, there is a parallelism option you can
>>>> set. I believe this sets the default parallelism for all Flink operators.
>>>>
>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>
>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have that
>>>>> kind of mechanism, so I am looking for a general solution on the Beam
>>>>> side.
>>>>>
>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca>
>>>>> wrote:
>>>>>
>>>>>> To a (small) degree Spark's “new” AQE might be able to help, depending
>>>>>> on what kind of operations Beam is compiling it down to.
>>>>>>
>>>>>> Have you tried setting spark.sql.adaptive.enabled and
>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>> user@beam.apache.org> wrote:
>>>>>>
>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>
>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the operator
>>>>>>>>> runs, so there is no need to have such controls. In fact these specific
>>>>>>>>> controls wouldn't make much sense for the way Dataflow implements these
>>>>>>>>> operators.
>>>>>>>>>
>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>
>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks Reuven, what I mean is setting the parallelism at the
>>>>>>>>>>>> operator level. The input size of an operator is unknown at compile
>>>>>>>>>>>> time unless it is a source operator.
>>>>>>>>>>>>
>>>>>>>>>>>> Here's an example from Flink:
>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see
>>>>>>>>>>>> groupByKey and reduceByKey):
>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example, the
>>>>>>>>>>>>> number of keys in your data determines the maximum parallelism.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed to
>>>>>>>>>>>>> automatically determine the parallelism (e.g. work will be
>>>>>>>>>>>>> dynamically split and moved around between workers, the number of
>>>>>>>>>>>>> workers will autoscale, etc.), so there's no need to explicitly set
>>>>>>>>>>>>> the parallelism of the execution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any way
>>>>>>>>>>>>>> to set parallelism for individual operators like group by and
>>>>>>>>>>>>>> join? I understand the parallelism setting depends on the
>>>>>>>>>>>>>> underlying execution engine, but it is very common to set
>>>>>>>>>>>>>> parallelism for operators like group by and join in both
>>>>>>>>>>>>>> Spark & Flink.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> --
>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>
>>>>>>> --
>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>