Barak created FLINK-38770:
-----------------------------

             Summary: pipeline.jobvertex-parallelism-overrides ignored for StreamGraph in Application Mode
                 Key: FLINK-38770
                 URL: https://issues.apache.org/jira/browse/FLINK-38770
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 2.1.1, 2.2.0, 2.0.1, 2.1.0, 2.0.0
            Reporter: Barak
             Fix For: 2.1.2, 2.0.2, 2.2.0


h2. Problem

In Flink 2.0+, when running in Application Mode, the {{pipeline.jobvertex-parallelism-overrides}} configuration is completely ignored. This breaks the Flink Kubernetes Operator's autoscaler, which relies on this configuration to adjust job parallelism during rescaling operations.

h2. Background

FLINK-36446 introduced the ability to submit a {{StreamGraph}} directly instead of converting it to a {{JobGraph}} on the client side. This is the default behavior in Application Mode for Flink 2.0+.

However, the parallelism override logic in {{Dispatcher.internalSubmitJob()}} only handles {{JobGraph}}:

{code:java}
private CompletableFuture<Acknowledge> internalSubmitJob(ExecutionPlan executionPlan) {
    if (executionPlan instanceof JobGraph) {
        applyParallelismOverrides((JobGraph) executionPlan);  // StreamGraph bypasses this!
    }
    // ...
}
{code}

When a {{StreamGraph}} is submitted, it skips this check entirely. The {{StreamGraph}} → {{JobGraph}} conversion only happens later, in the scheduler factories, i.e. *after* the point where the overrides would have been applied.

h2. Impact

* *Flink Kubernetes Operator autoscaler is broken* for Flink 2.0+ in Application Mode
* Any use case relying on {{pipeline.jobvertex-parallelism-overrides}} in Application Mode fails silently
* Jobs run with their original parallelism instead of the overridden values

h2. Steps to Reproduce

# Create a simple Flink streaming job
# Deploy using Flink Kubernetes Operator in Application Mode with Flink 2.0+
# Trigger autoscaler rescaling (or manually set {{pipeline.jobvertex-parallelism-overrides}})
# Observe that job parallelism does not change
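To set the option manually (step 3), the value is a map from a hex-encoded {{JobVertexID}} to a parallelism. It is written in the comma-separated {{key:value}} form that Flink uses for map-typed options. The following is a minimal sketch of building that string. It assumes hex vertex IDs, which need no quoting. The class name and the vertex IDs used with it are made up for illustration.

{code:java}
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper: renders vertex-ID -> parallelism pairs as a config value string. */
final class OverrideConfigValue {
    private OverrideConfigValue() {}

    /** Output order follows the map's iteration order; pass a LinkedHashMap for a stable result. */
    static String render(Map<String, Integer> overrides) {
        StringJoiner joiner = new StringJoiner(",");
        for (Map.Entry<String, Integer> entry : overrides.entrySet()) {
            if (entry.getValue() < 1) {
                throw new IllegalArgumentException("Parallelism must be positive: " + entry);
            }
            // Hex vertex IDs contain neither ':' nor ',', so no escaping is needed.
            joiner.add(entry.getKey() + ":" + entry.getValue());
        }
        return joiner.toString();
    }
}
{code}

For example, a single entry for a vertex {{bc764cd8ddf7a0cff126f51c16239658}} with parallelism 4 renders as {{bc764cd8ddf7a0cff126f51c16239658:4}}.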

h2. Root Cause Analysis

h3. Session Mode (JobGraph submitted) - WORKING

{noformat}
Client: StreamGraph → JobGraph conversion
   ↓
Dispatcher.internalSubmitJob(JobGraph)
   ↓
✅ applyParallelismOverrides(JobGraph) - APPLIED
   ↓
SchedulerFactory receives modified JobGraph
{noformat}

h3. Application Mode (StreamGraph submitted) - BROKEN

{noformat}
Application Driver: Creates StreamGraph directly
   ↓
Dispatcher.internalSubmitJob(StreamGraph)
   ↓
❌ if (instanceof JobGraph) → FALSE, SKIPPED!
   ↓
SchedulerFactory.createInstance(StreamGraph)
   ↓
StreamGraph → JobGraph conversion happens HERE
   ↓
❌ Overrides never applied
{noformat}
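The difference between the two flows comes down to ordering, and a small self-contained model makes that explicit. The types below are hypothetical, simplified stand-ins in which a plan is just a map from vertex ID to parallelism. They are not Flink's real {{JobGraph}}/{{StreamGraph}} classes.

{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for Flink's ExecutionPlan hierarchy, for illustration only.
interface Plan {}

final class SimpleJobGraph implements Plan {
    final Map<String, Integer> parallelism = new HashMap<>();
}

final class SimpleStreamGraph implements Plan {
    final Map<String, Integer> parallelism = new HashMap<>();

    /** Models the StreamGraph -> JobGraph conversion done inside the scheduler factory. */
    SimpleJobGraph toJobGraph() {
        SimpleJobGraph jobGraph = new SimpleJobGraph();
        jobGraph.parallelism.putAll(parallelism);
        return jobGraph;
    }
}

final class BuggyFlow {
    private BuggyFlow() {}

    /** Returns the parallelism each vertex ends up with under the current ordering. */
    static Map<String, Integer> submit(Plan plan, Map<String, Integer> overrides) {
        // Dispatcher.internalSubmitJob(): overrides are applied only to JobGraph submissions.
        if (plan instanceof SimpleJobGraph) {
            ((SimpleJobGraph) plan).parallelism.replaceAll((id, p) -> overrides.getOrDefault(id, p));
        }
        // Scheduler factory: a StreamGraph is converted only now, after the override step.
        SimpleJobGraph jobGraph =
                plan instanceof SimpleStreamGraph
                        ? ((SimpleStreamGraph) plan).toJobGraph()
                        : (SimpleJobGraph) plan;
        return jobGraph.parallelism;
    }
}
{code}

Take a vertex with original parallelism 1 and an override of 4. {{BuggyFlow.submit}} returns 4 for a {{SimpleJobGraph}} but 1 for a {{SimpleStreamGraph}}, which matches the two diagrams.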

h3. Affected Code Paths

The {{StreamGraph}} → {{JobGraph}} conversion happens in multiple scheduler factories:

||Scheduler Factory||Location (file:line)||How the conversion happens||
|DefaultSchedulerFactory|DefaultSchedulerFactory.java:92|Direct conversion|
|AdaptiveSchedulerFactory|AdaptiveSchedulerFactory.java:96|Direct conversion|
|AdaptiveBatchSchedulerFactory|AdaptiveExecutionHandlerFactory.java:65|When batch recovery is enabled|
|AdaptiveBatchScheduler (adaptive)|DefaultAdaptiveExecutionHandler|Incremental, via AdaptiveGraphManager|

h2. Proposed Solution

h3. Approach

# *Create a utility class* {{ParallelismOverrideUtil}} with the override logic
# *Remove* the override logic from {{Dispatcher.internalSubmitJob()}}
# *Call the utility* in all scheduler factories once the {{JobGraph}} is available, i.e. after {{StreamGraph}} → {{JobGraph}} conversion (or directly, when a {{JobGraph}} was submitted)
# *Handle incremental conversion* for {{DefaultAdaptiveExecutionHandler}}
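The following is a minimal sketch of the ordering this approach relies on. The Dispatcher no longer inspects the plan type. Each scheduler factory first obtains a {{JobGraph}}, converting only if a {{StreamGraph}} was submitted, and then applies the overrides. Both submission paths therefore go through the same step. The types and method names are hypothetical simplifications, not the real {{SchedulerNGFactory}} API.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified stand-in for an execution plan: vertex ID -> parallelism. */
abstract class PlanSketch {
    final Map<String, Integer> parallelism = new HashMap<>();
}

final class JobGraphSketch extends PlanSketch {}

final class StreamGraphSketch extends PlanSketch {
    /** Models the StreamGraph -> JobGraph conversion performed by the scheduler factory. */
    JobGraphSketch toJobGraph() {
        JobGraphSketch jobGraph = new JobGraphSketch();
        jobGraph.parallelism.putAll(parallelism);
        return jobGraph;
    }
}

final class FixedFlow {
    private FixedFlow() {}

    /** Models a scheduler factory after the fix; the Dispatcher no longer touches the plan. */
    static JobGraphSketch createJobGraph(PlanSketch plan, Map<String, Integer> overrides) {
        // Step 1: obtain a JobGraph, converting only if a StreamGraph was submitted.
        JobGraphSketch jobGraph =
                plan instanceof StreamGraphSketch
                        ? ((StreamGraphSketch) plan).toJobGraph()
                        : (JobGraphSketch) plan;
        // Step 2: apply overrides once, regardless of how the JobGraph was obtained.
        jobGraph.parallelism.replaceAll((id, p) -> overrides.getOrDefault(id, p));
        return jobGraph;
    }
}
{code}

Because the override step runs after the conversion, its result no longer depends on which plan type was submitted.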

h3. New Utility Class

Create {{flink-runtime/.../scheduler/ParallelismOverrideUtil.java}}:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;

public final class ParallelismOverrideUtil {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelismOverrideUtil.class);

    private ParallelismOverrideUtil() {}

    /**
     * Applies parallelism overrides from configuration to the JobGraph.
     * This method MUST be called after converting StreamGraph to JobGraph
     * in all SchedulerNGFactory implementations.
     */
    public static void applyParallelismOverrides(JobGraph jobGraph, Configuration configuration) {
        // Cluster-level overrides first; job-level overrides win on conflicting vertex IDs.
        Map<String, String> overrides = new HashMap<>();
        overrides.putAll(configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
        overrides.putAll(jobGraph.getJobConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES));

        for (JobVertex vertex : jobGraph.getVertices()) {
            // Override keys are hex-encoded JobVertexIDs.
            String override = overrides.get(vertex.getID().toHexString());
            if (override != null) {
                int currentParallelism = vertex.getParallelism();
                int overrideParallelism = Integer.parseInt(override);
                LOG.info(
                        "Applying parallelism override for vertex {}: {} -> {}",
                        vertex.getID(),
                        currentParallelism,
                        overrideParallelism);
                vertex.setParallelism(overrideParallelism);
            }
        }
    }
}
{code}
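The two {{putAll}} calls define precedence. Cluster-level entries are inserted first, so a job-level entry for the same vertex ID overwrites them. The loop walks the graph's vertices and looks each one up, so override IDs that match no vertex are ignored. The following is a Flink-free sketch of just this merge-and-apply logic, with vertex parallelism modelled as a plain map. The class and method names are hypothetical.

{code:java}
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, Flink-free model of the merge-and-apply logic in ParallelismOverrideUtil. */
final class OverrideMergeSketch {
    private OverrideMergeSketch() {}

    /** Cluster-level entries first, job-level entries second: the later putAll wins on collision. */
    static Map<String, String> merge(Map<String, String> clusterConf, Map<String, String> jobConf) {
        Map<String, String> merged = new HashMap<>(clusterConf);
        merged.putAll(jobConf);
        return merged;
    }

    /** Applies overrides to existing vertices only; IDs with no matching vertex are ignored. */
    static Map<String, Integer> apply(
            Map<String, Integer> vertexParallelism, Map<String, String> overrides) {
        Map<String, Integer> result = new HashMap<>(vertexParallelism);
        for (Map.Entry<String, Integer> vertex : result.entrySet()) {
            String override = overrides.get(vertex.getKey());
            if (override != null) {
                vertex.setValue(Integer.parseInt(override));
            }
        }
        return result;
    }
}
{code}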

h3. Files to Modify

# {{Dispatcher.java}} - Remove the {{applyParallelismOverrides()}} method and its call
# {{DefaultSchedulerFactory.java}} - Add the utility call after conversion (line ~92)
# {{AdaptiveSchedulerFactory.java}} - Add the utility call after conversion (line ~96)
# {{AdaptiveExecutionHandlerFactory.java}} - Add the utility call after conversion (line ~65)
# {{DefaultAdaptiveExecutionHandler}} or {{AdaptiveGraphManager}} - Handle incremental conversion

h2. Testing Plan

h3. Existing Test Coverage

There is one existing test in {{DispatcherTest.java}}:
* {{testOverridingJobVertexParallelisms()}} - only tests {{JobGraph}} submission

*Missing coverage:* No tests exist for {{StreamGraph}} submission with parallelism overrides, which is why this regression was not caught.

h3. Required: Integration Test (using MiniClusterExtension)

Create a new test class {{ParallelismOverridesITCase.java}} in the {{flink-tests}} module.

This is the *recommended approach* because it:
* Tests the real flow: StreamGraph → Scheduler → JobGraph → Running job
* Can verify actual parallelism via REST API
* Uses existing test infrastructure ({{MiniClusterExtension}})
* Is self-contained and doesn't require complex mocking

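{code:java}
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.test.junit5.InjectClusterClient;
import org.apache.flink.test.junit5.InjectMiniCluster;
import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.util.Map;

import static org.junit.jupiter.api.Assertions.assertEquals;

@ExtendWith(TestLoggerExtension.class)
public class ParallelismOverridesITCase {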

    @RegisterExtension
    private static final MiniClusterExtension MINI_CLUSTER_EXTENSION =
            new MiniClusterExtension(
                    new MiniClusterResourceConfiguration.Builder()
                            .setConfiguration(createConfiguration())
                            .setNumberSlotsPerTaskManager(4)
                            .build());

    private static Configuration createConfiguration() {
        Configuration config = new Configuration();
        // Parallelism overrides will be set per-test
        return config;
    }

    @Test
    void testParallelismOverridesWithStreamGraph(
            @InjectClusterClient RestClusterClient<?> client,
            @InjectMiniCluster MiniCluster miniCluster) throws Exception {

        // 1. Create StreamGraph with initial parallelism = 1
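        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Keep the map as its own JobVertex so the override targets only that operator
        env.disableOperatorChaining();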
        DataStream<Integer> stream = env.fromElements(1, 2, 3)
                .map(x -> x);
        stream.sinkTo(new DiscardingSink<>());
        StreamGraph streamGraph = env.getStreamGraph();

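        // 2. Get the map vertex's hex JobVertexID (helper not shown) and request parallelism = 4.
        //    The override must be attached to the job configuration carried by the StreamGraph;
        //    a standalone Configuration that is never passed to the job has no effect.
        String vertexId = getMapVertexId(streamGraph);
        streamGraph
                .getJobConfiguration()
                .set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(vertexId, "4"));

        // 3. Submit the StreamGraph itself (not a JobGraph) to exercise the affected path
        JobID jobId = miniCluster.submitJob(streamGraph).get().getJobID();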

        // 4. Verify actual parallelism is 4 (not 1)
        JobDetailsInfo jobDetails = client.getJobDetails(jobId).join();
        int actualParallelism = jobDetails.getJobVertexInfos().stream()
                .filter(v -> v.getName().contains("Map"))
                .findFirst()
                .map(JobDetailsInfo.JobVertexDetailsInfo::getParallelism)
                .orElseThrow();

        assertEquals(4, actualParallelism,
                "Parallelism override should be applied to StreamGraph submission");
    }

    @Test
    void testParallelismOverridesWithJobGraph(
            @InjectClusterClient RestClusterClient<?> client) throws Exception {
        // Ensure JobGraph submission still works (regression test for existing behavior).
        // Same structure as above, but submit a JobGraph instead of a StreamGraph.
    }
}
{code}

h3. Test Matrix

||Test Case||ExecutionPlan||Expected Behavior||
|testParallelismOverridesWithStreamGraph|StreamGraph|Override applied (THIS IS THE BUG FIX)|
|testParallelismOverridesWithJobGraph|JobGraph|Override applied (existing behavior)|
|testOverridesFromJobConfigTakePrecedence|StreamGraph|Job config overrides cluster config|
|testPartialOverrides|StreamGraph|Only specified vertices modified|

h3. Optional: Unit Test for Utility Class

Add unit tests for {{ParallelismOverrideUtil}} if created:

{code:java}
class ParallelismOverrideUtilTest {
    @Test
    void testApplyOverridesFromConfiguration() { }

    @Test
    void testApplyOverridesFromJobGraphConfig() { }

    @Test
    void testJobGraphConfigTakesPrecedence() { }

    @Test
    void testUnknownVertexIdIgnored() { }

    @Test
    void testNoOverridesWhenEmpty() { }
}
{code}

h2. Acceptance Criteria

* {{pipeline.jobvertex-parallelism-overrides}} works in Application Mode with StreamGraph
* All scheduler types (Default, Adaptive, AdaptiveBatch) apply overrides correctly
* Session Mode (JobGraph submission) continues to work
* Overrides from both jobMasterConfiguration and jobConfiguration are respected, with jobConfiguration taking precedence
* Unit tests cover the utility method
* Integration tests cover all scheduler/execution-plan combinations

h2. Related

* FLINK-36446 - Submit StreamGraph directly in Application Mode (introduced the regression)
* [Mailing list discussion|https://lists.apache.org/thread/qb06cghv2bo6p1xhzt6qzj6wf3cl29gz]


