Lijie Wang created FLINK-31114:
----------------------------------

             Summary: Batch job fails with IllegalStateException when using adaptive batch scheduler
                 Key: FLINK-31114
                 URL: https://issues.apache.org/jira/browse/FLINK-31114
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
            Reporter: Lijie Wang
            Assignee: Lijie Wang
             Fix For: 1.17.0
This is caused by FLINK-30942. Currently, if two job vertices have the same input and the same parallelism (even if that parallelism is -1), they will share partitions. However, after FLINK-30942, the scheduler may change a job vertex's parallelism before scheduling. As a result, two job vertices can have the same parallelism in the compilation phase (and therefore share partitions) but different parallelism in the scheduling phase, which causes the following exception:

{code:java}
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
	at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
	... 37 more
Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
	at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
	at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
	at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
	at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
	at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
	at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
	at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
	... 38 more
{code}

Adding the following test to {{AdaptiveBatchSchedulerITCase}} reproduces the problem:

{code:java}
@Test
void testDifferentConsumerParallelism() throws Exception {
    final Configuration configuration = createConfiguration();
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(configuration);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.setParallelism(8);

    final DataStream<Long> source1 =
            env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                    .setParallelism(8)
                    .name("source1")
                    .slotSharingGroup("group1");

    final DataStream<Long> source2 =
            env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                    .setParallelism(8)
                    .name("source2")
                    .slotSharingGroup("group2");

    // map1 consumes source1 through an explicit forward edge and source2 through the union.
    source1.forward()
            .union(source2)
            .map(new NumberCounter())
            .name("map1")
            .slotSharingGroup("group3");

    // map2 also consumes source2, so map1 and map2 are two consumers of the same input.
    source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");

    env.execute();
}
{code}
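To illustrate the partition-sharing rule and why it breaks once parallelism changes after compilation, here is a small, self-contained Java model. It is only a sketch: all types and methods in it ({{SharedPartitionSketch}}, {{Vertex}}, {{SharedResult}}, {{canShare}}) are hypothetical and are not Flink APIs. It also simplifies by comparing plain parallelism, whereas the real precondition in {{IntermediateResult#getConsumersMaxParallelism}} (per the stack trace) compares max parallelism. The scheduling-phase values 8 and 4 are arbitrary.

{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the issue; none of these types are Flink classes.
public class SharedPartitionSketch {

    /** Stand-in for a job vertex: a name, its upstream producers and a parallelism (-1 = undecided). */
    static final class Vertex {
        final String name;
        final Set<String> inputs;
        int parallelism;

        Vertex(String name, int parallelism, String... inputs) {
            this.name = name;
            this.parallelism = parallelism;
            this.inputs = new HashSet<>(Arrays.asList(inputs));
        }
    }

    /** Stand-in for one produced result that several consumer vertices share. */
    static final class SharedResult {
        final List<Vertex> consumers = new ArrayList<>();

        /** Simplified analogue of the precondition in the stack trace: all consumers must agree. */
        int consumersParallelism() {
            int expected = consumers.get(0).parallelism;
            for (Vertex v : consumers) {
                if (v.parallelism != expected) {
                    throw new IllegalStateException("Consumers must have the same parallelism.");
                }
            }
            return expected;
        }
    }

    /** Compile-time rule from the issue text: same input and same parallelism means the result is shared. */
    static boolean canShare(String producer, Vertex a, Vertex b) {
        return a.inputs.contains(producer)
                && b.inputs.contains(producer)
                && a.parallelism == b.parallelism;
    }

    public static void main(String[] args) {
        // Compilation phase: both consumers of source2 still have parallelism -1, so they share.
        Vertex map1 = new Vertex("map1", -1, "source1", "source2");
        Vertex map2 = new Vertex("map2", -1, "source2");
        SharedResult source2Result = new SharedResult();
        if (canShare("source2", map1, map2)) {
            source2Result.consumers.add(map1);
            source2Result.consumers.add(map2);
        }
        System.out.println("compilation: shared, parallelism " + source2Result.consumersParallelism());

        // Scheduling phase: the parallelism changes after the sharing decision was made.
        map1.parallelism = 8; // hypothetical value
        map2.parallelism = 4; // hypothetical value
        try {
            source2Result.consumersParallelism();
        } catch (IllegalStateException e) {
            System.out.println("scheduling: " + e.getMessage());
        }
    }
}
{code}

In this model the sharing decision is taken once, from compile-time values, so a later parallelism change leaves two consumers with different values on one shared result, and the consistency check fails in the same way as the exception reported above.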