* Flink version 1.15.1
* Running on an HA Kubernetes cluster using flink-kubernetes-operator-1.1.0
* Using RocksDB

Hello everyone,

I have a WindowedStream to which I apply an aggregate function:

SingleOutputStreamOperator<Tuple3<String, String, Long>> count = windowed
        .aggregate(
                new CountAggregateFunction<>(),
                new ProcessWindowFunction<Long, Tuple3<String, String, Long>, Tuple2<String, String>, GlobalWindow>() {
                    @Override
                    public void process(
                            Tuple2<String, String> key,
                            ProcessWindowFunction<Long, Tuple3<String, String, Long>, Tuple2<String, String>, GlobalWindow>.Context context,
                            Iterable<Long> elements,
                            Collector<Tuple3<String, String, Long>> out
                    ) {
                        // The pre-aggregated window holds exactly one element: the count.
                        Long count = elements.iterator().next();
                        out.collect(Tuple3.of(key.f0, key.f1, count));
                    }
                }
        )
        .uid("count_1")
        .setMaxParallelism(1024)
        .setParallelism(2);


When I change `.setParallelism(2)` to `.setParallelism(4)`, the application
fails to start with the following error:

Caused by: java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint Checkpoint Metadata. Max parallelism mismatch between
checkpoint/savepoint state and new program. Cannot map operator
21310d6efc824fa24be58c9ddbfb752a with max parallelism 128 to new program
with max parallelism 1024. This indicates that the program has been changed
in a non-compatible way after the checkpoint/savepoint.
    at org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:184) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1771) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.tryRestoreExecutionGraphFromSavepoint(DefaultExecutionGraphFactory.java:206) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:181) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:363) ~[flink-dist-1.15.1.jar:1.15.1]
    at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:208) ~[flink-dist-1.15.1.jar:1.15.1]
...


Note that most of the operators have maxParallelism set to 1024 (only some
map and filter operators were left at the default value of 128). However,
the only difference from the previous state is the parallelism change
mentioned above.
Has anyone faced a similar issue while trying to rescale a specific
operator, or can anyone give me some hints on what I could be doing wrong?
I would also like to know whether there is a way to find out which operator
the ID `21310d6efc824fa24be58c9ddbfb752a` reported in the stack trace
refers to.

I've also tried changing maxParallelism from 1024 to 128 on each of the
operators that had it set to 1024, but none of them turned out to be the
offending one.
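
For context on the default value of 128 mentioned above: as far as I
understand, when no max parallelism is set, Flink derives it from the
operator parallelism by rounding 1.5 times the parallelism up to a power of
two and clamping the result to the range [128, 32768]. The sketch below is
my own re-implementation of that rule in plain Java, not Flink's code. The
class and method names are made up for illustration, and the bounds are
assumptions that I have not checked against the 1.15.1 sources.

```java
public class DefaultMaxParallelism {

    // Assumed bounds for the derived max parallelism (not verified against Flink 1.15.1).
    static final int LOWER_BOUND = 1 << 7;   // 128
    static final int UPPER_BOUND = 1 << 15;  // 32768

    // Smallest power of two that is greater than or equal to x (for x >= 1).
    static int roundUpToPowerOfTwo(int x) {
        if (x <= 1) {
            return 1;
        }
        return Integer.highestOneBit(x - 1) << 1;
    }

    // Hypothetical helper: default max parallelism for a given operator parallelism.
    static int computeDefaultMaxParallelism(int parallelism) {
        int candidate = roundUpToPowerOfTwo(parallelism + parallelism / 2);
        return Math.min(Math.max(candidate, LOWER_BOUND), UPPER_BOUND);
    }

    public static void main(String[] args) {
        for (int p : new int[] {2, 4, 100, 1000}) {
            System.out.println(p + " -> " + computeDefaultMaxParallelism(p));
        }
    }
}
```

If that rule is right, parallelism 2 and parallelism 4 both yield 128, so
going from 2 to 4 should not by itself change the derived default of the
operators that have no explicit max parallelism.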
