Hello Venkata krishnan,

I think the "semantic inconsistency" refers to the fact that jobmanager.adaptive-batch-scheduler.max-parallelism is meant to impose a uniform upper limit on parallelism across all vertices within a job. Since source vertices are part of the same execution graph, they should respect this limit as well, so that the parallelism constraint is applied consistently.
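To make that concrete, here is a minimal flink-conf.yaml fragment (the values are hypothetical, chosen only to illustrate the conflict) that would trip the constraint being discussed:

```yaml
# Source parallelism would default to 256, but the global cap is 128,
# so the source vertex violates "parallelism <= max parallelism".
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 256
jobmanager.adaptive-batch-scheduler.max-parallelism: 128
```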
Best,
Junrui

Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Fri, Apr 12, 2024 at 02:10:

> Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as well.
>
> Regards
> Venkata krishnan
>
> On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
>
> > Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few follow-up questions.
> >
> > > Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
> >
> > Lijie, can you please elaborate on the above comment? What do you mean when you say it will lead to "semantic inconsistency"?
> >
> > > Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process.
> >
> > This makes sense for any vertex other than the source vertex. As you mentioned above ("Source can actually ignore this limit because it has no upstream"), I feel "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
> >
> > Regards
> > Venkata krishnan
> >
> > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
> >
> >> Hi Venkat,
> >>
> >> As Lijie mentioned, in Flink the parallelism is required to be less than or equal to the maximum parallelism. The config options jobmanager.adaptive-batch-scheduler.default-source-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism will be used as the source's parallelism and max parallelism, respectively. Therefore, the failed check you encountered is in line with expectations.
> >>
> >> Best,
> >> Junrui
> >>
> >> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:
> >>
> >> > Hi Venkat,
> >> >
> >> > >> default-source-parallelism config should be independent from the max-parallelism
> >> >
> >> > Actually, it's not.
> >> >
> >> > Firstly, the parallelism should be less than or equal to the max parallelism (both literally and in execution). "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the max parallelism for a vertex if you don't set a max parallelism for it individually (just like the source in your case).
> >> >
> >> > Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process. The source can actually ignore this limit because it has no upstream, but this would lead to semantic inconsistency.
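Lijie's point about subpartitions can be sketched as follows. This is an illustrative simplification, not Flink's actual code: the class name and round-robin assignment are assumptions chosen only to show why downstream parallelism above the subpartition count leaves some tasks with no data.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: divide the subpartitions produced by the upstream vertex among
// the downstream consumer tasks. If downstream parallelism exceeds the
// number of subpartitions, some tasks end up with an empty range,
// i.e. no data to process.
final class SubpartitionAssignment {

    static List<List<Integer>> assign(int numSubpartitions, int downstreamParallelism) {
        List<List<Integer>> ranges = new ArrayList<>();
        for (int task = 0; task < downstreamParallelism; task++) {
            List<Integer> owned = new ArrayList<>();
            for (int p = 0; p < numSubpartitions; p++) {
                // Round-robin assignment of subpartition p to a task.
                if (p % downstreamParallelism == task) {
                    owned.add(p);
                }
            }
            ranges.add(owned);
        }
        return ranges;
    }

    public static void main(String[] args) {
        // 4 subpartitions but 6 downstream tasks: tasks 4 and 5 get nothing.
        System.out.println(SubpartitionAssignment.assign(4, 6));
    }
}
```

This is why the downstream vertex's max parallelism has to be fixed first: it bounds how many subpartitions the upstream should produce so that no consumer is starved.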
> >> >
> >> > Best,
> >> > Lijie
> >> >
> >> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
> >> >
> >> > > Hi Flink devs,
> >> > >
> >> > > With Flink's AdaptiveBatchScheduler
> >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> >> > > (note: this is different from the AdaptiveScheduler
> >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> >> > > the scheduler automatically determines the correct number of downstream tasks required to process the shuffle generated by the upstream vertex.
> >> > >
> >> > > I have a question regarding the current behavior. Two configs are in interplay here:
> >> > >
> >> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> >> > > - the default parallelism of a data source.
> >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
> >> > > - the upper bound of allowed parallelism to set adaptively.
> >> > >
> >> > > Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the Flink application fails with the message:
> >> > >
> >> > > "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
> >> > >
> >> > > This is the corresponding check in Flink's DefaultVertexParallelismInfo
> >> > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> >> > > My question is: the "default-source-parallelism" config should be independent from the "max-parallelism" flag. The former controls the default source parallelism, while the latter controls the max number of partitions to write the intermediate shuffle.
> >> > >
> >> > > If this is true, then the above check should be fixed. Otherwise, I wanted to understand why "default-source-parallelism" should be less than "max-parallelism".
> >> > >
> >> > > Thanks
> >> > > Venkat
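[Editor's note] The validation the thread keeps referring to can be sketched roughly as below. This is a simplified illustration, not Flink's actual implementation: the class and method names are hypothetical, and only the inequality and the error message quoted in the thread are taken from the discussion above.

```java
// Sketch of the "parallelism <= max parallelism" check discussed in this
// thread. The real logic lives in Flink's DefaultVertexParallelismInfo.
final class VertexParallelismCheck {

    // A vertex's parallelism must not exceed its max parallelism.
    static boolean isValid(int parallelism, int maxParallelism) {
        return parallelism <= maxParallelism;
    }

    static void setParallelism(int parallelism, int maxParallelism) {
        if (!isValid(parallelism, maxParallelism)) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        // A source vertex takes default-source-parallelism as its parallelism
        // and the global max-parallelism as its max, so 256 > 128 fails.
        setParallelism(100, 128); // passes
        try {
            setParallelism(256, 128);
        } catch (IllegalArgumentException e) {
            System.out.println("check failed: " + e.getMessage());
        }
    }
}
```

Because the source vertex has no individually configured max parallelism, the global cap is what it is checked against, which is exactly the failure mode described at the top of the thread.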