Lijie Wang created FLINK-31114:
----------------------------------
Summary: Batch job fails with IllegalStateException when using
adaptive batch scheduler
Key: FLINK-31114
URL: https://issues.apache.org/jira/browse/FLINK-31114
Project: Flink
Issue Type: Bug
Components: Runtime / Coordination
Reporter: Lijie Wang
Assignee: Lijie Wang
Fix For: 1.17.0
This is caused by FLINK-30942. Currently, if two job vertices have the same
input and the same parallelism (even if that parallelism is -1), they share
partitions. However, after FLINK-30942 the scheduler may change a job
vertex's parallelism before scheduling. As a result, two job vertices can have
the same parallelism in the compilation phase (and therefore share
partitions) but different parallelism in the scheduling phase, which causes
the following exception:
{code:java}
Caused by: java.util.concurrent.CompletionException: java.lang.IllegalStateException: Consumers must have the same max parallelism.
    at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:273)
    at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:280)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:975)
    at java.util.concurrent.CompletableFuture$UniCompose.tryFire(CompletableFuture.java:940)
    ... 37 more
Caused by: java.lang.IllegalStateException: Consumers must have the same max parallelism.
    at org.apache.flink.util.Preconditions.checkState(Preconditions.java:193)
    at org.apache.flink.runtime.executiongraph.IntermediateResult.getConsumersMaxParallelism(IntermediateResult.java:219)
    at org.apache.flink.runtime.executiongraph.Execution.getPartitionMaxParallelism(Execution.java:501)
    at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:472)
    at org.apache.flink.runtime.executiongraph.Execution.registerProducedPartitions(Execution.java:431)
    at org.apache.flink.runtime.scheduler.DefaultExecutionDeployer.lambda$registerProducedPartitions$5(DefaultExecutionDeployer.java:277)
    at java.util.concurrent.CompletableFuture.uniCompose(CompletableFuture.java:966)
    ... 38 more
{code}
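The mismatch can be illustrated with a minimal, self-contained sketch. It does not use Flink's classes: {{SharedPartitionSketch}}, {{Consumer}}, {{sharePartition}} and {{consumersMaxParallelism}} are hypothetical names, the values 128 and 8 are made up, and the assumption that the pre-scheduling adjustment leaves the two consumers with different max parallelism is inferred from the exception message rather than stated in the report.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the failure; these are not Flink's classes.
final class SharedPartitionSketch {

    /** Stand-in for a consumer job vertex. A parallelism of -1 means "not decided yet". */
    static final class Consumer {
        final String name;
        int parallelism;
        int maxParallelism;

        Consumer(String name, int parallelism, int maxParallelism) {
            this.name = name;
            this.parallelism = parallelism;
            this.maxParallelism = maxParallelism;
        }
    }

    /** Compilation-phase rule from the report (same input assumed): equal parallelism means sharing. */
    static boolean sharePartition(Consumer a, Consumer b) {
        return a.parallelism == b.parallelism;
    }

    /** Scheduling-phase check, modeled on the message in the stack trace. */
    static int consumersMaxParallelism(List<Consumer> consumers) {
        int expected = consumers.get(0).maxParallelism;
        for (Consumer c : consumers) {
            if (c.maxParallelism != expected) {
                throw new IllegalStateException("Consumers must have the same max parallelism.");
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        Consumer consumerA = new Consumer("consumerA", -1, 128);
        Consumer consumerB = new Consumer("consumerB", -1, 128);
        // Both are still at -1, so the compilation phase decides they share a partition.
        System.out.println("share at compile time: " + sharePartition(consumerA, consumerB));

        // Assumed adjustment before scheduling; the concrete values are made up.
        consumerA.parallelism = 8;
        consumerA.maxParallelism = 8;

        try {
            consumersMaxParallelism(Arrays.asList(consumerA, consumerB));
        } catch (IllegalStateException e) {
            System.out.println("scheduling fails: " + e.getMessage());
        }
    }
}
```

The sharing decision is taken once, while both parallelism values are still equal, and is never revisited after one consumer is adjusted. That leaves the later consistency check as the only place where the divergence can surface.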
Adding the following test to {{AdaptiveBatchSchedulerITCase}} reproduces
the problem:
{code:java}
@Test
void testDifferentConsumerParallelism() throws Exception {
    final Configuration configuration = createConfiguration();
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(configuration);
    env.setRuntimeMode(RuntimeExecutionMode.BATCH);
    env.setParallelism(8);

    final DataStream<Long> source1 =
            env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                    .setParallelism(8)
                    .name("source1")
                    .slotSharingGroup("group1");

    final DataStream<Long> source2 =
            env.fromSequence(0, NUMBERS_TO_PRODUCE - 1)
                    .setParallelism(8)
                    .name("source2")
                    .slotSharingGroup("group2");

    // map1 consumes source1 (forwarded) together with source2.
    source1.forward()
            .union(source2)
            .map(new NumberCounter())
            .name("map1")
            .slotSharingGroup("group3");

    // map2 also consumes source2, so source2 has two consumers.
    source2.map(new NumberCounter()).name("map2").slotSharingGroup("group4");

    env.execute();
}
{code}