Hi Jan,

The approach works when your pipeline doesn't have too many operators. Even
then, the operator that needs the highest parallelism can use at most
#total_task_slots / #operators of the resources available in the cluster.
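
To make that bound concrete, here is a small arithmetic sketch. It takes the
#total_task_slots / #operators figure above at face value, i.e. it assumes the
slots are split evenly across operators. The function name and the numbers are
made up for illustration; they are not part of Beam or Flink.

```python
def per_operator_slot_budget(total_task_slots: int, num_operators: int) -> int:
    """Upper bound on one operator's parallelism if slots are split evenly.

    Illustrative helper only: it encodes the
    #total_task_slots / #operators bound, nothing more.
    """
    if num_operators <= 0:
        raise ValueError("num_operators must be positive")
    return total_task_slots // num_operators


# Hypothetical cluster with 100 task slots: the budget for the most demanding
# operator shrinks as the pipeline gains operators.
for num_operators in (2, 5, 20):
    budget = per_operator_slot_budget(100, num_operators)
    print(f"{num_operators} operators -> at most {budget} slots each")
```

With more operators in the pipeline, the ceiling for the single operator that
actually needs the parallelism drops accordingly.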

Another downside is wasted resources for smaller operators that cannot make
full use of the task slots assigned to them. For an operator with parallelism
10, you might see only 1 of the 10 tasks running while the other 9 sit idle,
especially when it's doing an aggregation like a SUM.
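
A toy model of why that happens, assuming records are routed to tasks by key
(here simply key modulo parallelism, which is an assumption for illustration
and not how any particular runner partitions data). A global SUM has a single
key, so only one task ever receives records:

```python
from collections import Counter


def busy_tasks(keys, parallelism):
    """Return how many tasks receive at least one record, plus per-task load.

    Toy routing rule (an assumption): task index = key % parallelism,
    with integer keys.
    """
    load = Counter(key % parallelism for key in keys)
    return len(load), dict(load)


# A global SUM: every record carries the same key, so one task does all the work.
busy, load = busy_tasks([0] * 1000, parallelism=10)
print(f"global SUM: {busy}/10 tasks busy, load={load}")

# Many distinct keys: all ten tasks get a share.
busy_many, _ = busy_tasks(range(1000), parallelism=10)
print(f"per-key SUM over 1000 keys: {busy_many}/10 tasks busy")
```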

One mitigation is that, for operators that follow a high-fanout operator, we
can explicitly add a Reshuffle to allow higher parallelism. But this circles
back to the first downside: if fanout grows exponentially through your
pipeline, setting a single parallelism for the whole pipeline is not ideal
because it significantly limits the pipeline's scalability.
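
The effect of the Reshuffle can be sketched with a toy model. This is not how
Beam or Flink implement Reshuffle; the round-robin redistribution, the
assumption that outputs otherwise stay on the task that produced them, and all
names below are hypothetical and only meant to show the load difference.

```python
from collections import Counter


def expand(element, fanout):
    """Hypothetical high-fanout step: one input yields `fanout` outputs."""
    return [(element, i) for i in range(fanout)]


def run_without_reshuffle(inputs, fanout, parallelism):
    """Assumption: outputs are processed on the task that produced them."""
    load = Counter()
    for idx, element in enumerate(inputs):
        load[idx % parallelism] += len(expand(element, fanout))
    return load


def run_with_reshuffle(inputs, fanout, parallelism):
    """Assumption: outputs are redistributed round-robin before the next step."""
    outputs = [out for element in inputs for out in expand(element, fanout)]
    return Counter(i % parallelism for i in range(len(outputs)))


inputs = ["a", "b"]  # few inputs, large fanout
without = run_without_reshuffle(inputs, fanout=500, parallelism=10)
with_rs = run_with_reshuffle(inputs, fanout=500, parallelism=10)
print(f"without reshuffle: {len(without)} busy tasks, max load {max(without.values())}")
print(f"with reshuffle:    {len(with_rs)} busy tasks, max load {max(with_rs.values())}")
```

In this model the downstream work spreads over all ten tasks only after the
redistribution, but the ceiling is still the one pipeline-wide parallelism.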

Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

> Hi,
>
> this topic was discussed many years ago and the conclusion there was that
> setting the parallelism of individual operators via FlinkPipelineOptions
> (or ResourceHints) would be possible, but would be somewhat cumbersome.
> Although I understand that it "feels" weird to have high parallelism for
> operators with small inputs, does this actually bring any relevant
> performance impact? I always use parallelism based on the largest operator
> in the Pipeline and this seems to work just fine. Is there any particular
> need or measurable impact of such an approach?
>
>  Jan
> On 4/19/23 17:23, Nimalan Mahendran wrote:
>
> Same need here, using the Flink runner. We are processing a PCollection
> (extracting features per element), then combining these into groups of
> features and running the next operator on those groups.
>
> Each group contains ~50 elements, so the parallelism of the operator
> upstream of the groupby should be higher, to be balanced with the
> downstream operator.
>
> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:
>
>> Hi Reuven,
>>
>> It would be better to set parallelism per operator. As I mentioned
>> before, there may be multiple group-by and join operators in one pipeline,
>> and their parallelism can differ because their input data sizes differ.
>>
>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>
>>> Jeff - does setting the global default work for you, or do you need
>>> per-operator control? Seems like it would be to add this to ResourceHints.
>>>
>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com>
>>> wrote:
>>>
>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>> were to add it, it probably belongs in ResourceHints.
>>>>
>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you can
>>>>> set. I believe this sets the default parallelism for all Flink operators.
>>>>>
>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>
>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have that
>>>>>> kind of mechanism, so I am looking for a general solution on the Beam
>>>>>> side.
>>>>>>
>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca>
>>>>>> wrote:
>>>>>>
>>>>>>> To a (small) degree Spark's “new” AQE might be able to help, depending
>>>>>>> on what kind of operations Beam is compiling it down to.
>>>>>>>
>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>> user@beam.apache.org> wrote:
>>>>>>>
>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>
>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> No, I don't use dataflow, I use Spark & Flink.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>> wrote:
>>>>>>>>>
>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow - unlike
>>>>>>>>>> Spark and Flink - dynamically modifies the parallelism as the
>>>>>>>>>> operator runs, so there is no need to have such controls. In fact
>>>>>>>>>> these specific controls wouldn't make much sense for the way
>>>>>>>>>> Dataflow implements these operators.
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks Reuven, what I mean is setting the parallelism at the
>>>>>>>>>>>>> operator level. The input size of an operator is unknown at
>>>>>>>>>>>>> compile time unless it is a source operator.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Here's an example from Flink:
>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see
>>>>>>>>>>>>> groupByKey and reduceByKey):
>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>> user@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey, for example,
>>>>>>>>>>>>>> the number of keys in your data determines the maximum
>>>>>>>>>>>>>> parallelism.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is designed
>>>>>>>>>>>>>> to automatically determine the parallelism (e.g. work will be
>>>>>>>>>>>>>> dynamically split and moved around between workers, the number
>>>>>>>>>>>>>> of workers will autoscale, etc.), so there's no need to
>>>>>>>>>>>>>> explicitly set the parallelism of the execution.
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any way
>>>>>>>>>>>>>>> to set parallelism for individual operators like group by and
>>>>>>>>>>>>>>> join? I understand the parallelism setting depends on the
>>>>>>>>>>>>>>> underlying execution engine, but it is very common to set
>>>>>>>>>>>>>>> parallelism for operators like group by and join in both Spark
>>>>>>>>>>>>>>> & Flink.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>> --
>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>
>>>>>>
>>>>>>
>
