Hi Venkat,

>> default-source-parallelism config should be independent from the
max-parallelism

Actually, it's not.

Firstly, a vertex's parallelism must be less than or equal to its max
parallelism (both by definition and at execution time).
"jobmanager.adaptive-batch-scheduler.max-parallelism" will be used as the
max parallelism for a vertex if you don't set a max parallelism for it
individually (as is the case for the source in your job).
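
To make this concrete, here is a minimal sketch in Java (hypothetical names,
not Flink's actual implementation) of how the effective max parallelism is
resolved and then validated, which is the check that rejects your job:

// Simplified illustration only; hypothetical names, not Flink's real classes.
public final class VertexParallelismCheck {

    // Resolve the effective max parallelism of a vertex: if no max parallelism
    // was set for it individually (modeled here as -1), fall back to
    // jobmanager.adaptive-batch-scheduler.max-parallelism.
    static int effectiveMaxParallelism(int vertexMaxParallelism, int schedulerMaxParallelism) {
        return vertexMaxParallelism > 0 ? vertexMaxParallelism : schedulerMaxParallelism;
    }

    // The kind of check that fails the job when parallelism exceeds max parallelism.
    static void checkParallelism(int parallelism, int maxParallelism) {
        if (parallelism > maxParallelism) {
            throw new IllegalArgumentException(
                    "Vertex's parallelism should be smaller than or equal to vertex's max parallelism.");
        }
    }

    public static void main(String[] args) {
        int defaultSourceParallelism = 128; // default-source-parallelism (example value)
        int schedulerMaxParallelism = 64;   // max-parallelism (example value)
        int sourceMax = effectiveMaxParallelism(-1, schedulerMaxParallelism); // source has no individual max
        checkParallelism(defaultSourceParallelism, sourceMax); // throws: 128 > 64
    }
}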

Secondly, we first need to cap the max parallelism of the (downstream)
vertex, and only then can we decide how many subpartitions the (upstream)
vertex should produce. That cap must actually be enforced; otherwise some
downstream tasks would have no data to process. A source could in principle
ignore this limit because it has no upstream, but that would lead to a
semantic inconsistency.
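
To illustrate why the cap must hold, here is another simplified sketch with
made-up names (not Flink's actual subpartition assignment logic): the upstream
vertex produces one subpartition per possible downstream subtask, so if the
decided downstream parallelism were allowed to exceed that number, some
subtasks would receive nothing:

// Simplified illustration only, not Flink's actual subpartition assignment code.
import java.util.ArrayList;
import java.util.List;

public final class SubpartitionAssignmentSketch {

    // Upstream produces one subpartition per possible downstream subtask,
    // i.e. as many as the downstream vertex's max parallelism.
    static int numSubpartitions(int downstreamMaxParallelism) {
        return downstreamMaxParallelism;
    }

    // Evenly assign subpartitions to the downstream parallelism decided at runtime;
    // returns how many subpartitions each downstream subtask would consume.
    static List<Integer> subpartitionsPerSubtask(int numSubpartitions, int decidedParallelism) {
        List<Integer> counts = new ArrayList<>();
        for (int i = 0; i < decidedParallelism; i++) {
            int start = i * numSubpartitions / decidedParallelism;
            int end = (i + 1) * numSubpartitions / decidedParallelism;
            counts.add(end - start);
        }
        return counts;
    }

    public static void main(String[] args) {
        int produced = numSubpartitions(4); // upstream produced 4 subpartitions
        System.out.println(subpartitionsPerSubtask(produced, 4)); // [1, 1, 1, 1]
        // If the decided parallelism could exceed the max (here 6 > 4),
        // some subtasks would get 0 subpartitions, i.e. no data to process.
        System.out.println(subpartitionsPerSubtask(produced, 6)); // [0, 1, 1, 0, 1, 1]
    }
}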

Best,
Lijie

Venkatakrishnan Sowrirajan <vsowr...@asu.edu> wrote on Thu, Feb 29, 2024 at 05:49:

> Hi Flink devs,
>
> With Flink's AdaptiveBatchScheduler
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-batch-scheduler>
> (note: this is different from the AdaptiveScheduler
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/elastic_scaling/#adaptive-scheduler>),
> the scheduler automatically determines the correct number of downstream
> tasks required to process the shuffle generated by the upstream vertex.
>
> I have a question regarding the current behavior. There are 2 configs in
> interplay here:
> 1. jobmanager.adaptive-batch-scheduler.default-source-parallelism
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-default-source-parallelism>
>  - The default parallelism of the data source.
> 2. jobmanager.adaptive-batch-scheduler.max-parallelism
> <https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/#jobmanager-adaptive-batch-scheduler-max-parallelism>
>  - Upper bound of allowed parallelism to set adaptively.
>
> Currently, if "jobmanager.adaptive-batch-scheduler.default-source-parallelism"
> is greater than "jobmanager.adaptive-batch-scheduler.max-parallelism", the
> Flink application fails with the below message:
>
> "Vertex's parallelism should be smaller than or equal to vertex's max
> parallelism."
>
> This is the corresponding code in Flink's DefaultVertexParallelismInfo
> <https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfo.java#L110>.
> My question is whether the "default-source-parallelism" config should be
> independent from the "max-parallelism" flag. The former controls the default
> source parallelism, while the latter controls the max number of partitions to
> write the intermediate shuffle data to.
>
> If this is true, then the above check should be fixed. Otherwise, I wanted to
> understand why "default-source-parallelism" should be less than
> "max-parallelism".
>
> Thanks
> Venkat
>
