Hello Venkata krishnan,

I think the term "semantic inconsistency" defined by
jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a
uniform upper limit on parallelism across all vertices within a job. As the
source vertices are part of the global execution graph, they should also
respect this rule to ensure consistent application of parallelism
constraints.

Best,
Junrui

Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年4月12日周五 02:10写道:

> Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as
> well.
>
> Regards
> Venkata krishnan
>
>
> On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> vsowr...@asu.edu> wrote:
>
> > Thanks for the response Lijie and Junrui. Sorry for the late reply. Few
> > follow up questions.
> >
> > > Source can actually ignore this limit
> > because it has no upstream, but this will lead to semantic inconsistency.
> >
> > Lijie, can you please elaborate on the above comment further? What do you
> > mean when you say it will lead to "semantic inconsistency"?
> >
> > > Secondly, we first need to limit the max parallelism of (downstream)
> > vertex, and then we can decide how many subpartitions (upstream vertex)
> > should produce. The limit should be effective, otherwise some downstream
> > tasks will have no data to process.
> >
> > This makes sense in the context of any other vertices other than the
> > source vertex. As you mentioned above ("Source can actually ignore this
> > limit because it has no upstream"), therefore I feel "
> > jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not
> > be upper bounded by
> "jobmanager.adaptive-batch-scheduler.max-parallelism".
> >
> > Regards
> > Venkata krishnan
> >
> >
> > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
> >
> >> Hi Venkat,
> >>
> >> As Lijie mentioned,  in Flink, the parallelism is required to be less
> than
> >> or equal to the maximum parallelism. The config option
> >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> >> jobmanager.adaptive-batch-scheduler.default-source-parallelism will be
> set
> >> as the source's parallelism and max-parallelism, respectively.
> Therefore,
> >> the check failed situation you encountered is in line with the
> >> expectations.
> >>
> >> Best,
> >> Junrui
> >>
> >> Lijie Wang <wangdachui9...@gmail.com> 于2024年2月29日周四 17:35写道:
> >>
> >> > Hi Venkat,
> >> >
> >> > >> default-source-parallelism config should be independent from the
> >> > max-parallelism
> >> >
> >> > Actually, it's not.
> >> >
> >> > Firstly, it's obvious that the parallelism should be less than or
> equal
> >> to
> >> > the max parallelism(both literally and execution). The
> >> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as
> >> the
> >> > max parallelism for a vertex if you don't set max parallelism for it
> >> > individually (Just like the source in your case).
> >> >
> >> > Secondly, we first need to limit the max parallelism of (downstream)
> >> > vertex, and then we can decide how many subpartitions (upstream
> vertex)
> >> > should produce. The limit should be effective, otherwise some
> downstream
> >> > tasks will have no data to process. Source can actually ignore this
> >> limit
> >> > because it has no upstream, but this will lead to semantic
> >> inconsistency.
> >> >
> >> > Best,
> >> > Lijie
> >> >
> >> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> 于2024年2月29日周四 05:49写道:
> >> >
> >> > > Hi Flink devs,
> >> > >
> >> > > With Flink's AdaptiveBatchScheduler
> >> > > <
> >> > >
> >> >
> >>
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-batch-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISrg5BrHLw$
> >> > > >
> >> > > (Note:
> >> > > this is different from AdaptiveScheduler
> >> > > <
> >> > >
> >> >
> >>
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/*adaptive-scheduler__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqUzURivw$
> >> > > >),
> >> > > the scheduler automatically determines the correct number of
> >> downstream
> >> > > tasks required to process the shuffle generated by the upstream
> >> vertex.
> >> > >
> >> > > I have a question regarding the current behavior. There are 2
> configs
> >> > which
> >> > > are in interplay here.
> >> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> >> > > <
> >> > >
> >> >
> >>
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> >> > > >
> >> > >  - The default parallelism of data source.
> >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> >> > > <
> >> > >
> >> >
> >>
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> >> > > >
> >> > > -
> >> > > Upper bound of allowed parallelism to set adaptively.
> >> > >
> >> > > Currently, if "
> >> > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> >> > > <
> >> > >
> >> >
> >>
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-default-source-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISoOTMiiCA$
> >> > > >"
> >> > > is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism
> >> > > <
> >> > >
> >> >
> >>
> https://urldefense.com/v3/__https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/*jobmanager-adaptive-batch-scheduler-max-parallelism__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISpOw_L_Eg$
> >> > > >",
> >> > > Flink application fails with the below message:
> >> > >
> >> > > "Vertex's parallelism should be smaller than or equal to vertex's
> max
> >> > > parallelism."
> >> > >
> >> > > This is the corresponding code in Flink's
> DefaultVertexParallelismInfo
> >> > > <
> >> > >
> >> >
> >>
> https://urldefense.com/v3/__https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java*L110__;Iw!!IKRxdwAv5BmarQ!fwD4qP-fTEiqJH9CC3AHgXbR5MJPGm7ll1dYwElK-zrtujWDWio6_yvBa4rHlaZHP_lefLs4bZQAISqBRDEfwA$
> >> > > >.
> >> > > My question is, "default-source-parallelism" config should be
> >> independent
> >> > > from the "max-parallelism" flag. The former controls the default
> >> source
> >> > > parallelism while the latter controls the max number of partitions
> to
> >> > write
> >> > > the intermediate shuffle.
> >> > >
> >> > > If this is true, then the above check should be fixed. Otherwise,
> >> wanted
> >> > to
> >> > > understand why the "default-source-parallelism` should be less than
> >> the
> >> > > "max-parallelism"
> >> > >
> >> > > Thanks
> >> > > Venkat
> >> > >
> >> >
> >>
> >
>

Reply via email to