Re: Question around Flink's AdaptiveBatchScheduler

2024-05-09 Thread Venkatakrishnan Sowrirajan
Xia,

Thanks for the reviews. Unfortunately, due to work commitments I am a
little delayed in addressing your review comments. I should be done by
the end of this week. Just a quick heads-up.

Junrui,

Thanks, that would be great.


Re: Question around Flink's AdaptiveBatchScheduler

2024-05-06 Thread Junrui Lee
Hi,
Thanks for the reminder. I will review it soon during my free time.


Re: Question around Flink's AdaptiveBatchScheduler

2024-05-03 Thread Venkatakrishnan Sowrirajan
Junrui and Xia,

Gentle ping for reviews.


Re: Question around Flink's AdaptiveBatchScheduler

2024-04-29 Thread Venkatakrishnan Sowrirajan
Hi Xia and Junrui,

Filed https://github.com/apache/flink/pull/24736 to address the issue
described above. Please take a look whenever you can.

Thanks
Venkat



Re: Question around Flink's AdaptiveBatchScheduler

2024-04-18 Thread Venkatakrishnan Sowrirajan
Filed https://issues.apache.org/jira/browse/FLINK-35165 to address the
issue described above. Will share the PR here once it is ready for review.

Regards
Venkata krishnan



Re: Question around Flink's AdaptiveBatchScheduler

2024-04-17 Thread Junrui Lee
Thanks Venkata and Xia for providing further clarification. I think your
example illustrates the significance of this proposal very well. Please
feel free to go ahead and address the concerns.

Best,
Junrui

Venkatakrishnan Sowrirajan wrote on Tue, Apr 16, 2024 at 07:01:

> Thanks for adding your thoughts to this discussion.
>
> If we all agree that the source vertex parallelism shouldn't be bound by
> the downstream max parallelism
> (jobmanager.adaptive-batch-scheduler.max-parallelism)
> based on the rationale and the issues described above, I can take a stab at
> addressing the issue.
>
> Let me file a ticket to track this issue. Otherwise, I'm looking forward to
> hearing more thoughts from others as well, especially Lijie and Junrui who
> have more context on the AdaptiveBatchScheduler.
>
> Regards
> Venkata krishnan
>
>
> On Mon, Apr 15, 2024 at 12:54 AM Xia Sun  wrote:
>
> > Hi Venkat,
> > I agree that the parallelism of the source vertex should not be upper
> > bounded by the job's global max parallelism. The case you mentioned,
> > "High filter selectivity with huge amounts of data to read," supports
> > this viewpoint excellently. (In fact, in the current implementation, if
> > the source parallelism is pre-specified at the job creation stage,
> > rather than relying on the dynamic parallelism inference of the
> > AdaptiveBatchScheduler, the source vertex's parallelism can indeed
> > exceed the job's global max parallelism.)
> >
> > As Lijie and Junrui pointed out, the key issue is "semantic consistency."
> > Currently, if a vertex has not set maxParallelism, the
> > AdaptiveBatchScheduler will use
> > `execution.batch.adaptive.auto-parallelism.max-parallelism` as the
> vertex's
> > maxParallelism. Since the current implementation does not distinguish
> > between source vertices and downstream vertices, source vertices are also
> > subject to this limitation.
> >
> > Therefore, I believe that if the issue of "semantic consistency" can be
> > well explained in the code and configuration documentation, the
> > AdaptiveBatchScheduler should support that the parallelism of source
> > vertices can exceed the job's global max parallelism.
> >
> > Best,
> > Xia
> >
> > Venkatakrishnan Sowrirajan wrote on Sun, Apr 14, 2024 at 10:31:
> >
> > > Let me state why I think
> > > "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> > > should not be bound by
> > > "jobmanager.adaptive-batch-scheduler.max-parallelism":
> > >
> > >    - The source vertex is unique and does not have any upstream
> > >    vertices.
> > >    - Downstream vertices read shuffled data partitioned by key, which
> > >    is not the case for the source vertex.
> > >    - Limiting the source parallelism by the downstream vertices' max
> > >    parallelism is incorrect.
> > >
> > > If we say that for "semantic consistency" the source vertex
> > > parallelism has to be bound by the overall job's max parallelism, it
> > > can lead to the following issues:
> > >
> > >    - High filter selectivity with huge amounts of data to read:
> > >    setting "jobmanager.adaptive-batch-scheduler.max-parallelism" high
> > >    so that the source parallelism can be set higher can lead to small
> > >    blocks and sub-optimal performance.
> > >    - Setting "jobmanager.adaptive-batch-scheduler.max-parallelism"
> > >    high requires careful tuning of network buffer configurations,
> > >    which is unnecessary when not otherwise required, just so that the
> > >    source parallelism can be set high.
> > >
> > > Regards
> > > Venkata krishnan
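The small-blocks concern in the first issue above can be made concrete with back-of-the-envelope arithmetic. The sketch below is illustrative only: the one-block-per-subpartition model and all numbers are assumptions made for this thread, not Flink's actual shuffle accounting.

```python
# Illustrative sketch (not Flink code): why raising the global
# max-parallelism just to unlock higher source parallelism can hurt.
# In a hash shuffle, each upstream task writes roughly one block per
# downstream subpartition, so the average block size shrinks as the
# downstream (max) parallelism grows.

def avg_shuffle_block_bytes(shuffle_bytes: int,
                            upstream_parallelism: int,
                            downstream_parallelism: int) -> int:
    """Average size of one (upstream task, downstream subpartition) block."""
    return shuffle_bytes // (upstream_parallelism * downstream_parallelism)

SHUFFLE_BYTES = 10 * 1024**3  # assume 10 GiB written to the shuffle

# Modest max-parallelism: blocks stay reasonably sized.
small_cap = avg_shuffle_block_bytes(SHUFFLE_BYTES, 128, 128)

# Cranking max-parallelism up (only to allow more source tasks)
# fragments the same data into far smaller blocks.
large_cap = avg_shuffle_block_bytes(SHUFFLE_BYTES, 128, 4096)

print(small_cap)  # 655360 bytes, ~640 KiB per block
print(large_cap)  # 20480 bytes, ~20 KiB per block
```

Under these assumed numbers, a 32x larger cap shrinks the average block by 32x, which is the fragmentation the bullet point warns about.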
> > >
> > > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee 
> wrote:
> > >
> > > > Hello Venkata krishnan,
> > > >
> > > > I think the term "semantic inconsistency" defined by
> > > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> > > maintaining a
> > > > uniform upper limit on parallelism across all vertices within a job.
> As
> > > the
> > > > source vertices are part of the global execution graph, they should
> > also
> > > > respect this rule to ensure consistent application of parallelism
> > > > constraints.
> > > >
> > > > Best,
> > > > Junrui
> > > >
> > > > Venkatakrishnan Sowrirajan wrote on Fri, Apr 12, 2024 at 02:10:
> > > >
> > > > > Gentle bump on this question. cc @Becket Qin as well.
> > > > >
> > > > > Regards
> > > > > Venkata krishnan
> > > > >
> > > > >
> > > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > > > > vsowr...@asu.edu> wrote:
> > > > >
> > > > > > Thanks for the response, Lijie and Junrui. Sorry for the late
> > > > > > reply. A few follow-up questions:
> > > > > >
> > > > > > > Source can actually ignore this limit
> > > > > > because it has no upstream, but this will lead to semantic
> > > > inconsistency.
> > > > > >
> > > > > > Lijie, can you please elaborate on the above comment further?
> What
> > do
> > > > you
> > > > > > mean when you say it will lead to "semantic inconsistency"?
> > > > > >
> > > > > > > Secondly, we first need to limit the max parallelism of
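The fallback-and-clamp behavior described earlier in the thread (a vertex with no maxParallelism set inherits `execution.batch.adaptive.auto-parallelism.max-parallelism`, which then caps dynamic parallelism inference for source vertices too) can be sketched roughly as follows. All names here are assumptions for illustration, not Flink's actual implementation.

```python
# Minimal sketch of the current behavior under discussion: unset
# per-vertex max parallelism falls back to the global limit, and the
# dynamically inferred parallelism is clamped by it -- including for
# source vertices, which have no upstream.

from dataclasses import dataclass
from typing import Optional

@dataclass
class Vertex:
    name: str
    max_parallelism: Optional[int] = None  # None = not set by the user

def effective_max_parallelism(vertex: Vertex, global_max: int) -> int:
    # Fallback: an unset vertex max parallelism inherits the global limit.
    if vertex.max_parallelism is not None:
        return vertex.max_parallelism
    return global_max

def infer_parallelism(desired: int, vertex: Vertex, global_max: int) -> int:
    # Dynamic inference is capped by the vertex's effective max parallelism.
    return min(desired, effective_max_parallelism(vertex, global_max))

source = Vertex("source")  # no per-vertex max set
print(infer_parallelism(2000, source, global_max=1024))  # 1024: capped
```

Note how a vertex that pre-specifies its own maxParallelism bypasses the global cap, matching Xia's observation that a pre-specified source parallelism can already exceed the job's global max.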

Re: Question around Flink's AdaptiveBatchScheduler

2024-04-15 Thread Venkatakrishnan Sowrirajan
Thanks for adding your thoughts to this discussion.

If we all agree that the source vertex parallelism shouldn't be bound by
the downstream max parallelism
(jobmanager.adaptive-batch-scheduler.max-parallelism)
based on the rationale and the issues described above, I can take a stab at
addressing the issue.

Let me file a ticket to track this issue. Otherwise, I'm looking forward to
hearing more thoughts from others as well, especially Lijie and Junrui who
have more context on the AdaptiveBatchScheduler.

Regards
Venkata krishnan
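A rough sketch of the behavior change proposed in this thread (tracked in FLINK-35165): dynamically inferred source parallelism bounded only by a source-specific cap rather than the job's global max-parallelism, while downstream vertices keep the existing global cap. Function and parameter names below are assumptions for illustration, not the actual patch.

```python
# Illustrative sketch of the proposed decoupling: source vertices have
# no upstream and no keyed shuffle to stay consistent with, so only a
# source-specific cap applies; downstream vertices keep the global cap.

def infer_vertex_parallelism(desired: int,
                             is_source: bool,
                             global_max_parallelism: int,
                             source_max_parallelism: int) -> int:
    if is_source:
        # Proposed: the global max-parallelism no longer binds sources.
        return min(desired, source_max_parallelism)
    # Unchanged: downstream vertices are capped by the global limit.
    return min(desired, global_max_parallelism)

# With a global cap of 1024, a source could still run at 3000 tasks,
# so the global cap no longer needs to be inflated (with the network
# buffer tuning that entails) just to read input with high parallelism.
print(infer_vertex_parallelism(3000, True, 1024, 8192))   # 3000
print(infer_vertex_parallelism(3000, False, 1024, 8192))  # 1024
```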


On Mon, Apr 15, 2024 at 12:54 AM Xia Sun  wrote:

> Hi Venkat,
> I agree that the parallelism of source vertex should not be upper bounded
> by the job's global max parallelism. The case you mentioned, >> High filter
> selectivity with huge amounts of data to read  excellently supports this
> viewpoint. (In fact, in the current implementation, if the source
> parallelism is pre-specified at job create stage, rather than relying on
> the dynamic parallelism inference of the AdaptiveBatchScheduler, the source
> vertex's parallelism can indeed exceed the job's global max parallelism.)
>
> As Lijie and Junrui pointed out, the key issue is "semantic consistency."
> Currently, if a vertex has not set maxParallelism, the
> AdaptiveBatchScheduler will use
> `execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's
> maxParallelism. Since the current implementation does not distinguish
> between source vertices and downstream vertices, source vertices are also
> subject to this limitation.
>
> Therefore, I believe that if the issue of "semantic consistency" can be
> well explained in the code and configuration documentation, the
> AdaptiveBatchScheduler should support that the parallelism of source
> vertices can exceed the job's global max parallelism.
>
> Best,
> Xia
>
> Venkatakrishnan Sowrirajan  于2024年4月14日周日 10:31写道:
>
> > Let me state why I think
> > "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should
> > not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism".
> >
> >- Source vertex is unique and does not have any upstream vertices
> >- Downstream vertices read shuffled data partitioned by key, which is
> >not the case for the Source vertex
> >- Limiting source parallelism by downstream vertices' max parallelism
> is
> >incorrect
> >
> > If we say for ""semantic consistency" the source vertex parallelism has
> to
> > be bound by the overall job's max parallelism, it can lead to following
> > issues:
> >
> >- High filter selectivity with huge amounts of data to read - setting
> >high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" so that
> >source parallelism can be set higher can lead to small blocks and
> >sub-optimal performance.
> >- Setting high "*jobmanager.adaptive-batch-scheduler.max-parallelism*"
> >requires careful tuning of network buffer configurations which is
> >unnecessary in cases where it is not required just so that the source
> >parallelism can be set high.
> >
> > Regards
> > Venkata krishnan
> >
> > On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee  wrote:
> >
> > > Hello Venkata krishnan,
> > >
> > > I think the term "semantic inconsistency" defined by
> > > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> > maintaining a
> > > uniform upper limit on parallelism across all vertices within a job. As
> > the
> > > source vertices are part of the global execution graph, they should
> also
> > > respect this rule to ensure consistent application of parallelism
> > > constraints.
> > >
> > > Best,
> > > Junrui
> > >
> > > Venkatakrishnan Sowrirajan  于2024年4月12日周五 02:10写道:
> > >
> > > > Gentle bump on this question. cc @Becket Qin 
> as
> > > > well.
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > >
> > > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > > > vsowr...@asu.edu> wrote:
> > > >
> > > > > Thanks for the response Lijie and Junrui. Sorry for the late reply.
> > Few
> > > > > follow up questions.
> > > > >
> > > > > > Source can actually ignore this limit
> > > > > because it has no upstream, but this will lead to semantic
> > > inconsistency.
> > > > >
> > > > > Lijie, can you please elaborate on the above comment further? What
> do
> > > you
> > > > > mean when you say it will lead to "semantic inconsistency"?
> > > > >
> > > > > > Secondly, we first need to limit the max parallelism of
> > (downstream)
> > > > > vertex, and then we can decide how many subpartitions (upstream
> > vertex)
> > > > > should produce. The limit should be effective, otherwise some
> > > downstream
> > > > > tasks will have no data to process.
> > > > >
> > > > > This makes sense in the context of any other vertices other than
> the
> > > > > source vertex. As you mentioned above ("Source can actually ignore
> > this
> > > > > limit because it has no upstream"), therefore I feel "
> > > > > 

Re: Question around Flink's AdaptiveBatchScheduler

2024-04-15 Thread Xia Sun
Hi Venkat,
I agree that the parallelism of the source vertex should not be upper
bounded by the job's global max parallelism. The case you mentioned ("high
filter selectivity with huge amounts of data to read") supports this
viewpoint well. (In fact, in the current implementation, if the source
parallelism is pre-specified at job creation stage, rather than relying on
the dynamic parallelism inference of the AdaptiveBatchScheduler, the source
vertex's parallelism can indeed exceed the job's global max parallelism.)

As Lijie and Junrui pointed out, the key issue is "semantic consistency."
Currently, if a vertex has not set maxParallelism, the
AdaptiveBatchScheduler will use
`execution.batch.adaptive.auto-parallelism.max-parallelism` as the vertex's
maxParallelism. Since the current implementation does not distinguish
between source vertices and downstream vertices, source vertices are also
subject to this limitation.
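For readers following along, the fallback Xia describes can be sketched
roughly as follows. This is a simplified illustration, not Flink's actual
code; the class and method names here are hypothetical:

```java
// Simplified sketch of the max-parallelism fallback described above.
// Hypothetical names; not Flink's actual implementation.
public class MaxParallelismFallback {

    static final int UNSET = -1; // vertex did not set maxParallelism

    /**
     * If the vertex set its own maxParallelism, use it; otherwise fall
     * back to the global
     * execution.batch.adaptive.auto-parallelism.max-parallelism value.
     * Note: source vertices get no special treatment here, which is the
     * behavior under discussion.
     */
    static int effectiveMaxParallelism(int vertexMaxParallelism,
                                       int globalMaxParallelism) {
        return vertexMaxParallelism == UNSET
                ? globalMaxParallelism
                : vertexMaxParallelism;
    }

    public static void main(String[] args) {
        // A source vertex with no explicit maxParallelism is capped by
        // the global setting...
        System.out.println(effectiveMaxParallelism(UNSET, 128));
        // ...while a vertex with an explicit maxParallelism keeps it.
        System.out.println(effectiveMaxParallelism(512, 128));
    }
}
```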

Therefore, I believe that if the issue of "semantic consistency" can be
well explained in the code and configuration documentation, the
AdaptiveBatchScheduler should support that the parallelism of source
vertices can exceed the job's global max parallelism.

Best,
Xia

Venkatakrishnan Sowrirajan wrote on Sun, Apr 14, 2024 at 10:31:

> Let me state why I think
> "jobmanager.adaptive-batch-scheduler.default-source-parallelism" should
> not be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>
>- Source vertex is unique and does not have any upstream vertices
>- Downstream vertices read shuffled data partitioned by key, which is
>not the case for the Source vertex
>- Limiting source parallelism by downstream vertices' max parallelism is
>incorrect
>
> If we say for ""semantic consistency" the source vertex parallelism has to
> be bound by the overall job's max parallelism, it can lead to following
> issues:
>
>- High filter selectivity with huge amounts of data to read - setting
>high "*jobmanager.adaptive-batch-scheduler.max-parallelism*" so that
>source parallelism can be set higher can lead to small blocks and
>sub-optimal performance.
>- Setting high "*jobmanager.adaptive-batch-scheduler.max-parallelism*"
>requires careful tuning of network buffer configurations which is
>unnecessary in cases where it is not required just so that the source
>parallelism can be set high.
>
> Regards
> Venkata krishnan
>
> On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee  wrote:
>
> > Hello Venkata krishnan,
> >
> > I think the term "semantic inconsistency" defined by
> > jobmanager.adaptive-batch-scheduler.max-parallelism refers to
> maintaining a
> > uniform upper limit on parallelism across all vertices within a job. As
> the
> > source vertices are part of the global execution graph, they should also
> > respect this rule to ensure consistent application of parallelism
> > constraints.
> >
> > Best,
> > Junrui
> >
> > Venkatakrishnan Sowrirajan  于2024年4月12日周五 02:10写道:
> >
> > > Gentle bump on this question. cc @Becket Qin  as
> > > well.
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > >
> > > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > > vsowr...@asu.edu> wrote:
> > >
> > > > Thanks for the response Lijie and Junrui. Sorry for the late reply.
> Few
> > > > follow up questions.
> > > >
> > > > > Source can actually ignore this limit
> > > > because it has no upstream, but this will lead to semantic
> > inconsistency.
> > > >
> > > > Lijie, can you please elaborate on the above comment further? What do
> > you
> > > > mean when you say it will lead to "semantic inconsistency"?
> > > >
> > > > > Secondly, we first need to limit the max parallelism of
> (downstream)
> > > > vertex, and then we can decide how many subpartitions (upstream
> vertex)
> > > > should produce. The limit should be effective, otherwise some
> > downstream
> > > > tasks will have no data to process.
> > > >
> > > > This makes sense in the context of any other vertices other than the
> > > > source vertex. As you mentioned above ("Source can actually ignore
> this
> > > > limit because it has no upstream"), therefore I feel "
> > > > jobmanager.adaptive-batch-scheduler.default-source-parallelism" need
> > not
> > > > be upper bounded by
> > > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > > >
> > > > Regards
> > > > Venkata krishnan
> > > >
> > > >
> > > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee 
> > wrote:
> > > >
> > > >> Hi Venkat,
> > > >>
> > > >> As Lijie mentioned,  in Flink, the parallelism is required to be
> less
> > > than
> > > >> or equal to the maximum parallelism. The config option
> > > >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> > > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism will
> be
> > > set
> > > >> as the source's parallelism and max-parallelism, respectively.
> > > Therefore,
> > > >> the check failed situation you encountered is in line with the
> > > >> expectations.

Re: Question around Flink's AdaptiveBatchScheduler

2024-04-13 Thread Venkatakrishnan Sowrirajan
Let me state why I think
"jobmanager.adaptive-batch-scheduler.default-source-parallelism" should not
be bound by "jobmanager.adaptive-batch-scheduler.max-parallelism".

   - Source vertex is unique and does not have any upstream vertices
   - Downstream vertices read shuffled data partitioned by key, which is
   not the case for the Source vertex
   - Limiting source parallelism by downstream vertices' max parallelism is
   incorrect

If, for "semantic consistency", the source vertex parallelism has to be
bound by the overall job's max parallelism, it can lead to the following
issues:

   - High filter selectivity with huge amounts of data to read: raising
   "jobmanager.adaptive-batch-scheduler.max-parallelism" just so that the
   source parallelism can be set higher can lead to small blocks and
   sub-optimal performance.
   - Setting a high "jobmanager.adaptive-batch-scheduler.max-parallelism"
   requires careful tuning of network buffer configurations, which is
   unnecessary in cases where it is not otherwise required.
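To make the second bullet concrete, a configuration for this scenario might
look like the following flink-conf.yaml fragment. The values are purely
illustrative:

```yaml
# Illustrative values only. Today, to let the source read with 4000
# splits, the global cap must also be raised to at least 4000...
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 4000
jobmanager.adaptive-batch-scheduler.max-parallelism: 4000
# ...which in turn can force shuffle/network-buffer tuning (e.g. below)
# that the small, post-filter downstream stages do not actually need.
taskmanager.memory.network.max: 2gb
```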

Regards
Venkata krishnan

On Thu, Apr 11, 2024 at 9:30 PM Junrui Lee  wrote:

> Hello Venkata krishnan,
>
> I think the term "semantic inconsistency" defined by
> jobmanager.adaptive-batch-scheduler.max-parallelism refers to maintaining a
> uniform upper limit on parallelism across all vertices within a job. As the
> source vertices are part of the global execution graph, they should also
> respect this rule to ensure consistent application of parallelism
> constraints.
>
> Best,
> Junrui
>
> Venkatakrishnan Sowrirajan  于2024年4月12日周五 02:10写道:
>
> > Gentle bump on this question. cc @Becket Qin  as
> > well.
> >
> > Regards
> > Venkata krishnan
> >
> >
> > On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> > vsowr...@asu.edu> wrote:
> >
> > > Thanks for the response Lijie and Junrui. Sorry for the late reply. Few
> > > follow up questions.
> > >
> > > > Source can actually ignore this limit
> > > because it has no upstream, but this will lead to semantic
> inconsistency.
> > >
> > > Lijie, can you please elaborate on the above comment further? What do
> you
> > > mean when you say it will lead to "semantic inconsistency"?
> > >
> > > > Secondly, we first need to limit the max parallelism of (downstream)
> > > vertex, and then we can decide how many subpartitions (upstream vertex)
> > > should produce. The limit should be effective, otherwise some
> downstream
> > > tasks will have no data to process.
> > >
> > > This makes sense in the context of any other vertices other than the
> > > source vertex. As you mentioned above ("Source can actually ignore this
> > > limit because it has no upstream"), therefore I feel "
> > > jobmanager.adaptive-batch-scheduler.default-source-parallelism" need
> not
> > > be upper bounded by
> > "jobmanager.adaptive-batch-scheduler.max-parallelism".
> > >
> > > Regards
> > > Venkata krishnan
> > >
> > >
> > > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee 
> wrote:
> > >
> > >> Hi Venkat,
> > >>
> > >> As Lijie mentioned,  in Flink, the parallelism is required to be less
> > than
> > >> or equal to the maximum parallelism. The config option
> > >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> > >> jobmanager.adaptive-batch-scheduler.default-source-parallelism will be
> > set
> > >> as the source's parallelism and max-parallelism, respectively.
> > Therefore,
> > >> the check failed situation you encountered is in line with the
> > >> expectations.
> > >>
> > >> Best,
> > >> Junrui
> > >>
> > >> Lijie Wang  于2024年2月29日周四 17:35写道:
> > >>
> > >> > Hi Venkat,
> > >> >
> > >> > >> default-source-parallelism config should be independent from the
> > >> > max-parallelism
> > >> >
> > >> > Actually, it's not.
> > >> >
> > >> > Firstly, it's obvious that the parallelism should be less than or
> > equal
> > >> to
> > >> > the max parallelism(both literally and execution). The
> > >> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used
> as
> > >> the
> > >> > max parallelism for a vertex if you don't set max parallelism for it
> > >> > individually (Just like the source in your case).
> > >> >
> > >> > Secondly, we first need to limit the max parallelism of (downstream)
> > >> > vertex, and then we can decide how many subpartitions (upstream
> > vertex)
> > >> > should produce. The limit should be effective, otherwise some
> > downstream
> > >> > tasks will have no data to process. Source can actually ignore this
> > >> limit
> > >> > because it has no upstream, but this will lead to semantic
> > >> inconsistency.
> > >> >
> > >> > Best,
> > >> > Lijie
> > >> >
> > >> > Venkatakrishnan Sowrirajan  于2024年2月29日周四
> 05:49写道:
> > >> >
> > >> > > Hi Flink devs,
> > >> > >
> > >> > > With Flink's AdaptiveBatchScheduler
> > >> > > <
> > >> > >
> > >> >
> > >>
> >
> 

Re: Question around Flink's AdaptiveBatchScheduler

2024-04-11 Thread Junrui Lee
Hello Venkata krishnan,

I think the "semantic inconsistency" refers to the expectation that
jobmanager.adaptive-batch-scheduler.max-parallelism maintains a uniform
upper limit on parallelism across all vertices within a job. Since the
source vertices are part of the global execution graph, they should also
respect this rule to ensure consistent application of parallelism
constraints.

Best,
Junrui

Venkatakrishnan Sowrirajan wrote on Fri, Apr 12, 2024 at 02:10:

> Gentle bump on this question. cc @Becket Qin  as
> well.
>
> Regards
> Venkata krishnan
>
>
> On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
> vsowr...@asu.edu> wrote:
>
> > Thanks for the response Lijie and Junrui. Sorry for the late reply. Few
> > follow up questions.
> >
> > > Source can actually ignore this limit
> > because it has no upstream, but this will lead to semantic inconsistency.
> >
> > Lijie, can you please elaborate on the above comment further? What do you
> > mean when you say it will lead to "semantic inconsistency"?
> >
> > > Secondly, we first need to limit the max parallelism of (downstream)
> > vertex, and then we can decide how many subpartitions (upstream vertex)
> > should produce. The limit should be effective, otherwise some downstream
> > tasks will have no data to process.
> >
> > This makes sense in the context of any other vertices other than the
> > source vertex. As you mentioned above ("Source can actually ignore this
> > limit because it has no upstream"), therefore I feel "
> > jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not
> > be upper bounded by
> "jobmanager.adaptive-batch-scheduler.max-parallelism".
> >
> > Regards
> > Venkata krishnan
> >
> >
> > On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee  wrote:
> >
> >> Hi Venkat,
> >>
> >> As Lijie mentioned,  in Flink, the parallelism is required to be less
> than
> >> or equal to the maximum parallelism. The config option
> >> jobmanager.adaptive-batch-scheduler.max-parallelism and
> >> jobmanager.adaptive-batch-scheduler.default-source-parallelism will be
> set
> >> as the source's parallelism and max-parallelism, respectively.
> Therefore,
> >> the check failed situation you encountered is in line with the
> >> expectations.
> >>
> >> Best,
> >> Junrui
> >>
> >> Lijie Wang  于2024年2月29日周四 17:35写道:
> >>
> >> > Hi Venkat,
> >> >
> >> > >> default-source-parallelism config should be independent from the
> >> > max-parallelism
> >> >
> >> > Actually, it's not.
> >> >
> >> > Firstly, it's obvious that the parallelism should be less than or
> equal
> >> to
> >> > the max parallelism(both literally and execution). The
> >> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as
> >> the
> >> > max parallelism for a vertex if you don't set max parallelism for it
> >> > individually (Just like the source in your case).
> >> >
> >> > Secondly, we first need to limit the max parallelism of (downstream)
> >> > vertex, and then we can decide how many subpartitions (upstream
> vertex)
> >> > should produce. The limit should be effective, otherwise some
> downstream
> >> > tasks will have no data to process. Source can actually ignore this
> >> limit
> >> > because it has no upstream, but this will lead to semantic
> >> inconsistency.
> >> >
> >> > Best,
> >> > Lijie
> >> >
> >> > Venkatakrishnan Sowrirajan  于2024年2月29日周四 05:49写道:
> >> >
> >> > > Hi Flink devs,
> >> > >
> >> > > With Flink's AdaptiveBatchScheduler
> >> > > <
> >> > >
> >> >
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> >> > > >
> >> > > (Note:
> >> > > this is different from AdaptiveScheduler
> >> > > <
> >> > >
> >> >
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler
> >> > > >),
> >> > > the scheduler automatically determines the correct number of
> >> downstream
> >> > > tasks required to process the shuffle generated by the upstream
> >> vertex.
> >> > >
> >> > > I have a question regarding the current behavior. There are 2
> configs
> >> > which
> >> > > are in interplay here.
> >> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> >> > > <
> >> > >
> >> >
> >>
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism
> >> > > >
> >> > >  - The default parallelism of data source.
> >> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> >> > > <
> >> > >
> >> >
> >>
> 

Re: Question around Flink's AdaptiveBatchScheduler

2024-04-11 Thread Venkatakrishnan Sowrirajan
Gentle bump on this question. cc @Becket Qin  as well.

Regards
Venkata krishnan


On Tue, Mar 12, 2024 at 10:11 PM Venkatakrishnan Sowrirajan <
vsowr...@asu.edu> wrote:

> Thanks for the response Lijie and Junrui. Sorry for the late reply. Few
> follow up questions.
>
> > Source can actually ignore this limit
> because it has no upstream, but this will lead to semantic inconsistency.
>
> Lijie, can you please elaborate on the above comment further? What do you
> mean when you say it will lead to "semantic inconsistency"?
>
> > Secondly, we first need to limit the max parallelism of (downstream)
> vertex, and then we can decide how many subpartitions (upstream vertex)
> should produce. The limit should be effective, otherwise some downstream
> tasks will have no data to process.
>
> This makes sense in the context of any other vertices other than the
> source vertex. As you mentioned above ("Source can actually ignore this
> limit because it has no upstream"), therefore I feel "
> jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not
> be upper bounded by "jobmanager.adaptive-batch-scheduler.max-parallelism".
>
> Regards
> Venkata krishnan
>
>
> On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee  wrote:
>
>> Hi Venkat,
>>
>> As Lijie mentioned,  in Flink, the parallelism is required to be less than
>> or equal to the maximum parallelism. The config option
>> jobmanager.adaptive-batch-scheduler.max-parallelism and
>> jobmanager.adaptive-batch-scheduler.default-source-parallelism will be set
>> as the source's parallelism and max-parallelism, respectively. Therefore,
>> the check failed situation you encountered is in line with the
>> expectations.
>>
>> Best,
>> Junrui
>>
>> Lijie Wang  于2024年2月29日周四 17:35写道:
>>
>> > Hi Venkat,
>> >
>> > >> default-source-parallelism config should be independent from the
>> > max-parallelism
>> >
>> > Actually, it's not.
>> >
>> > Firstly, it's obvious that the parallelism should be less than or equal
>> to
>> > the max parallelism(both literally and execution). The
>> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as
>> the
>> > max parallelism for a vertex if you don't set max parallelism for it
>> > individually (Just like the source in your case).
>> >
>> > Secondly, we first need to limit the max parallelism of (downstream)
>> > vertex, and then we can decide how many subpartitions (upstream vertex)
>> > should produce. The limit should be effective, otherwise some downstream
>> > tasks will have no data to process. Source can actually ignore this
>> limit
>> > because it has no upstream, but this will lead to semantic
>> inconsistency.
>> >
>> > Best,
>> > Lijie
>> >
>> > Venkatakrishnan Sowrirajan  于2024年2月29日周四 05:49写道:
>> >
>> > > Hi Flink devs,
>> > >
>> > > With Flink's AdaptiveBatchScheduler
>> > > <
>> > >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
>> > > >
>> > > (Note:
>> > > this is different from AdaptiveScheduler
>> > > <
>> > >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler
>> > > >),
>> > > the scheduler automatically determines the correct number of
>> downstream
>> > > tasks required to process the shuffle generated by the upstream
>> vertex.
>> > >
>> > > I have a question regarding the current behavior. There are 2 configs
>> > which
>> > > are in interplay here.
>> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > > <
>> > >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism
>> > > >
>> > >  - The default parallelism of data source.
>> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
>> > > <
>> > >
>> >
>> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism
>> > > >
>> > > -
>> > > Upper bound of allowed parallelism to set adaptively.
>> > >
>> > > Currently, if "
>> > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
>> > > <
>> > >
>> >
>> 

Re: Question around Flink's AdaptiveBatchScheduler

2024-03-12 Thread Venkatakrishnan Sowrirajan
Thanks for the response, Lijie and Junrui. Sorry for the late reply. A few
follow-up questions.

> Source can actually ignore this limit
because it has no upstream, but this will lead to semantic inconsistency.

Lijie, can you please elaborate on the above comment further? What do you
mean when you say it will lead to "semantic inconsistency"?

> Secondly, we first need to limit the max parallelism of (downstream)
vertex, and then we can decide how many subpartitions (upstream vertex)
should produce. The limit should be effective, otherwise some downstream
tasks will have no data to process.

This makes sense for any vertex other than the source vertex. Since, as you
mentioned above, "Source can actually ignore this limit because it has no
upstream", I feel
"jobmanager.adaptive-batch-scheduler.default-source-parallelism" need not
be upper bounded by
"jobmanager.adaptive-batch-scheduler.max-parallelism".

Regards
Venkata krishnan


On Thu, Feb 29, 2024 at 2:11 AM Junrui Lee  wrote:

> Hi Venkat,
>
> As Lijie mentioned,  in Flink, the parallelism is required to be less than
> or equal to the maximum parallelism. The config option
> jobmanager.adaptive-batch-scheduler.max-parallelism and
> jobmanager.adaptive-batch-scheduler.default-source-parallelism will be set
> as the source's parallelism and max-parallelism, respectively. Therefore,
> the check failed situation you encountered is in line with the
> expectations.
>
> Best,
> Junrui
>
> Lijie Wang  于2024年2月29日周四 17:35写道:
>
> > Hi Venkat,
> >
> > >> default-source-parallelism config should be independent from the
> > max-parallelism
> >
> > Actually, it's not.
> >
> > Firstly, it's obvious that the parallelism should be less than or equal
> to
> > the max parallelism(both literally and execution). The
> > "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
> > max parallelism for a vertex if you don't set max parallelism for it
> > individually (Just like the source in your case).
> >
> > Secondly, we first need to limit the max parallelism of (downstream)
> > vertex, and then we can decide how many subpartitions (upstream vertex)
> > should produce. The limit should be effective, otherwise some downstream
> > tasks will have no data to process. Source can actually ignore this limit
> > because it has no upstream, but this will lead to semantic inconsistency.
> >
> > Best,
> > Lijie
> >
> > Venkatakrishnan Sowrirajan  于2024年2月29日周四 05:49写道:
> >
> > > Hi Flink devs,
> > >
> > > With Flink's AdaptiveBatchScheduler
> > > <
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> > > >
> > > (Note:
> > > this is different from AdaptiveScheduler
> > > <
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler
> > > >),
> > > the scheduler automatically determines the correct number of downstream
> > > tasks required to process the shuffle generated by the upstream vertex.
> > >
> > > I have a question regarding the current behavior. There are 2 configs
> > which
> > > are in interplay here.
> > > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > <
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism
> > > >
> > >  - The default parallelism of data source.
> > > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > > <
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism
> > > >
> > > -
> > > Upper bound of allowed parallelism to set adaptively.
> > >
> > > Currently, if "
> > > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > > <
> > >
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism
> > > >"
> > > is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism
> > > <
> > >
> >
> 

Re: Question around Flink's AdaptiveBatchScheduler

2024-02-29 Thread Junrui Lee
Hi Venkat,

As Lijie mentioned, in Flink the parallelism is required to be less than or
equal to the maximum parallelism. The config options
jobmanager.adaptive-batch-scheduler.default-source-parallelism and
jobmanager.adaptive-batch-scheduler.max-parallelism will be used as the
source's parallelism and max parallelism, respectively. Therefore, the check
failure you encountered is in line with expectations.
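The check being discussed behaves roughly like the sketch below. This is a
simplified illustration with a hypothetical class name; the real validation
lives in flink-runtime's DefaultVertexParallelismInfo:

```java
// Simplified sketch of the vertex parallelism validation discussed in
// this thread. Hypothetical class; not Flink's actual implementation.
public class VertexParallelismCheck {

    static void checkParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            // Mirrors the failure message quoted later in the thread.
            throw new IllegalArgumentException(
                "Vertex's parallelism should be smaller than or equal to "
                    + "vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        // default-source-parallelism = 2000 with max-parallelism = 1000
        // is the failing combination from the original question.
        try {
            checkParallelism(2000, 1000);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // 100 <= 1000 passes silently.
        checkParallelism(100, 1000);
    }
}
```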

Best,
Junrui

Lijie Wang wrote on Thu, Feb 29, 2024 at 17:35:

> Hi Venkat,
>
> >> default-source-parallelism config should be independent from the
> max-parallelism
>
> Actually, it's not.
>
> Firstly, it's obvious that the parallelism should be less than or equal to
> the max parallelism(both literally and execution). The
> "jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
> max parallelism for a vertex if you don't set max parallelism for it
> individually (Just like the source in your case).
>
> Secondly, we first need to limit the max parallelism of (downstream)
> vertex, and then we can decide how many subpartitions (upstream vertex)
> should produce. The limit should be effective, otherwise some downstream
> tasks will have no data to process. Source can actually ignore this limit
> because it has no upstream, but this will lead to semantic inconsistency.
>
> Best,
> Lijie
>
> Venkatakrishnan Sowrirajan  于2024年2月29日周四 05:49写道:
>
> > Hi Flink devs,
> >
> > With Flink's AdaptiveBatchScheduler
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler
> > >
> > (Note:
> > this is different from AdaptiveScheduler
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler
> > >),
> > the scheduler automatically determines the correct number of downstream
> > tasks required to process the shuffle generated by the upstream vertex.
> >
> > I have a question regarding the current behavior. There are 2 configs
> which
> > are in interplay here.
> > 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism
> > >
> >  - The default parallelism of data source.
> > 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism
> > >
> > -
> > Upper bound of allowed parallelism to set adaptively.
> >
> > Currently, if "
> > jobmanager.adaptive-batch-scheduler.default-source-parallelism
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism
> > >"
> > is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism
> > <
> >
> https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism
> > >",
> > Flink application fails with the below message:
> >
> > "Vertex's parallelism should be smaller than or equal to vertex's max
> > parallelism."
> >
> > This is the corresponding code in Flink's DefaultVertexParallelismInfo
> > <
> >
> https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110
> > >.
> > My question is: shouldn't the "default-source-parallelism" config be
> > independent of the "max-parallelism" flag? The former controls the
> > default source parallelism while the latter controls the max number of
> > partitions to write the intermediate shuffle.
> >
> > If this is true, then the above check should be fixed. Otherwise, wanted
> to
> > understand why the "default-source-parallelism` should be less than the
> > "max-parallelism"
> >
> > Thanks
> > Venkat
> >
>


Re: Question around Flink's AdaptiveBatchScheduler

2024-02-29 Thread Lijie Wang
Hi Venkat,

>> default-source-parallelism config should be independent from the
max-parallelism

Actually, it's not.

Firstly, it's obvious that the parallelism should be less than or equal to
the max parallelism (both literally and in execution). The
"jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
max parallelism for a vertex if you don't set a max parallelism for it
individually (just like the source in your case).

Secondly, we first need to limit the max parallelism of the (downstream)
vertex, and only then can we decide how many subpartitions the (upstream)
vertex should produce. The limit should be effective, otherwise some
downstream tasks will have no data to process. The source could actually
ignore this limit because it has no upstream, but that would lead to
semantic inconsistency.
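To make the second point concrete, here is a tiny illustrative sketch in
plain Java (this is not Flink's actual code; the even range-assignment of
subpartitions to tasks is an assumption for illustration) of what happens
when the downstream parallelism exceeds the number of subpartitions the
upstream produced:

```java
public class SubpartitionSketch {

    /**
     * Hypothetical range assignment: the upstream's subpartitions are split
     * evenly across downstream tasks. Returns how many subpartitions the
     * given task would consume.
     */
    static int subpartitionsForTask(int numSubpartitions, int numTasks, int taskIndex) {
        int start = taskIndex * numSubpartitions / numTasks;
        int end = (taskIndex + 1) * numSubpartitions / numTasks;
        return end - start;
    }

    public static void main(String[] args) {
        int numSubpartitions = 4; // upstream wrote 4 subpartitions (bounded by max-parallelism)
        int numTasks = 6;         // downstream scaled beyond that bound
        int idle = 0;
        for (int t = 0; t < numTasks; t++) {
            if (subpartitionsForTask(numSubpartitions, numTasks, t) == 0) {
                idle++; // this task has no subpartition assigned, hence no data
            }
        }
        System.out.println(idle + " downstream task(s) would have no data");
    }
}
```

With 4 subpartitions and 6 downstream tasks, two tasks end up with nothing
to read, which is exactly why the downstream max parallelism must be fixed
before the upstream decides its subpartition count.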

Best,
Lijie

Venkatakrishnan Sowrirajan wrote on Thu, Feb 29, 2024 at 05:49:



Question around Flink's AdaptiveBatchScheduler

2024-02-28 Thread Venkatakrishnan Sowrirajan
Hi Flink devs,

With Flink's AdaptiveBatchScheduler (note: this is different from the
AdaptiveScheduler), the scheduler automatically determines the correct
number of downstream tasks required to process the shuffle generated by
the upstream vertex.

I have a question regarding the current behavior. There are 2 configs which
are in interplay here:
1. jobmanager.adaptive-batch-scheduler.default-source-parallelism - the
default parallelism of data sources.
2. jobmanager.adaptive-batch-scheduler.max-parallelism - the upper bound
of allowed parallelism to set adaptively.
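For concreteness, a configuration like the following (the values are made
up for illustration) is the kind of combination in question, since the
source parallelism exceeds the adaptive upper bound:

```
jobmanager.adaptive-batch-scheduler.default-source-parallelism: 200
jobmanager.adaptive-batch-scheduler.max-parallelism: 128
```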

Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the
Flink application fails with the below message:

"Vertex's parallelism should be smaller than or equal to vertex's max
parallelism."

This is the corresponding code in Flink's DefaultVertexParallelismInfo.
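For readers without the source handy, the check behaves roughly like this
simplified sketch (the class and method names here are illustrative
stand-ins, not Flink's actual API; only the error message text is taken
from the failure above):

```java
import java.util.Optional;

public class ParallelismCheckSketch {

    /**
     * Simplified stand-in for the per-vertex validation: returns an error
     * message when the configured parallelism exceeds the max parallelism,
     * or empty when the parallelism is accepted.
     */
    static Optional<String> checkParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            return Optional.of(
                    "Vertex's parallelism should be smaller than or equal to "
                            + "vertex's max parallelism.");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // default-source-parallelism = 200 but max-parallelism = 128 -> rejected
        System.out.println(checkParallelism(200, 128).isPresent()); // prints: true
    }
}
```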
My question is: shouldn't the "default-source-parallelism" config be
independent of the "max-parallelism" config? The former controls the
default source parallelism, while the latter controls the max number of
partitions to write the intermediate shuffle to.

If this is true, then the above check should be fixed. Otherwise, I wanted
to understand why "default-source-parallelism" should be less than or
equal to "max-parallelism".

Thanks
Venkat