Barak created FLINK-38770:
-----------------------------
Summary: pipeline.jobvertex-parallelism-overrides ignored for StreamGraph in Application Mode
Key: FLINK-38770
URL: https://issues.apache.org/jira/browse/FLINK-38770
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Affects Versions: 2.1.1, 2.2.0, 2.0.1, 2.1.0, 2.0.0
Reporter: Barak
Fix For: 2.1.2, 2.0.2, 2.2.0
h2. Problem
In Flink 2.0+, when running in Application Mode, the
{{pipeline.jobvertex-parallelism-overrides}} configuration is completely
ignored. This breaks the Flink Kubernetes Operator's autoscaler functionality,
which relies on this configuration to adjust job parallelism during rescaling
operations.
h2. Background
FLINK-36446 introduced the ability to submit {{StreamGraph}} directly instead
of converting to {{JobGraph}} on the client side. This is the default behavior
in Application Mode for Flink 2.0+.
However, the parallelism override logic in {{Dispatcher.internalSubmitJob()}}
only handles {{JobGraph}}:
{code:java}
private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
    if (executionPlan instanceof JobGraph) {
        applyParallelismOverrides((JobGraph) executionPlan); // StreamGraph bypasses this!
    }
    // ...
}
{code}
When a {{StreamGraph}} is submitted, the {{instanceof}} check fails and the overrides are not applied at this point. The {{StreamGraph}} → {{JobGraph}} conversion happens later, in the scheduler factories, and nothing there applies the overrides either.
h2. Impact
* *Flink Kubernetes Operator autoscaler is broken* for Flink 2.0+ in Application Mode
* Any use case relying on {{pipeline.jobvertex-parallelism-overrides}} in Application Mode fails silently
* Jobs run with original parallelism instead of the overridden values
h2. Steps to Reproduce
# Create a simple Flink streaming job
# Deploy using Flink Kubernetes Operator in Application Mode with Flink 2.0+
# Trigger autoscaler rescaling (or manually set {{pipeline.jobvertex-parallelism-overrides}})
# Observe that job parallelism does not change
h2. Root Cause Analysis
h3. Session Mode (JobGraph submitted) - WORKING
{noformat}
Client: StreamGraph → JobGraph conversion
↓
Dispatcher.internalSubmitJob(JobGraph)
↓
✅ applyParallelismOverrides(JobGraph) - APPLIED
↓
SchedulerFactory receives modified JobGraph
{noformat}
h3. Application Mode (StreamGraph submitted) - BROKEN
{noformat}
Application Driver: Creates StreamGraph directly
↓
Dispatcher.internalSubmitJob(StreamGraph)
↓
❌ if (instanceof JobGraph) → FALSE, SKIPPED!
↓
SchedulerFactory.createInstance(StreamGraph)
↓
StreamGraph → JobGraph conversion happens HERE
↓
❌ Overrides never applied
{noformat}
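To make the ordering problem concrete, here is a toy model of the current path in plain Java. It is a sketch under simplifying assumptions: a plan is reduced to a map from vertex ID to parallelism plus a flag for its type, and {{CurrentSubmissionModel}} and {{submit()}} are hypothetical names, not Flink classes. Submitting the same plan with the same overrides gives the overridden parallelism for a {{JobGraph}} but leaves a {{StreamGraph}} at its original parallelism.
{code:java}
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of the current submission path. Hypothetical names, not Flink classes:
 * a plan is a vertex-id -> parallelism map plus a flag saying whether it is a JobGraph.
 */
final class CurrentSubmissionModel {

    private CurrentSubmissionModel() {}

    /** Mirrors Dispatcher.internalSubmitJob() followed by the scheduler factory. */
    static Map<String, Integer> submit(
            boolean isJobGraph, Map<String, Integer> plan, Map<String, Integer> overrides) {
        Map<String, Integer> vertices = new HashMap<>(plan);
        if (isJobGraph) {
            // Dispatcher-side override: only reached for JobGraph submissions.
            vertices.replaceAll((id, parallelism) -> overrides.getOrDefault(id, parallelism));
        }
        // A StreamGraph is converted to a JobGraph later, in the scheduler factory
        // (an identity copy in this model), and no override step runs after that.
        return vertices;
    }
}
{code}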
h3. Affected Code Paths
The {{StreamGraph}} → {{JobGraph}} conversion happens in multiple scheduler
factories:
||Scheduler Factory||File||Conversion Behavior||
|DefaultSchedulerFactory|DefaultSchedulerFactory.java:92|Direct conversion|
|AdaptiveSchedulerFactory|AdaptiveSchedulerFactory.java:96|Direct conversion|
|AdaptiveBatchSchedulerFactory|AdaptiveExecutionHandlerFactory.java:65|Direct conversion when batch recovery is enabled|
|AdaptiveBatchScheduler (adaptive)|DefaultAdaptiveExecutionHandler|Incremental, via AdaptiveGraphManager|
h2. Proposed Solution
h3. Approach
# *Create a utility class* {{ParallelismOverrideUtil}} with the override logic
# *Remove* the override logic from {{Dispatcher.internalSubmitJob()}}
# *Call the utility* in all scheduler factories after {{StreamGraph}} → {{JobGraph}} conversion
# *Handle incremental conversion* for {{DefaultAdaptiveExecutionHandler}}
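The proposed ordering can be sketched with the same kind of toy model (hypothetical names, plain Java maps instead of Flink's {{JobGraph}}): the dispatcher passes the plan through unchanged, and the factory applies the overrides once it holds a {{JobGraph}}, regardless of which plan type was submitted.
{code:java}
import java.util.HashMap;
import java.util.Map;

/** Toy model of the proposed ordering. Hypothetical names, not Flink classes. */
final class ProposedSubmissionModel {

    private ProposedSubmissionModel() {}

    /** Stand-in for ParallelismOverrideUtil.applyParallelismOverrides(). */
    static void applyOverrides(Map<String, Integer> jobGraph, Map<String, Integer> overrides) {
        jobGraph.replaceAll((id, parallelism) -> overrides.getOrDefault(id, parallelism));
    }

    /** Stand-in for a SchedulerNGFactory: obtain a JobGraph first, then apply overrides. */
    static Map<String, Integer> createScheduler(
            boolean isJobGraph, Map<String, Integer> plan, Map<String, Integer> overrides) {
        // The dispatcher passed the plan through unchanged; convert only if it is a StreamGraph.
        Map<String, Integer> jobGraph = isJobGraph ? new HashMap<>(plan) : convert(plan);
        applyOverrides(jobGraph, overrides);
        return jobGraph;
    }

    /** Stand-in for the StreamGraph -> JobGraph conversion (an identity copy here). */
    private static Map<String, Integer> convert(Map<String, Integer> streamGraph) {
        return new HashMap<>(streamGraph);
    }
}
{code}
In this model, applying the same overrides a second time would not change the result, so once every factory applies them the dispatcher-side call is redundant.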
h3. New Utility Class
Create {{flink-runtime/.../scheduler/ParallelismOverrideUtil.java}}:
{code:java}
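import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

/** Applies {@code pipeline.jobvertex-parallelism-overrides} to a JobGraph. */
public final class ParallelismOverrideUtil {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelismOverrideUtil.class);

    private ParallelismOverrideUtil() {}

    /**
     * Applies parallelism overrides from configuration to the JobGraph.
     * Overrides from the JobGraph's job configuration take precedence over
     * those from the given (cluster) configuration.
     * This method MUST be called after converting StreamGraph to JobGraph
     * in all SchedulerNGFactory implementations.
     */
    public static void applyParallelismOverrides(JobGraph jobGraph, Configuration configuration) {
        // Cluster-level overrides first; job-level entries overwrite them on key collision.
        Map<String, String> overrides = new HashMap<>();
        overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
        overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));

        // Keys are JobVertexID hex strings; entries that match no vertex are ignored.
        for (JobVertex vertex : jobGraph.getVertices()) {
            String override = overrides.get(vertex.getID().toHexString());
            if (override != null) {
                int currentParallelism = vertex.getParallelism();
                int overrideParallelism = Integer.parseInt(override);
                LOG.info(
                        "Applying parallelism override for vertex {}: {} -> {}",
                        vertex.getID(),
                        currentParallelism,
                        overrideParallelism);
                vertex.setParallelism(overrideParallelism);
            }
        }
    }
}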
{code}
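The merge and lookup rules of {{applyParallelismOverrides()}} can be checked in isolation. The sketch below mirrors them on plain maps; {{OverrideMergeExample}} is a hypothetical name, and the vertex IDs {{src}}, {{map}} and {{sink}} are placeholders for the {{JobVertexID}} hex strings used in practice. With cluster overrides {{map:2, sink:3}} and job overrides {{map:4}}, the result is {{src=1, map=4, sink=3}}. Because the loop runs over vertices, an entry for an unknown vertex is never parsed, so even a non-numeric value there causes no error.
{code:java}
import java.util.HashMap;
import java.util.Map;

/** Merge and lookup rules of applyParallelismOverrides(), on plain maps (hypothetical class). */
final class OverrideMergeExample {

    private OverrideMergeExample() {}

    static Map<String, Integer> apply(
            Map<String, Integer> vertexParallelism,
            Map<String, String> clusterOverrides,
            Map<String, String> jobOverrides) {
        // Cluster-level entries first; job-level entries overwrite them on key collision.
        Map<String, String> overrides = new HashMap<>(clusterOverrides);
        overrides.putAll(jobOverrides);

        // Iterate over the vertices, not the overrides: an entry that matches no vertex
        // is never looked up, so its value is never parsed.
        Map<String, Integer> result = new HashMap<>(vertexParallelism);
        result.replaceAll(
                (id, current) -> {
                    String override = overrides.get(id);
                    return override == null ? current : Integer.parseInt(override);
                });
        return result;
    }
}
{code}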
h3. Files to Modify
# {{Dispatcher.java}} - Remove {{applyParallelismOverrides()}} method and its call
# {{DefaultSchedulerFactory.java}} - Add utility call after conversion (line ~92)
# {{AdaptiveSchedulerFactory.java}} - Add utility call after conversion (line ~96)
# {{AdaptiveExecutionHandlerFactory.java}} - Add utility call after conversion (line ~65)
# {{DefaultAdaptiveExecutionHandler}} or {{AdaptiveGraphManager}} - Handle incremental conversion
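For the incremental case, one possible shape is a hook that applies the overrides to each batch of vertices as the incremental conversion creates them, before they are scheduled. This is a hypothetical sketch: {{IncrementalOverrideHook}} and {{onNewVertices()}} are illustrative names, not existing {{AdaptiveGraphManager}} API, vertices are again reduced to ID-to-parallelism maps, and the model does not cover parallelism that the adaptive batch scheduler decides at runtime.
{code:java}
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical hook that applies overrides to each batch of incrementally created vertices. */
final class IncrementalOverrideHook {
    private final Map<String, String> overrides;
    // Vertex id -> parallelism of the JobGraph built so far.
    private final Map<String, Integer> jobGraph = new HashMap<>();

    IncrementalOverrideHook(Map<String, String> overrides) {
        this.overrides = new HashMap<>(overrides);
    }

    /** Called once per batch of newly created vertices, before they are scheduled. */
    void onNewVertices(Map<String, Integer> newVertices) {
        newVertices.forEach(
                (id, parallelism) -> {
                    String override = overrides.get(id);
                    jobGraph.put(id, override == null ? parallelism : Integer.parseInt(override));
                });
    }

    /** Read-only view of the vertices created so far, with overrides applied. */
    Map<String, Integer> jobGraph() {
        return Collections.unmodifiableMap(jobGraph);
    }
}
{code}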
h2. Testing Plan
h3. Existing Test Coverage
There is one existing test in {{DispatcherTest.java}}:
* {{testOverridingJobVertexParallelisms()}} - only tests {{JobGraph}} submission
*Missing coverage:* No tests exist for {{StreamGraph}} submission with
parallelism overrides, which is why this regression was not caught.
h3. Required: Integration Test (using MiniClusterExtension)
Create new test class {{ParallelismOverridesITCase.java}} in {{flink-tests}}
module.
This is the *recommended approach* because it:
* Tests the real flow: StreamGraph → Scheduler → JobGraph → Running job
* Can verify actual parallelism via REST API
* Uses existing test infrastructure ({{MiniClusterExtension}})
* Is self-contained and doesn't require complex mocking
{code:java}
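import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(TestLoggerExtension.class)
public class ParallelismOverridesITCase {
    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(createConfiguration())
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    private static Configuration createConfiguration() {
        Configuration config = new Configuration();
        // Parallelism overrides will be set per-test
        return config;
    }

    @Test
    void testParallelismOverridesWithStreamGraph(
            @InjectClusterClient RestClusterClient<?> client,
            @InjectMiniCluster MiniCluster miniCluster) throws Exception {
        // 1. Create StreamGraph with initial parallelism = 1
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Keep Map in its own JobVertex instead of chaining it with the source and sink
        env.disableOperatorChaining();
        DataStream<Integer> stream = env.fromElements(1, 2, 3).map(x -> x);
        stream.sinkTo(new DiscardingSink<>());
        StreamGraph streamGraph = env.getStreamGraph();

        // 2. Get the Map vertex ID and configure an override to parallelism = 4.
        // getMapVertexId() is a helper (not shown) that must return the JobVertexID hex
        // string the scheduler-side StreamGraph -> JobGraph conversion assigns to Map.
        String vertexId = getMapVertexId(streamGraph);
        streamGraph
                .getJobConfiguration()
                .set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(vertexId, "4"));

        // 3. Submit the StreamGraph; the overrides travel in its job configuration
        JobID jobId = miniCluster.submitJob(streamGraph).get().getJobID();

        // 4. Verify actual parallelism is 4 (not 1)
        JobDetailsInfo jobDetails = client.getJobDetails(jobId).join();
        int actualParallelism =
                jobDetails.getJobVertexInfos().stream()
                        .filter(v -> v.getName().contains("Map"))
                        .findFirst()
                        .map(JobDetailsInfo.JobVertexDetailsInfo::getParallelism)
                        .orElseThrow();
        assertEquals(
                4,
                actualParallelism,
                "Parallelism override should be applied to StreamGraph submission");
    }

    @Test
    void testParallelismOverridesWithJobGraph(
            @InjectClusterClient RestClusterClient<?> client) throws Exception {
        // Ensure JobGraph submission still works (regression test for existing behavior).
        // Similar structure but submit JobGraph instead of StreamGraph
    }
}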
{code}
h3. Test Matrix
||Test Case||ExecutionPlan||Expected Behavior||
|testParallelismOverridesWithStreamGraph|StreamGraph|Override applied (THIS IS THE BUG FIX)|
|testParallelismOverridesWithJobGraph|JobGraph|Override applied (existing behavior)|
|testOverridesFromJobConfigTakePrecedence|StreamGraph|Job config overrides cluster config|
|testPartialOverrides|StreamGraph|Only specified vertices modified|
h3. Optional: Unit Test for Utility Class
Add unit tests for {{ParallelismOverrideUtil}} if created:
{code:java}
class ParallelismOverrideUtilTest {
@Test
void testApplyOverridesFromConfiguration() { }
@Test
void testApplyOverridesFromJobGraphConfig() { }
@Test
void testJobGraphConfigTakesPrecedence() { }
@Test
void testUnknownVertexIdIgnored() { }
@Test
void testNoOverridesWhenEmpty() { }
}
{code}
h2. Acceptance Criteria
* {{pipeline.jobvertex-parallelism-overrides}} works in Application Mode with StreamGraph
* All scheduler types (Default, Adaptive, AdaptiveBatch) apply overrides correctly
* Session Mode (JobGraph submission) continues to work
* Overrides from both jobMasterConfiguration and jobConfiguration are respected
* Unit tests cover the utility method
* Integration tests cover all scheduler/execution-plan combinations
h2. Related
* FLINK-36446 - Submit StreamGraph directly in Application Mode (introduced the regression)
* [Mailing list discussion|https://lists.apache.org/thread/qb06cghv2bo6p1xhzt6qzj6wf3cl29gz]