* Flink version 1.15.1
* Running on an HA Kubernetes cluster using the flink-kubernetes-operator-1.1.0 operator
* Using RocksDB
Hello everyone, I have a `WindowedStream` to which I apply an aggregate function:

```java
SingleOutputStreamOperator<Tuple3<String, String, Long>> count = windowed
        .aggregate(
                new CountAggregateFunction<>(),
                new ProcessWindowFunction<Long, Tuple3<String, String, Long>, Tuple2<String, String>, GlobalWindow>() {
                    @Override
                    public void process(
                            Tuple2<String, String> key,
                            Context context,
                            Iterable<Long> elements,
                            Collector<Tuple3<String, String, Long>> out
                    ) {
                        Long count = elements.iterator().next();
                        out.collect(Tuple3.of(key.f0, key.f1, count));
                    }
                }
        )
        .uid("count_1")
        .setMaxParallelism(1024)
        .setParallelism(2);
```

When I try to change `.setParallelism(2)` to `.setParallelism(4)`, the application fails to start with the following error:

```
Caused by: java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between checkpoint/savepoint state and new program. Cannot map operator 21310d6efc824fa24be58c9ddbfb752a with max parallelism 128 to new program with max parallelism 1024. This indicates that the program has been changed in a non-compatible way after the checkpoint/savepoint.
	at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:184) ~[flink-dist-1.15.1.jar:1.15.1]
	at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1771) ~[flink-dist-1.15.1.jar:1.15.1]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:206) ~[flink-dist-1.15.1.jar:1.15.1]
	at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:181) ~[flink-dist-1.15.1.jar:1.15.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363) ~[flink-dist-1.15.1.jar:1.15.1]
	at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist-1.15.1.jar:1.15.1]
	...
```

Note that most of the operators have their max parallelism set to 1024 (only some map and filter operations were left at the default value of 128), and the only difference from the previous state is the parallelism change mentioned above.

I am wondering if anyone has faced a similar issue while trying to scale a specific operator, and could give me some hints on what I might be doing wrong. I am also interested to learn whether there is a way to find out which operator the `21310d6efc824fa24be58c9ddbfb752a` reported in the stack trace actually is. I have already tried changing the max parallelism of every operator currently set to 1024 down to 128, but none of them turned out to be the offending one.
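In case it helps with the second question, this is the kind of metadata inspection I had in mind to map the reported operator ID back to an operator. It is an untested sketch based on the `Checkpoints` class that appears in the stack trace, so the exact usage may need adjusting:

```java
import java.io.DataInputStream;
import java.io.FileInputStream;

import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.OperatorState;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;

public class DumpOperatorState {
    public static void main(String[] args) throws Exception {
        // Path to the _metadata file inside the checkpoint/savepoint directory
        String metadataPath = args[0];

        try (DataInputStream in = new DataInputStream(new FileInputStream(metadataPath))) {
            CheckpointMetadata metadata = Checkpoints.loadCheckpointMetadata(
                    in, DumpOperatorState.class.getClassLoader(), metadataPath);

            for (OperatorState operatorState : metadata.getOperatorStates()) {
                // The operator ID hex string should match the one from the exception
                System.out.printf(
                        "operator=%s parallelism=%d maxParallelism=%d%n",
                        operatorState.getOperatorID(),
                        operatorState.getParallelism(),
                        operatorState.getMaxParallelism());
            }
        }
    }
}
```

I also came across `OperatorIDGenerator.fromUid(...)` in the state processor API, which I believe produces the operator ID for a given `.uid(...)` string, so comparing its output for my uids against the IDs listed in the metadata might identify the operator. I have not verified this yet.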
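On a related note, I am wondering whether setting the max parallelism once on the environment, instead of on each operator, would have kept all operators consistent and avoided the mismatch in the first place (just a guess on my side):

```java
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Default max parallelism for every operator that does not override it explicitly
env.setMaxParallelism(1024);
```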