Hi Ning,

I might have missed that in the discussion, but we talk about batch execution, am I right? In streaming, all operators (PTransforms) of a Pipeline are run in the same slots, thus the downsides are limited. You can enforce streaming mode using --streaming command-line argument. But yes, this might have other implications. For batch only it obviously makes sense to limit parallelism of a (fused) 'stage', which is not an transform-level concept, but rather a more complex union of transforms divided by shuffle barrier. Would you be willing to start a follow-up thread in @dev mailing list for this for deeper discussion?

 Jan

On 4/20/23 19:18, Ning Kang via user wrote:
Hi Jan,

The approach works when your pipeline doesn't have too many operators. And the operator that needs the highest parallelism can only use at most #total_task_slots / #operators resources available in the cluster.

Another downside is wasted resources for other smaller operators who cannot make full use of task slots assigned to them. You might see only 1/10 tasks running while the other 9/10 tasks idle for an operator with parallelism 10, especially when it's doing some aggregation like a SUM.

One redeeming method is that, for operators following another operator with high fanout, we can explicitly add a Reshuffle to allow a higher parallelism. But this circles back to the first downside: if your pipeline has exponentially high fanout through it, setting a single parallelism for the whole pipeline is not ideal because it limits the scalability of your pipeline significantly.

Ning.


On Thu, Apr 20, 2023 at 5:53 AM Jan Lukavský <je...@seznam.cz> wrote:

    Hi,

    this topic was discussed many years ago and the conclusion there
    was that setting the parallelism of individual operators via
    FlinkPipelineOptions (or ResourceHints) is be possible, but would
    be somewhat cumbersome. Although I understand that it "feels"
    weird to have high parallelism for operators with small inputs,
    does this actually bring any relevant performance impact? I always
    use parallelism based on the largest operator in the Pipeline and
    this seems to work just fine. Is there any particular need or
    measurable impact of such approach?

     Jan

    On 4/19/23 17:23, Nimalan Mahendran wrote:
    Same need here, using Flink runner. We are processing a
    pcollection (extracting features per element) then combining
    these into groups of features and running the next operator on
    those groups.

    Each group contains ~50 elements, so the parallelism of the
    operator upstream of the groupby should be higher, to be balanced
    with the downstream operator.

    On Tue, Apr 18, 2023 at 19:17 Jeff Zhang <zjf...@gmail.com> wrote:

        Hi Reuven,

        It would be better to set parallelism for operators, as I
        mentioned before, there may be multiple groupby, join
        operators in one pipeline, and their parallelism can be
        different due to different input data sizes.

        On Wed, Apr 19, 2023 at 3:59 AM Reuven Lax <re...@google.com>
        wrote:

            Jeff - does setting the global default work for you, or
            do you need per-operator control? Seems like it would be
            to add this to ResourceHints.

            On Tue, Apr 18, 2023 at 12:35 PM Robert Bradshaw
            <rober...@google.com> wrote:

                Yeah, I don't think we have a good per-operator API
                for this. If we were to add it, it probably belongs
                in ResourceHints.

                On Sun, Apr 16, 2023 at 11:28 PM Reuven Lax
                <re...@google.com> wrote:

                    Looking at FlinkPipelineOptions, there is a
                    parallelism option you can set. I believe this
                    sets the default parallelism for all Flink operators.

                    On Sun, Apr 16, 2023 at 7:20 PM Jeff Zhang
                    <zjf...@gmail.com> wrote:

                        Thanks Holden, this would work for Spark, but
                        Flink doesn't have such kind of mechanism, so
                        I am looking for a general solution on the
                        beam side.

                        On Mon, Apr 17, 2023 at 10:08 AM Holden Karau
                        <hol...@pigscanfly.ca> wrote:

                            To a (small) degree Sparks “new” AQE
                            might be able to help depending on what
                            kind of operations Beam is compiling it
                            down to.

                            Have you tried setting
                            spark.sql.adaptive.enabled &
                            spark.sql.adaptive.coalescePartitions.enabled



                            On Mon, Apr 17, 2023 at 10:34 AM Reuven
                            Lax via user <user@beam.apache.org> wrote:

                                I see. Robert - what is the story for
                                parallelism controls on GBK with the
                                Spark or Flink runners?

                                On Sun, Apr 16, 2023 at 6:24 PM Jeff
                                Zhang <zjf...@gmail.com> wrote:

                                    No, I don't use dataflow, I use
                                    Spark & Flink.


                                    On Mon, Apr 17, 2023 at 8:08 AM
                                    Reuven Lax <re...@google.com> wrote:

                                        Are you running on the
                                        Dataflow runner? If so,
                                        Dataflow - unlike Spark and
                                        Flink - dynamically modifies
                                        the parallelism as the
                                        operator runs, so there is no
                                        need to have such controls.
                                        In fact these specific
                                        controls wouldn't make much
                                        sense for the way Dataflow
                                        implements these operators.

                                        On Sun, Apr 16, 2023 at
                                        12:25 AM Jeff Zhang
                                        <zjf...@gmail.com> wrote:

                                            Just for
                                            performance tuning like
                                            in Spark and Flink.


                                            On Sun, Apr 16, 2023 at
                                            1:10 PM Robert Bradshaw
                                            via user
                                            <user@beam.apache.org> wrote:

                                                What are you trying
                                                to achieve by setting
                                                the parallelism?

                                                On Sat, Apr 15, 2023
                                                at 5:13 PM Jeff Zhang
                                                <zjf...@gmail.com> wrote:

                                                    Thanks Reuven,
                                                    what I mean is to
                                                    set the
                                                    parallelism in
                                                    operator level.
                                                    And the input
                                                    size of the
                                                    operator is
                                                    unknown at
                                                    compiling stage
                                                    if it is not a
                                                    source
                                                     operator,

                                                    Here's an example
                                                    of flink
                                                    
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/execution/parallel/#operator-level
                                                    Spark also
                                                    support to set
                                                    operator level
                                                    parallelism (see
                                                    groupByKey and
                                                    reduceByKey):
                                                    
https://spark.apache.org/docs/latest/rdd-programming-guide.html


                                                    On Sun, Apr 16,
                                                    2023 at 1:42 AM
                                                    Reuven Lax via
                                                    user
                                                    <user@beam.apache.org>
                                                    wrote:

                                                        The maximum
                                                        parallelism is
                                                        always
                                                        determined by
                                                        the
                                                        parallelism
                                                        of your data.
                                                        If you do a
                                                        GroupByKey
                                                        for example,
                                                        the number of
                                                        keys in your
                                                        data
                                                        determines
                                                        the maximum
                                                        parallelism.

                                                        Beyond the
                                                        limitations
                                                        in your data,
                                                        it depends on
                                                        your
                                                        execution
                                                        engine. If
                                                        you're using
                                                        Dataflow,
                                                        Dataflow is
                                                        designed to
                                                        automatically
                                                        determine the
                                                        parallelism
                                                        (e.g. work
                                                        will be
                                                        dynamically
                                                        split and
                                                        moved around
                                                        between
                                                        workers, the
                                                        number of
                                                        workers will
                                                        autoscale,
                                                        etc.), so
                                                        there's no
                                                        need to
                                                        explicitly
                                                        set the
                                                        parallelism
                                                        of the execution.

                                                        On Sat, Apr
                                                        15, 2023 at
                                                        1:12 AM Jeff
                                                        Zhang
                                                        <zjf...@gmail.com>
                                                        wrote:

                                                            Besides
                                                            the
                                                            global
                                                            parallelism of
                                                            beam job,
                                                            is there
                                                            any way
                                                            to set
                                                            parallelism for
                                                            individual
                                                            operators
                                                            like
                                                            group by
                                                            and join?
                                                            I
                                                            understand the
                                                            parallelism setting
                                                            depends
                                                            on the
                                                            underlying
                                                            execution
                                                            engine,
                                                            but it is
                                                            very
                                                            common to
                                                            set
                                                            parallelism like
                                                            group by
                                                            and join
                                                            in both
                                                            spark &
                                                            flink.







-- Best Regards

                                                            Jeff Zhang



-- Best Regards

                                                    Jeff Zhang



-- Best Regards

                                            Jeff Zhang



-- Best Regards

                                    Jeff Zhang

-- Twitter: https://twitter.com/holdenkarau
                            Books (Learning Spark, High Performance
                            Spark, etc.): https://amzn.to/2MaRAG9
                            <https://amzn.to/2MaRAG9>
                            YouTube Live Streams:
                            https://www.youtube.com/user/holdenkarau



-- Best Regards

                        Jeff Zhang



-- Best Regards

        Jeff Zhang

Reply via email to