On second thought, DefaultSchedulerFactory is probably not the right (or only) place to fix this, because there are other scheduler factories as well (e.g. the adaptive scheduler).
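One option would be to pull the override logic into a shared helper that every scheduler factory calls after the StreamGraph → JobGraph conversion. As a rough, Flink-free sketch of the merge semantics involved (all class and method names here are hypothetical, not Flink APIs): overrides are keyed by the hex vertex ID, job-level config entries win over cluster-level ones, and vertices without an override keep their current parallelism.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Illustrative stand-in for a shared parallelism-override helper.
 * Not Flink code; it only demonstrates the merge/apply semantics.
 */
public class ParallelismOverrides {

    /** Merge cluster- and job-level overrides; job-level entries win. */
    static Map<String, String> mergeOverrides(
            Map<String, String> clusterOverrides, Map<String, String> jobOverrides) {
        Map<String, String> merged = new HashMap<>(clusterOverrides);
        merged.putAll(jobOverrides);
        return merged;
    }

    /** Apply merged overrides to a (hex vertexId -> parallelism) map, in place. */
    static void applyOverrides(
            Map<String, Integer> vertexParallelism, Map<String, String> overrides) {
        for (Map.Entry<String, Integer> e : vertexParallelism.entrySet()) {
            String override = overrides.get(e.getKey());
            if (override != null) {
                e.setValue(Integer.parseInt(override));
            }
        }
    }

    public static void main(String[] args) {
        // Two vertices at the uniform default of 280, as in the report below.
        Map<String, Integer> vertices = new HashMap<>();
        vertices.put("a1b2c3", 280);
        vertices.put("d4e5f6", 280);

        // Cluster config caps the source at 24; no job-level overrides here.
        Map<String, String> cluster = Map.of("a1b2c3", "24");
        Map<String, String> job = Map.of();

        applyOverrides(vertices, mergeOverrides(cluster, job));
        System.out.println(vertices.get("a1b2c3") + " " + vertices.get("d4e5f6"));
        // prints "24 280"
    }
}
```

A helper like this could be invoked from each factory (or once, centrally, wherever the conversion happens), so no submission path skips the overrides.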
On Tue, Dec 2, 2025 at 8:47 AM Gyula Fóra <[email protected]> wrote:

> Hi!
>
> Sorry for the late reply, generally it's best to open a jira directly in
> these cases.
>
> You are completely right, I have tested this now and it seems to be
> completely broken for 2.1 (works for 1.20).
> I haven't dug very deep but your proposed fix is probably good. Can you
> please open a JIRA/PR for this?
>
> We should definitely add some test coverage that would catch the current
> issue if it comes back.
>
> Cheers
> Gyula
>
> On Wed, Nov 26, 2025 at 8:16 PM Barak Bar-On <[email protected]>
> wrote:
>
>> Hi Flink developers,
>>
>> I believe I've found a potential bug (or possibly missing feature) in
>> Flink 2.x where pipeline.jobvertex-parallelism-overrides appears to be
>> ignored when running jobs in Application Mode. I wanted to discuss this
>> with the community before filing a JIRA.
>>
>> This is affecting the Flink Kubernetes Operator autoscaler
>> functionality in our environment.
>>
>> *The Problem*
>> When using the Flink Kubernetes Operator with autoscaling enabled, the
>> autoscaler correctly calculates per-vertex parallelism and writes it to
>> pipeline.jobvertex-parallelism-overrides. However, the job always runs
>> with uniform parallelism (parallelism.default) instead of the
>> per-vertex overrides.
>>
>> For example, Kafka sources connected to topics with 24 partitions end
>> up running with parallelism 280 (the default), wasting resources with
>> 256 idle subtasks per source.
>>
>> *Analysis*
>> I traced this to Dispatcher.java. In Flink 2.0, FLINK-36446 changed
>> internalSubmitJob() to accept ExecutionPlan instead of JobGraph:
>>
>> private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan
>>         executionPlan) {
>>     ...
>>     if (executionPlan instanceof JobGraph) {
>>         applyParallelismOverrides((JobGraph) executionPlan);
>>     }
>>     ...
>> }
>>
>> In Application Mode, Flink submits a StreamGraph (not a JobGraph) to
>> the Dispatcher. Since executionPlan instanceof JobGraph is false, the
>> parallelism overrides are skipped entirely.
>>
>> The StreamGraph is later converted to a JobGraph in
>> DefaultSchedulerFactory.createScheduler():
>>
>> } else if (executionPlan instanceof StreamGraph) {
>>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
>> }
>>
>> But by this point, the override logic has already been bypassed and
>> there's no second check.
>>
>> *Evidence*
>> From the JobManager logs:
>>
>> - The config IS loaded: "Loading configuration property:
>>   pipeline.jobvertex-parallelism-overrides, ..."
>> - The log shows "Added StreamGraph(jobId: xxx)", confirming a
>>   StreamGraph is used.
>> - The message "Changing job vertex {} parallelism from {} to {}" does
>>   NOT appear.
>> - All tasks run with uniform parallelism instead of the configured
>>   overrides.
>>
>> *Versions Checked*
>> I checked all 2.x branches and the same code pattern exists in:
>>
>> - release-2.0
>> - release-2.1
>> - release-2.2
>> - master
>>
>> *Proposed Fix*
>> Apply the parallelism overrides in DefaultSchedulerFactory right after
>> the StreamGraph → JobGraph conversion:
>>
>> } else if (executionPlan instanceof StreamGraph) {
>>     jobGraph = ((StreamGraph) executionPlan).getJobGraph(userCodeLoader);
>>
>>     // Apply parallelism overrides to the converted JobGraph
>>     Map<String, String> overrides = new HashMap<>();
>>     overrides.putAll(
>>             configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
>>     overrides.putAll(
>>             jobGraph.getJobConfiguration()
>>                     .get(PipelineOptions.PARALLELISM_OVERRIDES));
>>
>>     for (JobVertex vertex : jobGraph.getVertices()) {
>>         String override = overrides.get(vertex.getID().toHexString());
>>         if (override != null) {
>>             vertex.setParallelism(Integer.parseInt(override));
>>         }
>>     }
>> }
>>
>> This location works well because the JobGraph already has the correct
>> vertex IDs and the configuration is available.
>>
>> *Questions*
>>
>> 1. Is this the expected behavior, or is this a bug/oversight from the
>>    FLINK-36446 refactoring?
>> 2. Was pipeline.jobvertex-parallelism-overrides ever intended to work
>>    with StreamGraph submissions in Application Mode?
>> 3. If this is indeed a bug, is the proposed fix location
>>    (DefaultSchedulerFactory) appropriate, or would a different approach
>>    be preferred?
>> 4. Should I create a JIRA issue for this?
>>
>> I'm happy to provide more details or contribute a fix if the community
>> confirms this is a bug.
>>
>> Thanks.
>
