Thanks Venkata and Xia for providing further clarification. I think your example illustrates the significance of this proposal very well. Please feel free to go ahead and address the concerns.
Best,
Junrui

Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Tue, Apr 16, 2024 at 07:01:

> Thanks for adding your thoughts to this discussion.
>
> If we all agree that the source vertex parallelism shouldn't be bound by the downstream max parallelism (jobmanager.adaptive-batch-scheduler.max-parallelism), based on the rationale and the issues described above, I can take a stab at addressing the issue.
>
> Let me file a ticket to track this issue. Otherwise, I'm looking forward to hearing more thoughts from others as well, especially Lijie and Junrui who have more context on the AdaptiveBatchScheduler.
>
> Regards
> Venkata krishnan
>
> On Mon, Apr 15, 2024 at 12:54 AM Xia Sun <xingbe...@gmail.com> wrote:
>
> > Hi Venkat,
> >
> > I agree that the parallelism of the source vertex should not be upper bounded by the job's global max parallelism. The case you mentioned, "High filter selectivity with huge amounts of data to read", supports this viewpoint very well. (In fact, in the current implementation, if the source parallelism is pre-specified at the job creation stage, rather than relying on the dynamic parallelism inference of the AdaptiveBatchScheduler, the source vertex's parallelism can indeed exceed the job's global max parallelism.)
> >
> > As Lijie and Junrui pointed out, the key issue is "semantic consistency." Currently, if a vertex has not set maxParallelism, the AdaptiveBatchScheduler will use `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's maxParallelism. Since the current implementation does not distinguish between source vertices and downstream vertices, source vertices are also subject to this limitation.
> > Therefore, I believe that if the issue of "semantic consistency" can be well explained in the code and configuration documentation, the AdaptiveBatchScheduler should allow the parallelism of source vertices to exceed the job's global max parallelism.
> >
> > Best,
> > Xia
> >
> > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Sun, Apr 14, 2024 at 10:31:
> >
> > > Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism":
> > >
> > > - The source vertex is unique and does not have any upstream vertices.
> > > - Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex.
> > > - Limiting source parallelism by the downstream vertices' max parallelism is incorrect.
> > >
> > > If we say that for "semantic consistency" the source vertex parallelism has to be bound by the overall job's max parallelism, it can lead to the following issues:
> > >
> > > - High filter selectivity with huge amounts of data to read: setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" just so that source parallelism can be set higher can lead to small blocks and sub-optimal performance.
> > > - Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" requires careful tuning of network buffer configurations, which is unnecessary in cases where it is not otherwise required, just so that the source parallelism can be set high.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee <jrlee....@gmail.com> wrote:
> > >
> > > > Hello Venkata krishnan,
> > > >
> > > > I think the term "semantic inconsistency" defined by jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a uniform upper limit on parallelism across all vertices within a job.
> > > > As the source vertices are part of the global execution graph, they should also respect this rule to ensure consistent application of parallelism constraints.
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Fri, Apr 12, 2024 at 02:10:
> > > >
> > > > > Gentle bump on this question. cc @Becket Qin <becket....@gmail.com> as well.
> > > > >
> > > > > Regards
> > > > > Venkata krishnan
> > > > >
> > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote:
> > > > >
> > > > > > Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few follow-up questions.
> > > > > >
> > > > > > > Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
> > > > > >
> > > > > > Lijie, can you please elaborate on the above comment further? What do you mean when you say it will lead to "semantic inconsistency"?
> > > > > >
> > > > > > > Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process.
> > > > > >
> > > > > > This makes sense for any vertex other than the source vertex. As you mentioned above ("Source can actually ignore this limit because it has no upstream"), I therefore feel "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > > > > Regards
> > > > > > Venkata krishnan
> > > > > >
> > > > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee <jrlee....@gmail.com> wrote:
> > > > > >
> > > > > > > Hi Venkat,
> > > > > > >
> > > > > > > As Lijie mentioned, in Flink the parallelism is required to be less than or equal to the maximum parallelism. The config options jobmanager.adaptive-batch-scheduler.default-source-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism will be set as the source's parallelism and max parallelism, respectively. Therefore, the check failure you encountered is in line with expectations.
> > > > > > >
> > > > > > > Best,
> > > > > > > Junrui
> > > > > > >
> > > > > > > Lijie Wang <wangdachui9...@gmail.com> wrote on Thu, Feb 29, 2024 at 17:35:
> > > > > > >
> > > > > > > > Hi Venkat,
> > > > > > > >
> > > > > > > > >> The default-source-parallelism config should be independent from the max-parallelism
> > > > > > > >
> > > > > > > > Actually, it's not.
> > > > > > > >
> > > > > > > > Firstly, it's obvious that the parallelism should be less than or equal to the max parallelism (both literally and in execution). "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the max parallelism for a vertex if you don't set a max parallelism for it individually (just like the source in your case).
> > > > > > > >
> > > > > > > > Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective, otherwise some downstream tasks will have no data to process.
> > > > > > > > Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
> > > > > > > >
> > > > > > > > Best,
> > > > > > > > Lijie
> > > > > > > >
> > > > > > > > Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:
> > > > > > > >
> > > > > > > > > Hi Flink devs,
> > > > > > > > >
> > > > > > > > > With Flink's AdaptiveBatchScheduler <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler> (note: this is different from the AdaptiveScheduler <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>), the scheduler automatically determines the correct number of downstream tasks required to process the shuffle generated by the upstream vertex.
> > > > > > > > >
> > > > > > > > > I have a question regarding the current behavior. There are 2 configs which are in interplay here:
> > > > > > > > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism> - the default parallelism of the data source.
> > > > > > > > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism> - the upper bound of allowed parallelism to set adaptively.
> > > > > > > > > Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism" <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism> is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism" <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>, the Flink application fails with the below message:
> > > > > > > > >
> > > > > > > > > "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
> > > > > > > > >
> > > > > > > > > This is the corresponding code in Flink's DefaultVertexParallelismInfo <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>. My question is: shouldn't the "default-source-parallelism" config be independent from the "max-parallelism" flag?
> > > > > > > > > The former controls the default source parallelism, while the latter controls the max number of partitions to write for the intermediate shuffle.
> > > > > > > > >
> > > > > > > > > If this is true, then the above check should be fixed. Otherwise, I wanted to understand why "default-source-parallelism" should be less than "max-parallelism".
> > > > > > > > >
> > > > > > > > > Thanks
> > > > > > > > > Venkat
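For readers following the thread, the failing validation being discussed can be sketched roughly as below. This is a minimal, hypothetical Java sketch: the class and method names are illustrative only, not Flink's actual API (the real check lives in Flink's DefaultVertexParallelismInfo), and the values stand in for jobmanager.adaptive-batch-scheduler.default-source-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism.

```java
// Hypothetical, simplified sketch of the vertex parallelism sanity check
// debated in this thread; not the actual Flink implementation.
public class VertexParallelismCheck {

    // parallelism stands in for the (source) vertex parallelism, e.g. the
    // value of jobmanager.adaptive-batch-scheduler.default-source-parallelism;
    // maxParallelism stands in for
    // jobmanager.adaptive-batch-scheduler.max-parallelism.
    static void checkParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        // Within the bound: passes.
        checkParallelism(128, 256);

        // default-source-parallelism (1024) > max-parallelism (512): today
        // this fails for source vertices too, even though a source has no
        // upstream shuffle that the bound needs to protect.
        try {
            checkParallelism(1024, 512);
            System.out.println("unexpected: check passed");
        } catch (IllegalArgumentException e) {
            System.out.println("check failed: " + e.getMessage());
        }
    }
}
```

The thread's proposal amounts to skipping this bound for source vertices (which have no upstream), while keeping it for all downstream vertices.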