Hey Gyula,

Thank you for your answer! I'll revisit my suggestion and identify the best
place to address this issue. I'll also open a Jira ticket.
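For reference, here is a minimal standalone sketch of the override logic the fix needs to apply after the StreamGraph → JobGraph conversion, wherever it ends up living. The `Vertex` type and the class/method names below are illustrative stand-ins so the sketch compiles without Flink on the classpath; they are not the real Flink `JobGraph`/`JobVertex` API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Standalone sketch of applying pipeline.jobvertex-parallelism-overrides.
// "Vertex" is an illustrative stand-in for Flink's JobVertex.
public class ParallelismOverrideSketch {

    static final class Vertex {
        final String hexId;   // analogous to JobVertexID#toHexString()
        int parallelism;

        Vertex(String hexId, int parallelism) {
            this.hexId = hexId;
            this.parallelism = parallelism;
        }
    }

    /**
     * Merges cluster-level and job-level overrides (job config wins on
     * conflict, mirroring the putAll order in the proposed fix) and applies
     * them to each vertex whose hex ID has an entry.
     */
    static void applyOverrides(
            List<Vertex> vertices,
            Map<String, String> clusterOverrides,
            Map<String, String> jobOverrides) {
        Map<String, String> merged = new HashMap<>(clusterOverrides);
        merged.putAll(jobOverrides);
        for (Vertex vertex : vertices) {
            String override = merged.get(vertex.hexId);
            if (override != null) {
                vertex.parallelism = Integer.parseInt(override);
            }
        }
    }

    public static void main(String[] args) {
        // A Kafka source stuck at parallelism.default (280) that the
        // autoscaler wants at 24, plus a vertex with no override.
        Vertex source = new Vertex("a1b2", 280);
        Vertex sink = new Vertex("c3d4", 280);

        applyOverrides(
                List.of(source, sink),
                Map.of("a1b2", "24"), // cluster-level override
                Map.of());            // no job-level overrides

        System.out.println(source.parallelism + " " + sink.parallelism); // 24 280
    }
}
```

A regression test along these lines (submit a StreamGraph with the overrides option set and assert the resulting vertex parallelism) would also cover your point about catching this issue if it comes back.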
On Tue, Dec 2, 2025 at 10:00 AM Gyula Fóra <[email protected]> wrote:

> On a second thought, DefaultSchedulerFactory is probably not the right/only
> place to fix this because there are other scheduler factories as well
> (adaptive scheduler)
>
> On Tue, Dec 2, 2025 at 8:47 AM Gyula Fóra <[email protected]> wrote:
>
> > Hi!
> >
> > Sorry for the late reply, generally it's best to open a jira directly in
> > these cases.
> >
> > You are completely right, I have tested this now and it seems to be
> > completely broken for 2.1 (works for 1.20). I haven't dug very deep but
> > your proposed fix is probably good. Can you please open a JIRA/PR for
> > this?
> >
> > We should definitely add some test coverage that would catch the current
> > issue if it comes back.
> >
> > Cheers
> > Gyula
> >
> > On Wed, Nov 26, 2025 at 8:16 PM Barak Bar-On <[email protected]>
> > wrote:
> >
> >> Hi Flink developers,
> >>
> >> I believe I've found a potential bug (or possibly missing feature) in
> >> Flink 2.x where pipeline.jobvertex-parallelism-overrides appears to be
> >> ignored when running jobs in Application Mode. I wanted to discuss this
> >> with the community before filing a JIRA.
> >>
> >> This is affecting the Flink Kubernetes Operator autoscaler
> >> functionality in our environment.
> >>
> >> *The Problem*
> >> When using the Flink Kubernetes Operator with autoscaling enabled, the
> >> autoscaler correctly calculates per-vertex parallelism and writes it to
> >> pipeline.jobvertex-parallelism-overrides. However, the job always runs
> >> with uniform parallelism (parallelism.default) instead of the
> >> per-vertex overrides.
> >>
> >> For example, Kafka sources connected to topics with 24 partitions end
> >> up running with parallelism 280 (the default), wasting resources with
> >> 256 idle subtasks per source.
> >>
> >> *Analysis*
> >> I traced this to Dispatcher.java.
> >> In Flink 2.0, FLINK-36446 changed internalSubmitJob() to accept
> >> ExecutionPlan instead of JobGraph:
> >>
> >> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan
> >> executionPlan) {
> >>     ...
> >>     if (executionPlan instanceof JobGraph) {
> >>         applyParallelismOverrides((JobGraph) executionPlan);
> >>     }
> >>     ...
> >> }
> >>
> >> In Application Mode, Flink submits a StreamGraph (not a JobGraph) to
> >> the Dispatcher. Since executionPlan instanceof JobGraph is false, the
> >> parallelism overrides are skipped entirely.
> >>
> >> The StreamGraph is later converted to a JobGraph in
> >> DefaultSchedulerFactory.createScheduler():
> >>
> >> } else if (executionPlan instanceof StreamGraph) {
> >>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
> >> }
> >>
> >> But by this point the override logic has already been bypassed, and
> >> there is no second check.
> >>
> >> *Evidence*
> >> From the JobManager logs:
> >>
> >> - The config IS loaded: Loading configuration property:
> >>   pipeline.jobvertex-parallelism-overrides, ...
> >> - The log shows: Added StreamGraph(jobId: xxx) - confirming that a
> >>   StreamGraph is used
> >> - The message "Changing job vertex {} parallelism from {} to {}" does
> >>   NOT appear
> >> - All tasks run with uniform parallelism instead of the configured
> >>   overrides
> >>
> >> *Versions Checked*
> >> I checked all 2.x branches and the same code pattern exists in:
> >>
> >> - release-2.0
> >> - release-2.1
> >> - release-2.2
> >> - master
> >>
> >> *Proposed Fix*
> >> Apply the parallelism overrides in DefaultSchedulerFactory right after
> >> the StreamGraph → JobGraph conversion:
> >>
> >> } else if (executionPlan instanceof StreamGraph) {
> >>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
> >>
> >>     // Apply parallelism overrides to the converted JobGraph
> >>     Map<String, String> overrides = new HashMap<>();
> >>     overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
> >>     overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));
> >>
> >>     for (JobVertex vertex : jobGraph.getVertices()) {
> >>         String override = overrides.get(vertex.getID().toHexString());
> >>         if (override != null) {
> >>             vertex.setParallelism(Integer.parseInt(override));
> >>         }
> >>     }
> >> }
> >>
> >> This location works well because the JobGraph already has the correct
> >> vertex IDs and the configuration is available.
> >>
> >> *Questions*
> >>
> >> 1. Is this the expected behavior, or is this a bug/oversight from the
> >>    FLINK-36446 refactoring?
> >> 2. Was pipeline.jobvertex-parallelism-overrides ever intended to work
> >>    with StreamGraph submissions in Application Mode?
> >> 3. If this is indeed a bug, is the proposed fix location
> >>    (DefaultSchedulerFactory) appropriate, or would a different approach
> >>    be preferred?
> >> 4. Should I create a JIRA issue for this?
> >>
> >> I'm happy to provide more details or contribute a fix if the community
> >> confirms this is a bug.
> >>
> >> Thanks.

--
Barak Bar-On, Staff Engineer
email: [email protected]
web: www.forter.com
mobile: 054-571-3374
