Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the issue described above. Will share the PR here once it is ready for review.
Regards,
Venkata krishnan

On Wed, Apr 17, 2024 at 5:32 AM Junrui Lee <jrlee....@gmail.com> wrote:

> Thanks Venkata and Xia for providing further clarification. I think
> your example illustrates the significance of this proposal very well.
> Please feel free to go ahead and address the concerns.
>
> Best,
> Junrui
>
> Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Tue, Apr 16,
> 2024 at 07:01:
>
> > Thanks for adding your thoughts to this discussion.
> >
> > If we all agree that the source vertex parallelism shouldn't be
> > bound by the downstream max parallelism
> > (jobmanager.adaptive-batch-scheduler.max-parallelism), based on the
> > rationale and the issues described above, I can take a stab at
> > addressing the issue.
> >
> > Let me file a ticket to track this issue. Otherwise, I'm looking
> > forward to hearing more thoughts from others as well, especially
> > Lijie and Junrui, who have more context on the
> > AdaptiveBatchScheduler.
> >
> > Regards,
> > Venkata krishnan
> >
> > On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:
> >
> > > Hi Venkat,
> > >
> > > I agree that the parallelism of the source vertex should not be
> > > upper bounded by the job's global max parallelism. The case you
> > > mentioned, high filter selectivity with huge amounts of data to
> > > read, supports this viewpoint very well. (In fact, in the current
> > > implementation, if the source parallelism is pre-specified at job
> > > creation stage, rather than relying on the dynamic parallelism
> > > inference of the AdaptiveBatchScheduler, the source vertex's
> > > parallelism can indeed exceed the job's global max parallelism.)
> > >
> > > As Lijie and Junrui pointed out, the key issue is "semantic
> > > consistency." Currently, if a vertex has not set maxParallelism,
> > > the AdaptiveBatchScheduler will use
> > > `execution.batch.adaptive.auto-parallelism.max-parallelism` as the
> > > vertex's maxParallelism.
> > > Since the current implementation does not distinguish between
> > > source vertices and downstream vertices, source vertices are also
> > > subject to this limitation.
> > >
> > > Therefore, I believe that if the issue of "semantic consistency"
> > > can be well explained in the code and configuration documentation,
> > > the AdaptiveBatchScheduler should support letting the parallelism
> > > of source vertices exceed the job's global max parallelism.
> > >
> > > Best,
> > > Xia
> > >
> > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Sun, Apr 14,
> > > 2024 at 10:31:
> > >
> > > > Let me state why I think
> > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > > should not be bound by
> > > > "jobmanager.adaptive-batch-scheduler.max-parallelism":
> > > >
> > > > - The source vertex is unique and does not have any upstream
> > > >   vertices.
> > > > - Downstream vertices read shuffled data partitioned by key,
> > > >   which is not the case for the source vertex.
> > > > - Limiting source parallelism by the downstream vertices' max
> > > >   parallelism is incorrect.
> > > >
> > > > If we say that, for "semantic consistency," the source vertex
> > > > parallelism has to be bound by the overall job's max parallelism,
> > > > it can lead to the following issues:
> > > >
> > > > - High filter selectivity with huge amounts of data to read:
> > > >   setting a high
> > > >   "jobmanager.adaptive-batch-scheduler.max-parallelism" just so
> > > >   that the source parallelism can be set higher can lead to
> > > >   small blocks and sub-optimal performance.
> > > > - Setting a high
> > > >   "jobmanager.adaptive-batch-scheduler.max-parallelism" requires
> > > >   careful tuning of network buffer configurations, which is
> > > >   unnecessary in cases where it is not otherwise required, just
> > > >   so that the source parallelism can be set high.
> > > > Regards,
> > > > Venkata krishnan
> > > >
> > > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com>
> > > > wrote:
> > > >
> > > > > Hello Venkata krishnan,
> > > > >
> > > > > I think the term "semantic inconsistency" defined by
> > > > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> > > > > maintaining a uniform upper limit on parallelism across all
> > > > > vertices within a job. As the source vertices are part of the
> > > > > global execution graph, they should also respect this rule to
> > > > > ensure consistent application of parallelism constraints.
> > > > >
> > > > > Best,
> > > > > Junrui
> > > > >
> > > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Fri,
> > > > > Apr 12, 2024 at 02:10:
> > > > >
> > > > > > Gentle bump on this question. cc @Becket Qin
> > > > > > <becket....@gmail.com> as well.
> > > > > >
> > > > > > Regards,
> > > > > > Venkata krishnan
> > > > > >
> > > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan
> > > > > > <vsowr...@asu.edu> wrote:
> > > > > >
> > > > > > > Thanks for the response, Lijie and Junrui. Sorry for the
> > > > > > > late reply. A few follow-up questions.
> > > > > > >
> > > > > > > > Source can actually ignore this limit because it has no
> > > > > > > > upstream, but this will lead to semantic inconsistency.
> > > > > > >
> > > > > > > Lijie, can you please elaborate on the above comment
> > > > > > > further? What do you mean when you say it will lead to
> > > > > > > "semantic inconsistency"?
> > > > > > >
> > > > > > > > Secondly, we first need to limit the max parallelism of
> > > > > > > > the (downstream) vertex, and then we can decide how many
> > > > > > > > subpartitions the (upstream) vertex should produce. The
> > > > > > > > limit should be effective, otherwise some downstream
> > > > > > > > tasks will have no data to process.
> > > > > > > This makes sense in the context of any vertices other
> > > > > > > than the source vertex. As you mentioned above ("Source
> > > > > > > can actually ignore this limit because it has no
> > > > > > > upstream"), I therefore feel that
> > > > > > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > > > > > need not be upper bounded by
> > > > > > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > > > > >
> > > > > > > Regards,
> > > > > > > Venkata krishnan
> > > > > > >
> > > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee
> > > > > > > <jrlee....@gmail.com> wrote:
> > > > > > >
> > > > > > >> Hi Venkat,
> > > > > > >>
> > > > > > >> As Lijie mentioned, in Flink the parallelism is required
> > > > > > >> to be less than or equal to the maximum parallelism. The
> > > > > > >> config options
> > > > > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > > > > >> and jobmanager.adaptive-batch-scheduler.max-parallelism
> > > > > > >> will be set as the source's parallelism and max
> > > > > > >> parallelism, respectively. Therefore, the check failure
> > > > > > >> you encountered is in line with expectations.
> > > > > > >>
> > > > > > >> Best,
> > > > > > >> Junrui
> > > > > > >>
> > > > > > >> Lijie Wang <wangdachui9...@gmail.com> wrote on Thu,
> > > > > > >> Feb 29, 2024 at 17:35:
> > > > > > >>
> > > > > > >> > Hi Venkat,
> > > > > > >> >
> > > > > > >> > >> The default-source-parallelism config should be
> > > > > > >> > >> independent from the max-parallelism
> > > > > > >> >
> > > > > > >> > Actually, it's not.
> > > > > > >> >
> > > > > > >> > Firstly, it's obvious that the parallelism should be
> > > > > > >> > less than or equal to the max parallelism (both
> > > > > > >> > literally and in execution).
> > > > > > >> > The
> > > > > > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism"
> > > > > > >> > will be used as the max parallelism for a vertex if you
> > > > > > >> > don't set a max parallelism for it individually (just
> > > > > > >> > like the source in your case).
> > > > > > >> >
> > > > > > >> > Secondly, we first need to limit the max parallelism of
> > > > > > >> > the (downstream) vertex, and then we can decide how many
> > > > > > >> > subpartitions the (upstream) vertex should produce. The
> > > > > > >> > limit should be effective, otherwise some downstream
> > > > > > >> > tasks will have no data to process. Source can actually
> > > > > > >> > ignore this limit because it has no upstream, but this
> > > > > > >> > will lead to semantic inconsistency.
> > > > > > >> >
> > > > > > >> > Best,
> > > > > > >> > Lijie
> > > > > > >> >
> > > > > > >> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on
> > > > > > >> > Thu, Feb 29, 2024 at 05:49:
> > > > > > >> >
> > > > > > >> > > Hi Flink devs,
> > > > > > >> > >
> > > > > > >> > > With Flink's AdaptiveBatchScheduler
> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> > > > > > >> > > (note: this is different from the AdaptiveScheduler
> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> > > > > > >> > > the scheduler automatically determines the correct
> > > > > > >> > > number of downstream tasks required to process the
> > > > > > >> > > shuffle generated by the upstream vertex.
> > > > > > >> > >
> > > > > > >> > > I have a question regarding the current behavior.
> > > > > > >> > > There are two configs in interplay here:
> > > > > > >> > >
> > > > > > >> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> > > > > > >> > > - the default parallelism of the data source.
> > > > > > >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > > > > > >> > > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
> > > > > > >> > > - the upper bound of allowed parallelism to set
> > > > > > >> > > adaptively.
> > > > > > >> > > Currently, if
> > > > > > >> > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > > > > >> > > is greater than
> > > > > > >> > > "jobmanager.adaptive-batch-scheduler.max-parallelism",
> > > > > > >> > > the Flink application fails with the below message:
> > > > > > >> > >
> > > > > > >> > > "Vertex's parallelism should be smaller than or equal
> > > > > > >> > > to vertex's max parallelism."
> > > > > > >> > >
> > > > > > >> > > This is the corresponding code in Flink's
> > > > > > >> > > DefaultVertexParallelismInfo
> > > > > > >> > > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> > > > > > >> > > My question is: shouldn't the
> > > > > > >> > > "default-source-parallelism" config be independent
> > > > > > >> > > from the "max-parallelism" flag? The former controls
> > > > > > >> > > the default source parallelism, while the latter
> > > > > > >> > > controls the max number of partitions to write the
> > > > > > >> > > intermediate shuffle into.
> > > > > > >> > >
> > > > > > >> > > If this is true, then the above check should be
> > > > > > >> > > fixed. Otherwise, I wanted to understand why
> > > > > > >> > > "default-source-parallelism" should be less than
> > > > > > >> > > "max-parallelism".
> > > > > > >> > >
> > > > > > >> > > Thanks,
> > > > > > >> > > Venkat
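For readers following the config interplay discussed in this thread, here is a minimal, purely illustrative sketch of the behavior being debated. It is written in Python and is not Flink's actual implementation (the real check lives in the Java class DefaultVertexParallelismInfo); the helper names are hypothetical. It models only the two facts established above: a vertex with no individually configured max parallelism falls back to the scheduler-wide max-parallelism setting, and a vertex's parallelism must not exceed its max parallelism.

```python
# Hypothetical model of the AdaptiveBatchScheduler config interplay
# described in this thread. Default values mirror the documented
# defaults only loosely; they are assumptions for illustration.
DEFAULTS = {
    "jobmanager.adaptive-batch-scheduler.max-parallelism": 128,
    "jobmanager.adaptive-batch-scheduler.default-source-parallelism": 1,
}

def effective_source_settings(conf: dict) -> tuple:
    """Return (parallelism, max_parallelism) as derived for a source
    vertex that has no individually configured max parallelism: it
    inherits the scheduler-wide max-parallelism."""
    merged = {**DEFAULTS, **conf}
    parallelism = merged[
        "jobmanager.adaptive-batch-scheduler.default-source-parallelism"]
    max_parallelism = merged[
        "jobmanager.adaptive-batch-scheduler.max-parallelism"]
    return parallelism, max_parallelism

def check_vertex_parallelism(parallelism: int, max_parallelism: int) -> None:
    # Mirrors the failing check quoted above from
    # DefaultVertexParallelismInfo (simplified).
    if parallelism > max_parallelism:
        raise ValueError(
            "Vertex's parallelism should be smaller than or equal to "
            "vertex's max parallelism.")

# A job that scans a huge input but filters most of it away: the user
# wants many source subtasks but only a few downstream subtasks.
conf = {
    "jobmanager.adaptive-batch-scheduler.default-source-parallelism": 2000,
    "jobmanager.adaptive-batch-scheduler.max-parallelism": 500,
}
p, mp = effective_source_settings(conf)
try:
    check_vertex_parallelism(p, mp)
except ValueError as e:
    print(e)  # the failure discussed in this thread
```

Under this model, the only way to keep a source parallelism of 2000 is to raise the global max-parallelism to 2000 as well, which is exactly the downstream small-blocks and network-buffer-tuning cost described earlier in the thread.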