+1 to not requiring details like this in the Beam model. There is, however,
the question of how to pass such implementation-specific hints to a runner
that requires them. Generally that's done via ResourceHints or annotations,
and while the former seems a good fit, it's primarily focused on setting up
the right context for user code (which GBK is not).

A complete hack is to add an experiment like
flink_parallelism_for_stage=STAGE_NAME:value. It'd be nice to do something
cleaner.
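
To make the hack concrete, here is a minimal sketch of how a runner could
pick such an experiment out of the experiments list. This is only an
illustration: no runner is known to read flink_parallelism_for_stage today,
and parse_stage_parallelism is a hypothetical helper, not a Beam API.

```python
from typing import Dict, Iterable

# Hypothetical experiment prefix taken from the suggestion above.
# Assumption: no runner currently recognizes this experiment.
PREFIX = "flink_parallelism_for_stage="


def parse_stage_parallelism(experiments: Iterable[str]) -> Dict[str, int]:
    """Collect STAGE_NAME -> parallelism overrides from experiment strings."""
    overrides: Dict[str, int] = {}
    for experiment in experiments:
        if not experiment.startswith(PREFIX):
            continue  # unrelated experiment, leave it alone
        # Split on the last ':' so a stage name may itself contain ':'.
        stage, sep, value = experiment[len(PREFIX):].rpartition(":")
        if not sep or not stage:
            raise ValueError(f"expected STAGE_NAME:value, got {experiment!r}")
        parallelism = int(value)  # raises ValueError on a non-integer value
        if parallelism < 1:
            raise ValueError(f"parallelism must be >= 1, got {parallelism}")
        overrides[stage] = parallelism
    return overrides


if __name__ == "__main__":
    print(parse_stage_parallelism([
        "some_other_experiment",
        "flink_parallelism_for_stage=GroupFeatures:4",
    ]))
```

The stage name is whatever the runner would call the fused stage, which is
not known until the pipeline is submitted, so a string-keyed override like
this is brittle. That is part of why something cleaner would be nice.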


On Fri, Apr 21, 2023 at 10:37 AM Ning Kang via user <u...@beam.apache.org>
wrote:

> Hi Jan,
>
> To generalize the per-stage parallelism configuration, we should have an FR
> proposing the capability to explicitly set autoscaling (in this case, fixed
> size per stage) policy in Beam pipelines.
>
> Per-step or per-stage parallelism, or fusion/optimization is not part of
> the Beam model. They are [Flink] runner implementation details and should
> be configured for each runner.
>
> Also, when building the pipeline, it's not clear what the fusion looks
> like until the pipeline is submitted to a runner, thus making configuration
> of the parallelism/worker-per-stage not straightforward.
> Flink's parallelism settings can be found here
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/>,
> it's still kind of a black box since you don't really know how many tasks
> are actually spawned until you run a pipeline.
>
> That being said, if we have a general interface controlling how a pipeline
> scales, each runner could adapt [auto]scaling in their own way.
> For example, in a Flink job, each operator/stage's task slot is prorated
> by their key numbers; the maximum parallelism is throttled by task slot
> utilization.
> Another example, in a Dataflow job, each stage horizontally scales by CPU
> utilization; vertically scales by memory/disk utilization.
>
> +dev@beam.apache.org <dev@beam.apache.org>
> Let's use this thread to discuss how to configure a pipeline for runners
> so that they can scale workers appropriately without exposing
> runner-specific details to the Beam model.
>
> Ning.
>
>
> On Thu, Apr 20, 2023 at 1:41 PM Jan Lukavský <je...@seznam.cz> wrote:
>
>> Hi Ning,
>>
>> I might have missed that in the discussion, but we are talking about batch
>> execution, am I right? In streaming, all operators (PTransforms) of a
>> Pipeline are run in the same slots, thus the downsides are limited. You can
>> enforce streaming mode using --streaming command-line argument. But yes,
>> this might have other implications. For batch only it obviously makes sense
>> to limit parallelism of a (fused) 'stage', which is not a transform-level
>> concept, but rather a more complex union of transforms divided by shuffle
>> barriers. Would you be willing to start a follow-up thread on the dev@
>> mailing list for a deeper discussion?
>>
>>  Jan
>> On 4/20/23 19:18, Ning Kang via user wrote:
>>
>> Hi Jan,
>>
>> The approach works when your pipeline doesn't have too many operators.
>> And the operator that needs the highest parallelism can only use at most
>> #total_task_slots / #operators resources available in the cluster.
>>
>> Another downside is wasted resources for other smaller operators that
>> cannot make full use of task slots assigned to them. You might see only
>> 1/10 tasks running while the other 9/10 tasks idle for an operator with
>> parallelism 10, especially when it's doing some aggregation like a SUM.
>>
>> One redeeming method is that, for operators following another operator
>> with high fanout, we can explicitly add a Reshuffle to allow a higher
>> parallelism. But this circles back to the first downside: if your pipeline
>> has exponentially high fanout through it, setting a single parallelism for
>> the whole pipeline is not ideal because it limits the scalability of your
>> pipeline significantly.
>>
>> Ning.
>>
>>
>> On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:
>>
>>> Hi,
>>>
>>> this topic was discussed many years ago and the conclusion there was
>>> that setting the parallelism of individual operators via
>>> FlinkPipelineOptions (or ResourceHints) would be possible, but somewhat
>>> cumbersome. Although I understand that it "feels" weird to have
>>> high parallelism for operators with small inputs, does this actually bring
>>> any relevant performance impact? I always use parallelism based on the
>>> largest operator in the Pipeline and this seems to work just fine. Is there
>>> any particular need for, or measurable impact of, such an approach?
>>>
>>>  Jan
>>> On 4/19/23 17:23, Nimalan Mahendran wrote:
>>>
>>> Same need here, using Flink runner. We are processing a PCollection
>>> (extracting features per element) then combining these into groups of
>>> features and running the next operator on those groups.
>>>
>>> Each group contains ~50 elements, so the parallelism of the operator
>>> upstream of the groupby should be higher, to be balanced with the
>>> downstream operator.
>>>
>>> On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:
>>>
>>>> Hi Reuven,
>>>>
>>>> It would be better to set parallelism for operators, as I mentioned
>>>> before, there may be multiple groupby, join operators in one pipeline, and
>>>> their parallelism can be different due to different input data sizes.
>>>>
>>>> On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com> wrote:
>>>>
>>>>> Jeff - does setting the global default work for you, or do you need
>>>>> per-operator control? Seems like the place to add this would be
>>>>> ResourceHints.
>>>>>
>>>>> On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw <rober...@google.com>
>>>>> wrote:
>>>>>
>>>>>> Yeah, I don't think we have a good per-operator API for this. If we
>>>>>> were to add it, it probably belongs in ResourceHints.
>>>>>>
>>>>>> On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax <re...@google.com> wrote:
>>>>>>
>>>>>>> Looking at FlinkPipelineOptions, there is a parallelism option you
>>>>>>> can set. I believe this sets the default parallelism for all Flink
>>>>>>> operators.
>>>>>>>
>>>>>>> On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang <zjf...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Thanks Holden, this would work for Spark, but Flink doesn't have
>>>>>>>> such a mechanism, so I am looking for a general solution on the
>>>>>>>> Beam side.
>>>>>>>>
>>>>>>>> On Mon, Apr 17, 2023 at 10:08 AM Holden Karau <hol...@pigscanfly.ca>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> To a (small) degree Spark's “new” AQE might be able to help,
>>>>>>>>> depending on what kind of operations Beam is compiling it down to.
>>>>>>>>>
>>>>>>>>> Have you tried setting spark.sql.adaptive.enabled &
>>>>>>>>> spark.sql.adaptive.coalescePartitions.enabled?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, Apr 17, 2023 at 10:34 AM Reuven Lax via user <
>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>
>>>>>>>>>> I see. Robert - what is the story for parallelism controls on
>>>>>>>>>> GBK with the Spark or Flink runners?
>>>>>>>>>>
>>>>>>>>>> On Sun, Apr 16, 2023 at 6:24 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> No, I don't use Dataflow, I use Spark & Flink.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Apr 17, 2023 at 8:08 AM Reuven Lax <re...@google.com>
>>>>>>>>>>> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Are you running on the Dataflow runner? If so, Dataflow -
>>>>>>>>>>>> unlike Spark and Flink - dynamically modifies the parallelism as
>>>>>>>>>>>> the operator runs, so there is no need to have such controls. In
>>>>>>>>>>>> fact these specific controls wouldn't make much sense for the way
>>>>>>>>>>>> Dataflow implements these operators.
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 16, 2023 at 12:25 AM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Just for performance tuning like in Spark and Flink.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:10 PM Robert Bradshaw via user <
>>>>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> What are you trying to achieve by setting the parallelism?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 5:13 PM Jeff Zhang <zjf...@gmail.com>
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks Reuven, what I mean is to set the parallelism at the
>>>>>>>>>>>>>>> operator level. The input size of an operator is unknown at the
>>>>>>>>>>>>>>> compile stage if it is not a source operator.
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Here's an example from Flink:
>>>>>>>>>>>>>>> https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
>>>>>>>>>>>>>>> Spark also supports setting operator-level parallelism (see
>>>>>>>>>>>>>>> groupByKey and reduceByKey):
>>>>>>>>>>>>>>> https://spark.apache.org/docs/latest/rdd-programming-guide.html
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Sun, Apr 16, 2023 at 1:42 AM Reuven Lax via user <
>>>>>>>>>>>>>>> u...@beam.apache.org> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The maximum parallelism is always determined by the
>>>>>>>>>>>>>>>> parallelism of your data. If you do a GroupByKey for example,
>>>>>>>>>>>>>>>> the number of keys in your data determines the maximum
>>>>>>>>>>>>>>>> parallelism.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Beyond the limitations in your data, it depends on your
>>>>>>>>>>>>>>>> execution engine. If you're using Dataflow, Dataflow is
>>>>>>>>>>>>>>>> designed to automatically determine the parallelism (e.g. work
>>>>>>>>>>>>>>>> will be dynamically split and moved around between workers, the
>>>>>>>>>>>>>>>> number of workers will autoscale, etc.), so there's no need to
>>>>>>>>>>>>>>>> explicitly set the parallelism of the execution.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Sat, Apr 15, 2023 at 1:12 AM Jeff Zhang <
>>>>>>>>>>>>>>>> zjf...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Besides the global parallelism of a Beam job, is there any
>>>>>>>>>>>>>>>>> way to set parallelism for individual operators like group by
>>>>>>>>>>>>>>>>> and join? I understand the parallelism setting depends on the
>>>>>>>>>>>>>>>>> underlying execution engine, but it is very common to set
>>>>>>>>>>>>>>>>> parallelism for operators like group by and join in both
>>>>>>>>>>>>>>>>> Spark & Flink.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> --
>>>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> --
>>>>>>>>>>>>> Best Regards
>>>>>>>>>>>>>
>>>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> Best Regards
>>>>>>>>>>>
>>>>>>>>>>> Jeff Zhang
>>>>>>>>>>>
>>>>>>>>>> --
>>>>>>>>> Twitter: https://twitter.com/holdenkarau
>>>>>>>>> Books (Learning Spark, High Performance Spark, etc.):
>>>>>>>>> https://amzn.to/2MaRAG9  <https://amzn.to/2MaRAG9>
>>>>>>>>> YouTube Live Streams: https://www.youtube.com/user/holdenkarau
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> --
>>>>>>>> Best Regards
>>>>>>>>
>>>>>>>> Jeff Zhang
>>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Best Regards
>>>>
>>>> Jeff Zhang
>>>>
>>>
