[ 
https://issues.apache.org/jira/browse/FLINK-31114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lijie Wang closed FLINK-31114.
------------------------------
    Resolution: Fixed

> Batch job fails with IllegalStateException when using adaptive batch scheduler
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-31114
>                 URL: https://issues.apache.org/jira/browse/FLINK-31114
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: Lijie Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> This is caused by FLINK-30942. Currently, if two job vertices have the same 
> input and the same parallelism (even if the parallelism is -1), they share 
> partitions. However, after FLINK-30942, the scheduler may change the job 
> vertices' parallelism before scheduling. As a result, two job vertices can 
> have the same parallelism in the compilation phase (in which case they share 
> partitions) but different parallelism in the scheduling phase, which 
> causes the following exception:
> {code:java}
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
>         at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>         at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>         at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
>         at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>         ... 37 more
> Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
>         at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>         at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
>         at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
>         at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
>         at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
>         at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
>         at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>         ... 38 more
> {code}
> Putting the following test into {{AdaptiveBatchSchedulerITCase}} can 
> reproduce the problem:
> {code:java}
>     @Test
>     void testDifferentConsumerParallelism() throws Exception {
>         final Configuration configuration = createConfiguration();
>         final StreamExecutionEnvironment env =
>                 StreamExecutionEnvironment.createLocalEnvironment(configuration);
>         env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>         env.setParallelism(8);
>         final DataStream<Long> source1 =
>                 env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
>                         .setParallelism(8)
>                         .name("source1")
>                         .slotSharingGroup("group1");
>         final DataStream<Long> source2 =
>                 env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
>                         .setParallelism(8)
>                         .name("source2")
>                         .slotSharingGroup("group2");
>         source1.forward()
>                 .union(source2)
>                 .map(new NumberCounter())
>                 .name("map1")
>                 .slotSharingGroup("group3");
>         source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");
>         env.execute();
>     }
> {code}
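
The mismatch described in the issue can be illustrated with a small standalone model. This is only a sketch under stated assumptions: the class `PartitionSharingSketch`, its methods, the input name "input", and the parallelism values 4 and 8 are all hypothetical and are not Flink APIs or values taken from the issue. It shows two consumers that qualify for partition sharing at compile time because both have parallelism -1, and a consistency check in the spirit of the failing precondition that rejects them once different values have been decided.

```java
import java.util.Arrays;
import java.util.List;

/** Simplified model of the failure; none of these names are Flink APIs. */
public class PartitionSharingSketch {

    /** Sentinel for "parallelism not decided yet", as in the issue description. */
    static final int PARALLELISM_DEFAULT = -1;

    /** Compile-time rule from the description: same input and same parallelism means shared partitions. */
    static boolean sharePartitions(String inputA, int parallelismA, String inputB, int parallelismB) {
        return inputA.equals(inputB) && parallelismA == parallelismB;
    }

    /** Hypothetical stand-in for the failing check: all consumers of a shared result must agree. */
    static int consumersMaxParallelism(List<Integer> consumerParallelisms) {
        int first = consumerParallelisms.get(0);
        for (int p : consumerParallelisms) {
            if (p != first) {
                throw new IllegalStateException("Consumers must have the same max parallelism.");
            }
        }
        return first;
    }

    public static void main(String[] args) {
        // Compile time: both consumers read the same input with undecided parallelism, so they share.
        boolean shared = sharePartitions("input", PARALLELISM_DEFAULT, "input", PARALLELISM_DEFAULT);
        System.out.println("shared at compile time: " + shared);

        // Scheduling time: assumed values that a scheduler might decide independently.
        List<Integer> decided = Arrays.asList(4, 8);
        try {
            consumersMaxParallelism(decided);
        } catch (IllegalStateException e) {
            System.out.println("scheduling fails: " + e.getMessage());
        }
    }
}
```

The point of the model is that the sharing decision and the consistency check look at the parallelism at two different times, so any change in between can invalidate the earlier decision.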



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
