Re: Question around Flink's AdaptiveBatchScheduler
First of all, my sincere apologies for the slow progress on this one. @Junrui Lee and Xia, I updated the PR to address the last set of feedback comments. Please take a look; I am hoping to finish this one quickly.

Regards,
Venkata krishnan
Re: Question around Flink's AdaptiveBatchScheduler
Xia,

Thanks for the reviews. Unfortunately, due to work commitments I am a little delayed in addressing your review comments. It should mostly be done by the end of this week. Just a quick heads up.

Junrui,

Thanks, that would be great.
Re: Question around Flink's AdaptiveBatchScheduler
Hi,

Thanks for the reminder. I will review it soon during my free time.
Re: Question around Flink's AdaptiveBatchScheduler
Junrui and Xia,

Gentle ping for reviews.
Re: Question around Flink's AdaptiveBatchScheduler
Hi Xia and Junrui,

Filed https://github.com/apache/flink/pull/24736 to address the issue described above. Please take a look whenever you can.

Thanks,
Venkat
Re: Question around Flink's AdaptiveBatchScheduler
Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the issue described above. Will share the PR here once it is ready for review.

Regards,
Venkata krishnan
Re: Question around Flink's AdaptiveBatchScheduler
Thanks Venkata and Xia for providing further clarification. I think your example illustrates the significance of this proposal very well. Please feel free to go ahead and address the concerns.

Best,
Junrui

Venkatakrishnan Sowrirajan wrote on Sun, Apr 14, 2024 at 10:31:

> Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism":
>
> - The source vertex is unique and does not have any upstream vertices.
> - Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex.
> - Limiting source parallelism by the downstream vertices' max parallelism is incorrect.
>
> If we say that for "semantic consistency" the source vertex parallelism has to be bound by the overall job's max parallelism, it can lead to the following issues:
>
> - High filter selectivity with huge amounts of data to read: setting "jobmanager.adaptive-batch-scheduler.max-parallelism" high just so that the source parallelism can be set higher can lead to small blocks and sub-optimal performance.
> - Setting "jobmanager.adaptive-batch-scheduler.max-parallelism" high requires careful tuning of network buffer configurations, which is unnecessary when not otherwise required, just so that the source parallelism can be set high.
>
> Regards
> Venkata krishnan
>
> On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee wrote:
>
>> Hello Venkata krishnan,
>>
>> I think the term "semantic inconsistency" defined by jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a uniform upper limit on parallelism across all vertices within a job. As the source vertices are part of the global execution graph, they should also respect this rule to ensure consistent application of parallelism constraints.
>>
>> Best,
>> Junrui
>>
>> Venkatakrishnan Sowrirajan wrote on Fri, Apr 12, 2024 at 02:10:
>>
>>> Gentle bump on this question. cc @Becket Qin as well.
>>>
>>> On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan wrote:
>>>
>>>> Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few follow-up questions.
>>>>
>>>> > Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.
>>>>
>>>> Lijie, can you please elaborate on the above comment further? What do you mean when you say it will lead to "semantic inconsistency"?
>>>>
>>>> > Secondly, we first need to limit the max parallelism of
On Tue, Apr 16, 2024, Venkatakrishnan Sowrirajan wrote:

Thanks for adding your thoughts to this discussion.

If we all agree that the source vertex parallelism shouldn't be bound by the downstream max parallelism (jobmanager.adaptive-batch-scheduler.max-parallelism), based on the rationale and the issues described above, I can take a stab at addressing the issue. Let me file a ticket to track it. Otherwise, I'm looking forward to hearing more thoughts from others as well, especially Lijie and Junrui, who have more context on the AdaptiveBatchScheduler.

Regards
Venkata krishnan
On Mon, Apr 15, 2024, Xia Sun wrote:

Hi Venkat,

I agree that the parallelism of the source vertex should not be upper-bounded by the job's global max parallelism. The case you mentioned ("high filter selectivity with huge amounts of data to read") supports this viewpoint well. (In fact, in the current implementation, if the source parallelism is pre-specified at job creation, rather than relying on the dynamic parallelism inference of the AdaptiveBatchScheduler, the source vertex's parallelism can indeed exceed the job's global max parallelism.)

As Lijie and Junrui pointed out, the key issue is "semantic consistency." Currently, if a vertex has not set maxParallelism, the AdaptiveBatchScheduler will use `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's maxParallelism. Since the current implementation does not distinguish between source vertices and downstream vertices, source vertices are also subject to this limitation.

Therefore, I believe that if the issue of "semantic consistency" can be well explained in the code and configuration documentation, the AdaptiveBatchScheduler should allow the parallelism of source vertices to exceed the job's global max parallelism.

Best,
Xia
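The fallback Xia describes can be reduced to a short sketch. This is hypothetical, illustrative code (names and structure are my own, not Flink's actual implementation): a vertex that does not set maxParallelism explicitly inherits the global `execution.batch.adaptive.auto-parallelism.max-parallelism` value, and since sources are not special-cased, they are capped like any other vertex.

```java
import java.util.OptionalInt;

// Hypothetical sketch (not Flink's actual code) of the fallback described
// above: a vertex without an explicitly set maxParallelism inherits the
// global max-parallelism setting.
public class MaxParallelismFallback {
    static int effectiveMaxParallelism(OptionalInt vertexMaxParallelism, int globalMaxParallelism) {
        // No distinction is made between source and downstream vertices,
        // which is why sources are also capped by the global setting.
        return vertexMaxParallelism.orElse(globalMaxParallelism);
    }

    public static void main(String[] args) {
        // Source vertex with no explicit maxParallelism: falls back to global.
        System.out.println(effectiveMaxParallelism(OptionalInt.empty(), 1000));
        // Vertex with an explicit maxParallelism: keeps its own value.
        System.out.println(effectiveMaxParallelism(OptionalInt.of(500), 1000));
    }
}
```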
On Sun, Apr 14, 2024, Venkatakrishnan Sowrirajan wrote:

Let me state why I think "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism":

- The source vertex is unique in that it does not have any upstream vertices.
- Downstream vertices read shuffled data partitioned by key, which is not the case for the source vertex.
- Limiting source parallelism by the downstream vertices' max parallelism is incorrect.

If we say that for "semantic consistency" the source vertex parallelism has to be bound by the overall job's max parallelism, it can lead to the following issues:

- High filter selectivity with huge amounts of data to read: setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" just so that source parallelism can be set higher can lead to small shuffle blocks and sub-optimal performance.
- Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism" requires careful tuning of network buffer configurations, which is unnecessary in cases where it is not otherwise required.

Regards
Venkata krishnan
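To make the "small blocks" concern concrete, here is a back-of-the-envelope illustration (all figures are hypothetical, not taken from the thread). As Lijie notes later in the thread, with the adaptive batch scheduler the number of subpartitions an upstream task produces is decided by the downstream vertex's max parallelism, so raising max-parallelism globally shrinks the average subpartition size:

```java
// Illustrative arithmetic only; the figures (10 TiB shuffle, parallelism
// values) are hypothetical and not taken from the discussion above.
public class SubpartitionSizeSketch {
    // Average bytes per subpartition when an upstream task's output is
    // split into maxParallelism subpartitions.
    static long avgSubpartitionBytes(long totalShuffleBytes, int upstreamParallelism, int maxParallelism) {
        long bytesPerTask = totalShuffleBytes / upstreamParallelism;
        return bytesPerTask / maxParallelism;
    }

    public static void main(String[] args) {
        long tenTiB = 10L * 1024 * 1024 * 1024 * 1024;
        // With max-parallelism = 1024, each subpartition averages 10 MiB.
        System.out.println(avgSubpartitionBytes(tenTiB, 1024, 1024));
        // Raising max-parallelism to 16384 (only to unlock a higher source
        // parallelism) shrinks subpartitions 16x, to 640 KiB.
        System.out.println(avgSubpartitionBytes(tenTiB, 1024, 16384));
    }
}
```

Many such small subpartitions mean more, smaller reads on the downstream side, which is the sub-optimal-performance effect the bullet above refers to.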
On Thu, Apr 11, 2024, Junrui Lee wrote:

Hello Venkata krishnan,

I think the "semantic consistency" tied to jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a uniform upper limit on parallelism across all vertices within a job. As the source vertices are part of the global execution graph, they should also respect this rule to ensure consistent application of parallelism constraints.

Best,
Junrui
On Fri, Apr 12, 2024, Venkatakrishnan Sowrirajan wrote:

Gentle bump on this question. cc @Becket Qin as well.

Regards
Venkata krishnan
On Tue, Mar 12, 2024, Venkatakrishnan Sowrirajan wrote:

Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few follow-up questions.

> Source can actually ignore this limit because it has no upstream, but this will lead to semantic inconsistency.

Lijie, can you please elaborate on the above comment further? What do you mean when you say it will lead to "semantic inconsistency"?

> Secondly, we first need to limit the max parallelism of (downstream) vertex, and then we can decide how many subpartitions (upstream vertex) should produce. The limit should be effective, otherwise some downstream tasks will have no data to process.

This makes sense for any vertex other than the source vertex. As you mentioned above ("Source can actually ignore this limit because it has no upstream"), I feel "jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not be upper-bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".

Regards
Venkata krishnan
Re: Question around Flink's AdaptiveBatchScheduler
Hi Venkat,

As Lijie mentioned, in Flink the parallelism is required to be less than or equal to the maximum parallelism. The config options jobmanager.adaptive-batch-scheduler.default-source-parallelism and jobmanager.adaptive-batch-scheduler.max-parallelism will be used as the source's parallelism and max parallelism, respectively. Therefore, the failed check you encountered is in line with expectations.

Best,
Junrui

Lijie Wang 于2024年2月29日周四 17:35写道:

> Hi Venkat,
>
> >> default-source-parallelism config should be independent from the
> >> max-parallelism
>
> Actually, it's not.
>
> Firstly, the parallelism should be less than or equal to the max
> parallelism (both literally and in execution). The
> "jobmanager.adaptive-batch-scheduler.max-parallelism" value will be used
> as the max parallelism for a vertex if you don't set a max parallelism for
> it individually (just like the source in your case).
>
> Secondly, we first need to limit the max parallelism of the (downstream)
> vertex, and then we can decide how many subpartitions the (upstream)
> vertex should produce. The limit should be effective; otherwise some
> downstream tasks will have no data to process. A source could actually
> ignore this limit because it has no upstream, but that would lead to
> semantic inconsistency.
>
> Best,
> Lijie
>
> Venkatakrishnan Sowrirajan 于2024年2月29日周四 05:49写道:
>
> > Hi Flink devs,
> >
> > With Flink's AdaptiveBatchScheduler
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> > (note: this is different from the AdaptiveScheduler
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> > the scheduler automatically determines the correct number of downstream
> > tasks required to process the shuffle generated by the upstream vertex.
> >
> > I have a question regarding the current behavior. There are 2 configs
> > in interplay here:
> > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> > - the default parallelism of a data source.
> > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
> > - the upper bound of allowed parallelism to set adaptively.
> >
> > Currently, if
> > "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is
> > greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the
> > Flink application fails with the message:
> >
> > "Vertex's parallelism should be smaller than or equal to vertex's max
> > parallelism."
> >
> > This is the corresponding check in Flink's DefaultVertexParallelismInfo
> > <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> > My question is: shouldn't the "default-source-parallelism" config be
> > independent of the "max-parallelism" config? The former controls the
> > default source parallelism, while the latter controls the max number of
> > partitions to write the intermediate shuffle.
> >
> > If so, then the above check should be fixed. Otherwise, I wanted to
> > understand why "default-source-parallelism" should be less than
> > "max-parallelism".
> >
> > Thanks
> > Venkat
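The interplay described in the thread can be made concrete with a flink-conf.yaml fragment. The values below are illustrative, not taken from the thread, and assume the adaptive batch scheduler is enabled via jobmanager.scheduler:

```yaml
# Illustrative values only: default source parallelism set above the
# adaptive scheduler's cap. With this combination (200 > 128) the job
# fails at submission with:
# "Vertex's parallelism should be smaller than or equal to vertex's max parallelism."
jobmanager.scheduler: AdaptiveBatch
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 200
jobmanager.adaptive-batch-scheduler.max-parallelism: 128
```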
Re: Question around Flink's AdaptiveBatchScheduler
Hi Venkat,

>> default-source-parallelism config should be independent from the
>> max-parallelism

Actually, it's not.

Firstly, the parallelism should be less than or equal to the max parallelism (both literally and in execution). The "jobmanager.adaptive-batch-scheduler.max-parallelism" value will be used as the max parallelism for a vertex if you don't set a max parallelism for it individually (just like the source in your case).

Secondly, we first need to limit the max parallelism of the (downstream) vertex, and then we can decide how many subpartitions the (upstream) vertex should produce. The limit should be effective; otherwise some downstream tasks will have no data to process. A source could actually ignore this limit because it has no upstream, but that would lead to semantic inconsistency.

Best,
Lijie

Venkatakrishnan Sowrirajan 于2024年2月29日周四 05:49写道:

> Hi Flink devs,
>
> With Flink's AdaptiveBatchScheduler
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> (note: this is different from the AdaptiveScheduler
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> the scheduler automatically determines the correct number of downstream
> tasks required to process the shuffle generated by the upstream vertex.
>
> I have a question regarding the current behavior. There are 2 configs
> in interplay here:
> 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
> - the default parallelism of a data source.
> 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
> - the upper bound of allowed parallelism to set adaptively.
>
> Currently, if
> "jobmanager.adaptive-batch-scheduler.default-source-parallelism" is
> greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the
> Flink application fails with the message:
>
> "Vertex's parallelism should be smaller than or equal to vertex's max
> parallelism."
>
> This is the corresponding check in Flink's DefaultVertexParallelismInfo
> <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> My question is: shouldn't the "default-source-parallelism" config be
> independent of the "max-parallelism" config? The former controls the
> default source parallelism, while the latter controls the max number of
> partitions to write the intermediate shuffle.
>
> If so, then the above check should be fixed. Otherwise, I wanted to
> understand why "default-source-parallelism" should be less than
> "max-parallelism".
>
> Thanks
> Venkat
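For readers following the DefaultVertexParallelismInfo link above, the validation behaves roughly like the sketch below. This is a simplified illustration, not the actual Flink source; the class and method names here are invented for the example.

```java
// Simplified sketch (not Flink's actual code) of the validation that
// produces the error quoted in the thread: a vertex's parallelism must
// not exceed its max parallelism, and a source vertex without an explicit
// max parallelism inherits jobmanager.adaptive-batch-scheduler.max-parallelism.
public class VertexParallelismCheckSketch {

    static void checkParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        // default-source-parallelism (100) <= max-parallelism (128): accepted.
        checkParallelism(100, 128);

        // default-source-parallelism (200) > max-parallelism (128): rejected,
        // which is the failure Venkat observed.
        boolean rejected = false;
        try {
            checkParallelism(200, 128);
        } catch (IllegalArgumentException e) {
            rejected = true;
        }
        if (!rejected) {
            throw new AssertionError("expected the check to reject 200 > 128");
        }
        System.out.println("check behaves as described in the thread");
    }
}
```

This also shows why the check is symmetric for sources: the scheduler applies one generic vertex-level invariant rather than special-casing vertices with no upstream.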