[ https://issues.apache.org/jira/browse/FLINK-31114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Lijie Wang closed FLINK-31114.
------------------------------
    Resolution: Fixed

> Batch job fails with IllegalStateException when using adaptive batch scheduler
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-31114
>                 URL: https://issues.apache.org/jira/browse/FLINK-31114
>             Project: Flink
>          Issue Type: Bug
>          Components: Runtime / Coordination
>            Reporter: Lijie Wang
>            Assignee: Lijie Wang
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.17.0
>
>
> This is caused by FLINK-30942. Currently, if two job vertices have the same
> input and the same parallelism (even if the parallelism is -1), they share
> partitions. However, after FLINK-30942 the scheduler may change the job
> vertices' parallelism before scheduling. Two job vertices can therefore have
> the same parallelism in the compilation phase (and so share partitions) but
> different parallelism in the scheduling phase, which causes the following
> exception:
> {code:java}
> Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
>     at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
>     ... 37 more
> Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
>     at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
>     at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
>     at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
>     at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
>     at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
>     at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
>     ... 38 more
> {code}
> Putting the following test into {{AdaptiveBatchSchedulerITCase}} can reproduce the problem:
> {code:java}
> @Test
> void testDifferentConsumerParallelism() throws Exception {
>     final Configuration configuration = createConfiguration();
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironment(configuration);
>     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
>     env.setParallelism(8);
>
>     final DataStream<Long> source1 =
>             env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
>                     .setParallelism(8)
>                     .name("source1")
>                     .slotSharingGroup("group1");
>
>     final DataStream<Long> source2 =
>             env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
>                     .setParallelism(8)
>                     .name("source2")
>                     .slotSharingGroup("group2");
>
>     source1.forward()
>             .union(source2)
>             .map(new NumberCounter())
>             .name("map1")
>             .slotSharingGroup("group3");
>
>     source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");
>
>     env.execute();
> }
> {code}

-- 
This message was sent by Atlassian Jira
(v8.20.10#820010)
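The failure mode described in this issue can be modeled in a few lines. The sketch below is a hypothetical, simplified model, not Flink code: the classes `Consumer` and `Result` and the methods `compile` and `runScenario` are invented for illustration, and the model collapses parallelism and max parallelism into a single number per consumer. It only shows the ordering problem: consumers that agree at compilation time are grouped onto one shared result, the scheduler later revises one of them, and a deployment-time check shaped like `IntermediateResult#getConsumersMaxParallelism` then rejects the shared result.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the failure in FLINK-31114. None of these
// classes exist in Flink; they only mimic the shape of the precondition in
// IntermediateResult#getConsumersMaxParallelism. For brevity (an assumption),
// each consumer carries a single "maxParallelism" number.
class SharedPartitionSketch {

    /** A consumer job vertex whose value the scheduler may still change. */
    static final class Consumer {
        int maxParallelism;

        Consumer(int maxParallelism) {
            this.maxParallelism = maxParallelism;
        }
    }

    /** A produced result; several consumers may share it. */
    static final class Result {
        final List<Consumer> consumers = new ArrayList<>();

        /** All consumers of one shared result must agree on a single value. */
        int getConsumersMaxParallelism() {
            int expected = consumers.get(0).maxParallelism;
            for (Consumer c : consumers) {
                if (c.maxParallelism != expected) {
                    throw new IllegalStateException(
                            "Consumers must have the same max parallelism.");
                }
            }
            return expected;
        }
    }

    /** Compilation phase: equal values share one result, otherwise one result each. */
    static List<Result> compile(Consumer a, Consumer b) {
        List<Result> results = new ArrayList<>();
        if (a.maxParallelism == b.maxParallelism) {
            Result shared = new Result();
            shared.consumers.add(a);
            shared.consumers.add(b);
            results.add(shared);
        } else {
            for (Consumer c : List.of(a, b)) {
                Result own = new Result();
                own.consumers.add(c);
                results.add(own);
            }
        }
        return results;
    }

    /** Compile, let the "scheduler" revise map1, then run the deployment check. */
    static String runScenario(int newValueForMap1) {
        Consumer map1 = new Consumer(8);
        Consumer map2 = new Consumer(8);
        List<Result> results = compile(map1, map2); // shared: both are 8 here

        // Scheduling phase: the value changes after the sharing decision was made.
        map1.maxParallelism = newValueForMap1;

        try {
            for (Result r : results) {
                r.getConsumersMaxParallelism();
            }
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(runScenario(8));
        System.out.println(runScenario(4));
    }
}
```

With `runScenario(8)` the check passes; with `runScenario(4)` it fails with the same message as in the stack trace above, because the decision to share was taken before the scheduler changed the value.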