Hey Gyula,
Thank you for your answer!

I’ll revisit my suggestion and identify the best place to address this
issue.
I’ll also open a Jira ticket.

On Tue, Dec 2, 2025 at 10:00 AM Gyula Fóra <[email protected]> wrote:

> On second thought, DefaultSchedulerFactory is probably not the right/only
> place to fix this because there are other scheduler factories as well
> (adaptive scheduler)
>
> On Tue, Dec 2, 2025 at 8:47 AM Gyula Fóra <[email protected]> wrote:
>
> > Hi!
> >
> > Sorry for the late reply; generally it's best to open a JIRA directly in
> > these cases.
> >
> > You are completely right. I have tested this now and it seems to be
> > entirely broken in 2.1 (it works in 1.20).
> > I haven't dug very deep but your proposed fix is probably good. Can you
> > please open a JIRA/PR for this?
> >
> > We should definitely add some test coverage that would catch the current
> > issue if it comes back.
> >
> > Cheers
> > Gyula
> >
> > On Wed, Nov 26, 2025 at 8:16 PM Barak Bar-On <[email protected]>
> > wrote:
> >
> >> Hi Flink developers,
> >>
> >> I believe I've found a potential bug (or possibly a missing feature) in
> >> Flink 2.x where pipeline.jobvertex-parallelism-overrides appears to be
> >> ignored when running jobs in Application Mode. I wanted to discuss this
> >> with the community before filing a JIRA.
> >>
> >> This is affecting the Flink Kubernetes Operator autoscaler functionality
> >> in
> >> our environment.
> >>
> >> *The Problem*
> >> When using the Flink Kubernetes Operator with autoscaling enabled, the
> >> autoscaler correctly calculates per-vertex parallelism and writes it to
> >> pipeline.jobvertex-parallelism-overrides. However, the job always runs
> >> with
> >> uniform parallelism (parallelism.default) instead of the per-vertex
> >> overrides.
> >>
> >> For example, Kafka sources connected to topics with 24 partitions end up
> >> running with parallelism 280 (the default), wasting resources with 256
> >> idle
> >> subtasks per source.
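> >>
> >> For reference, the value the autoscaler writes is a comma-separated map
> >> from job vertex ID (hex string) to parallelism; the vertex IDs below are
> >> made up for illustration:
> >>
> >> pipeline.jobvertex-parallelism-overrides:
> >> bc764cd8ddf7a0cff126f51c16239658:24,ea632d67b7d595e5b851708ae9ad79d6:24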
> >>
> >> *Analysis*
> >> I traced this to Dispatcher.java. In Flink 2.0, FLINK-36446 changed
> >> internalSubmitJob() to accept ExecutionPlan instead of JobGraph:
> >>
> >> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan
> >> executionPlan) {
> >>     ...
> >>     if (executionPlan instanceof JobGraph) {
> >>         applyParallelismOverrides((JobGraph) executionPlan);
> >>     }
> >>     ...
> >> }
> >>
> >> In Application Mode, Flink submits a StreamGraph (not JobGraph) to the
> >> Dispatcher. Since executionPlan instanceof JobGraph is false, the
> >> parallelism overrides are skipped entirely.
> >>
> >> The StreamGraph is later converted to JobGraph in
> >> DefaultSchedulerFactory.createScheduler():
> >>
> >> } else if (executionPlan instanceof StreamGraph) {
> >>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
> >> }
> >>
> >> But by this point, the override logic has already been bypassed and
> >> there's
> >> no second check.
> >>
> >> *Evidence*
> >> From JobManager logs:
> >>
> >>    - Config IS loaded: "Loading configuration property:
> >>    pipeline.jobvertex-parallelism-overrides, ..."
> >>    - Log shows "Added StreamGraph(jobId: xxx)", confirming a StreamGraph
> >>    is used
> >>    - The message "Changing job vertex {} parallelism from {} to {}" does
> >>    NOT appear
> >>    - All tasks run with uniform parallelism instead of the configured
> >>    overrides
> >>
> >> *Versions Checked*
> >> I checked all 2.x branches and the same code pattern exists in:
> >>
> >>    - release-2.0
> >>    - release-2.1
> >>    - release-2.2
> >>    - master
> >>
> >> *Proposed Fix*
> >> Apply parallelism overrides in DefaultSchedulerFactory right after the
> >> StreamGraph → JobGraph conversion:
> >>
> >> } else if (executionPlan instanceof StreamGraph) {
> >>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
> >>
> >>     // Apply parallelism overrides to the converted JobGraph, mirroring
> >>     // Dispatcher#applyParallelismOverrides: cluster-level overrides
> >>     // first, then job-level overrides take precedence.
> >>     Map<String, String> overrides = new HashMap<>();
> >>     overrides.putAll(
> >>             configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
> >>     overrides.putAll(
> >>             jobGraph.getJobConfiguration()
> >>                     .get(PipelineOptions.PARALLELISM_OVERRIDES));
> >>
> >>     for (JobVertex vertex : jobGraph.getVertices()) {
> >>         String override = overrides.get(vertex.getID().toHexString());
> >>         if (override != null) {
> >>             vertex.setParallelism(Integer.parseInt(override));
> >>         }
> >>     }
> >> }
> >>
> >> This location works well because the converted JobGraph already has its
> >> final vertex IDs (the keys used in the overrides map) and the
> >> configuration is available.
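> >>
> >> To guard against regressions, the real test should submit a StreamGraph
> >> end to end, but the core assertion could look something like this
> >> (sketch only; the scaffolding is illustrative, not actual Flink test
> >> code):
> >>
> >> Configuration configuration = new Configuration();
> >> JobVertex vertex = new JobVertex("kafka-source");
> >> vertex.setParallelism(280); // the uniform default we observe today
> >>
> >> configuration.set(
> >>         PipelineOptions.PARALLELISM_OVERRIDES,
> >>         Collections.singletonMap(vertex.getID().toHexString(), "24"));
> >>
> >> // Run the same override logic as in the fix above ...
> >> Map<String, String> overrides =
> >>         configuration.get(PipelineOptions.PARALLELISM_OVERRIDES);
> >> String override = overrides.get(vertex.getID().toHexString());
> >> if (override != null) {
> >>     vertex.setParallelism(Integer.parseInt(override));
> >> }
> >>
> >> // ... and assert that the override took effect.
> >> assertEquals(24, vertex.getParallelism());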
> >>
> >> *Questions*
> >>
> >>    1. Is this the expected behavior, or is this a bug/oversight from the
> >>    FLINK-36446 refactoring?
> >>    2. Was pipeline.jobvertex-parallelism-overrides ever intended to work
> >>    with StreamGraph submissions in Application Mode?
> >>    3. If this is indeed a bug, is the proposed fix location
> >>    (DefaultSchedulerFactory) appropriate, or would a different approach
> >>    be preferred?
> >>    4. Should I create a JIRA issue for this?
> >>
> >> I'm happy to provide more details or contribute a fix if the community
> >> confirms this is a bug.
> >>
> >> Thanks.
> >>
> >
>


-- 
Barak Bar-On, Staff Engineer
email: [email protected]  web: www.forter.com
mobile: 054-571-3374
