Hi Venkat,
I agree that the parallelism of source vertex should not be upper bounded
by the job's global max parallelism. The case you mentioned, >> High filter
selectivity with huge amounts of data to read  excellently supports this
viewpoint. (In fact, in the current implementation, if the source
parallelism is pre-specified at job create stage, rather than relying on
the dynamic parallelism inference of the AdaptiveBatchScheduler, the source
vertex's parallelism can indeed exceed the job's global max parallelism.)

As Lijie and Junrui pointed out, the key issue is "semantic consistency."
Currently, if a vertex has not set maxParallelism, the
AdaptiveBatchScheduler will use
`execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's
maxParallelism. Since the current implementation does not distinguish
between source vertices and downstream vertices, source vertices are also
subject to this limitation.

Therefore, I believe that if the issue of "semantic consistency" can be
well explained in the code and configuration documentation, the
AdaptiveBatchScheduler should support that the parallelism of source
vertices can exceed the job's global max parallelism.

Best,
Xia

Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月14日周日 10:31写道:

> Let me state why I think "*jobmanager.adaptive-batch-sche*
> *duler.default-source-parallelism*" should not be bound by the "
> *jobmanager.adaptive-batch-sche**duler.max-parallelism*".
>
>    - Source vertex is unique and does not have any upstream vertices
>    - Downstream vertices read shuffled data partitioned by key, which is
>    not the case for the Source vertex
>    - Limiting source parallelism by downstream vertices' max parallelism is
>    incorrect
>
> If we say for ""semantic consistency" the source vertex parallelism has to
> be bound by the overall job's max parallelism, it can lead to following
> issues:
>
>    - High filter selectivity with huge amounts of data to read - setting
>    high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" so that
>    source parallelism can be set higher can lead to small blocks and
>    sub-optimal performance.
>    - Setting high "*jobmanager.adaptive-batch-scheduler.max-parallelism*"
>    requires careful tuning of network buffer configurations which is
>    unnecessary in cases where it is not required just so that the source
>    parallelism can be set high.
>
> Regards
> Venkata krishnan
>
> On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:
>
> > Hello Venkata krishnan,
> >
> > I think the term "semantic inconsistency" defined by
> > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> maintaining a
> > uniform upper limit on parallelism across all vertices within a job. As
> the
> > source vertices are part of the global execution graph, they should also
> > respect this rule to ensure consistent application of parallelism
> > constraints.
> >
> > Best,
> > Junrui
> >
> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月12日周五 02:10写道:
> >
> > > Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as
> > > well.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > >
> > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > > vsowr...@asu.edu> wrote:
> > >
> > > > Thanks for the response Lijie and Junrui. Sorry for the late reply.
> Few
> > > > follow up questions.
> > > >
> > > > > Source can actually ignore this limit
> > > > because it has no upstream, but this will lead to semantic
> > inconsistency.
> > > >
> > > > Lijie, can you please elaborate on the above comment further? What do
> > you
> > > > mean when you say it will lead to "semantic inconsistency"?
> > > >
> > > > > Secondly, we first need to limit the max parallelism of
> (downstream)
> > > > vertex, and then we can decide how many subpartitions (upstream
> vertex)
> > > > should produce. The limit should be effective, otherwise some
> > downstream
> > > > tasks will have no data to process.
> > > >
> > > > This makes sense in the context of any other vertices other than the
> > > > source vertex. As you mentioned above ("Source can actually ignore
> this
> > > > limit because it has no upstream"), therefore I feel "
> > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism" need
> > not
> > > > be upper bounded by
> > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > >
> > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com>
> > wrote:
> > > >
> > > >> Hi Venkat,
> > > >>
> > > >> As Lijie mentioned,  in Flink, the parallelism is required to be
> less
> > > than
> > > >> or equal to the maximum parallelism. The config option
> > > >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism will
> be
> > > set
> > > >> as the source's parallelism and max-parallelism, respectively.
> > > Therefore,
> > > >> the check failed situation you encountered is in line with the
> > > >> expectations.
> > > >>
> > > >> Best,
> > > >> Junrui
> > > >>
> > > >> Lijie Wang <wangdachui9...@gmail.com> 于2024年2月29日周四 17:35写道:
> > > >>
> > > >> > Hi Venkat,
> > > >> >
> > > >> > >> default-source-parallelism config should be independent from
> the
> > > >> > max-parallelism
> > > >> >
> > > >> > Actually, it's not.
> > > >> >
> > > >> > Firstly, it's obvious that the parallelism should be less than or
> > > equal
> > > >> to
> > > >> > the max parallelism(both literally and execution). The
> > > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used
> > as
> > > >> the
> > > >> > max parallelism for a vertex if you don't set max parallelism for
> it
> > > >> > individually (Just like the source in your case).
> > > >> >
> > > >> > Secondly, we first need to limit the max parallelism of
> (downstream)
> > > >> > vertex, and then we can decide how many subpartitions (upstream
> > > vertex)
> > > >> > should produce. The limit should be effective, otherwise some
> > > downstream
> > > >> > tasks will have no data to process. Source can actually ignore
> this
> > > >> limit
> > > >> > because it has no upstream, but this will lead to semantic
> > > >> inconsistency.
> > > >> >
> > > >> > Best,
> > > >> > Lijie
> > > >> >
> > > >> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年2月29日周四
> > 05:49写道:
> > > >> >
> > > >> > > Hi Flink devs,
> > > >> > >
> > > >> > > With Flink's AdaptiveBatchScheduler
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-batch-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISrg5BrHLw$
> > > >> > > >
> > > >> > > (Note:
> > > >> > > this is different from AdaptiveScheduler
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqUzURivw$
> > > >> > > >),
> > > >> > > the scheduler automatically determines the correct number of
> > > >> downstream
> > > >> > > tasks required to process the shuffle generated by the upstream
> > > >> vertex.
> > > >> > >
> > > >> > > I have a question regarding the current behavior. There are 2
> > > configs
> > > >> > which
> > > >> > > are in interplay here.
> > > >> > > 1.
> jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> > > >> > > >
> > > >> > >  - The default parallelism of data source.
> > > >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> > > >> > > >
> > > >> > > -
> > > >> > > Upper bound of allowed parallelism to set adaptively.
> > > >> > >
> > > >> > > Currently, if "
> > > >> > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> > > >> > > >"
> > > >> > > is greater than
> > "jobmanager.adaptive-batch-scheduler.max-parallelism
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> > > >> > > >",
> > > >> > > Flink application fails with the below message:
> > > >> > >
> > > >> > > "Vertex's parallelism should be smaller than or equal to
> vertex's
> > > max
> > > >> > > parallelism."
> > > >> > >
> > > >> > > This is the corresponding code in Flink's
> > > DefaultVertexParallelismInfo
> > > >> > > <
> > > >> > >
> > > >> >
> > > >>
> > >
> >
> https://urldefense.com/v3/__https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java*L110__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqBRDEfwA$
> > > >> > > >.
> > > >> > > My question is, "default-source-parallelism" config should be
> > > >> independent
> > > >> > > from the "max-parallelism" flag. The former controls the default
> > > >> source
> > > >> > > parallelism while the latter controls the max number of
> partitions
> > > to
> > > >> > write
> > > >> > > the intermediate shuffle.
> > > >> > >
> > > >> > > If this is true, then the above check should be fixed.
> Otherwise,
> > > >> wanted
> > > >> > to
> > > >> > > understand why the "default-source-parallelism` should be less
> > than
> > > >> the
> > > >> > > "max-parallelism"
> > > >> > >
> > > >> > > Thanks
> > > >> > > Venkat
> > > >> > >
> > > >> >
> > > >>
> > > >
> > >
> >
>

Reply via email to