Lijie Wang created FLINK-31114:
----------------------------------

             Summary: Batch job fails with IllegalStateException when using adaptive batch scheduler
                 Key: FLINK-31114
                 URL: https://issues.apache.org/jira/browse/FLINK-31114
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
            Reporter: Lijie Wang
            Assignee: Lijie Wang
             Fix For: 1.17.0


This is caused by FLINK-30942. Currently, if two job vertices have the same input and the same parallelism (even if the parallelism is -1), they will share partitions. However, after FLINK-30942, the scheduler may change the job vertices' parallelism before scheduling. As a result, two job vertices can have the same parallelism in the compilation phase (in which case they will share partitions) but different parallelism in the scheduling phase, which causes the following exception:

{code:java}
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
        at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
        at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
        at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
        ... 37 more
Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
        at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
        at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
        at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
        at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
        at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
        at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
        at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
        ... 38 more
{code}
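
For context, the failing check in {{IntermediateResult#getConsumersMaxParallelism}} asserts that all consumer vertices of an intermediate result agree on one max parallelism. Below is only a simplified, hypothetical sketch of that kind of check (the class, method, and parameter names are made up for illustration; this is not Flink's actual implementation):

{code:java}
import java.util.List;

/**
 * Hypothetical sketch (not Flink's actual code) of the consistency check that
 * produces "Consumers must have the same max parallelism.".
 */
class ConsumersMaxParallelismSketch {

    /** Returns the max parallelism shared by all consumers, failing if they differ. */
    static int getConsumersMaxParallelism(List<Integer> consumerMaxParallelisms) {
        int common = -1;
        for (int maxParallelism : consumerMaxParallelisms) {
            if (common == -1) {
                common = maxParallelism;
            } else if (common != maxParallelism) {
                // The state this issue describes: two consumers (e.g. map1 and map2) shared
                // the producer's partitions at compile time, but were later assigned
                // different parallelisms by the adaptive batch scheduler.
                throw new IllegalStateException("Consumers must have the same max parallelism.");
            }
        }
        return common;
    }
}
{code}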

Putting the following test into {{AdaptiveBatchSchedulerITCase}} reproduces the problem:

{code:java}
    @Test
    void testDifferentConsumerParallelism() throws Exception {
        final Configuration configuration = createConfiguration();
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironment(configuration);
        env.setRuntimeMode(RuntimeExecutionMode.BATCH);
        env.setParallelism(8);

        final DataStream<Long> source1 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(8)
                        .name("source1")
                        .slotSharingGroup("group1");

        final DataStream<Long> source2 =
                env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                        .setParallelism(8)
                        .name("source2")
                        .slotSharingGroup("group2");

        // map1 consumes source1 via a forward edge and also consumes source2.
        source1.forward()
                .union(source2)
                .map(new NumberCounter())
                .name("map1")
                .slotSharingGroup("group3");

        // map2 also consumes source2. map1 and map2 have the same parallelism in the
        // compilation phase, so they share source2's partitions, but the adaptive batch
        // scheduler may assign them different parallelisms during scheduling.
        source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");

        env.execute();
    }
{code}
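
For reference, {{createConfiguration()}} comes from {{AdaptiveBatchSchedulerITCase}} and enables the adaptive batch scheduler. The snippet below is a minimal sketch of such a configuration, assuming only the scheduler type needs to be set; the actual helper in the test may set additional options:

{code:java}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;

// Minimal sketch: enable the adaptive batch scheduler. The real createConfiguration()
// in AdaptiveBatchSchedulerITCase may configure more options than this.
static Configuration createConfiguration() {
    final Configuration configuration = new Configuration();
    configuration.set(
            JobManagerOptions.SCHEDULER, JobManagerOptions.SchedulerType.AdaptiveBatch);
    return configuration;
}
{code}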

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
