Yunfeng Zhou created FLINK-28941:
------------------------------------

             Summary: Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
                 Key: FLINK-28941
                 URL: https://issues.apache.org/jira/browse/FLINK-28941
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.16.0
            Reporter: Yunfeng Zhou
When unaligned checkpoints are disabled, savepoints are marked as forced [1], which means they bypass the maxConcurrentCheckpoint limit [2]. As a result, more than maxConcurrentCheckpoint checkpoints can be running simultaneously. This behavior is incompatible with OperatorCoordinatorHolder, which requires at most one pending checkpoint at a time, so exceptions such as the following may be thrown [3]:

{code:java}
java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8
	at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
	at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
{code}

[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
[3] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530
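The conflict described in this issue can be reduced to a small, self-contained Java model. This is a hypothetical sketch, not Flink code: `Request`, `Gateway`, `admit` and `simulate` are invented names. `Gateway.markForCheckpoint` only loosely imitates the check in `SubtaskGatewayImpl.markForCheckpoint` that produces the reported error message, and `admit` only imitates the reported rule that forced requests skip the maxConcurrentCheckpoint check. With a limit of 1, periodic checkpoint 8 is still pending when forced savepoint 9 is admitted, which reproduces the same message.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, heavily simplified model of the interaction reported in FLINK-28941.
 * None of these types are Flink classes; they only mimic two rules from the report:
 * (1) forced requests skip the maxConcurrentCheckpoint check, and
 * (2) the coordinator side accepts at most one pending checkpoint at a time.
 */
class ForcedSavepointSketch {

    /** Hypothetical checkpoint/savepoint request. */
    static final class Request {
        final long checkpointId;
        final boolean forced;

        Request(long checkpointId, boolean forced) {
            this.checkpointId = checkpointId;
            this.forced = forced;
        }
    }

    /** Hypothetical stand-in for a subtask gateway: only one checkpoint may be marked at a time. */
    static final class Gateway {
        private long markedCheckpoint = -1; // -1 means "no checkpoint marked"

        void markForCheckpoint(long checkpointId) {
            if (markedCheckpoint != -1 && markedCheckpoint != checkpointId) {
                throw new IllegalStateException(
                        "Cannot mark for checkpoint " + checkpointId
                                + ", already marked for checkpoint " + markedCheckpoint);
            }
            markedCheckpoint = checkpointId;
        }
    }

    /** Hypothetical admission rule: forced requests ignore the concurrency limit. */
    static boolean admit(Request request, int pendingCount, int maxConcurrent) {
        return request.forced || pendingCount < maxConcurrent;
    }

    /**
     * Triggers checkpoint 8 (periodic, not forced) and then savepoint 9 while 8 is still pending.
     * Returns the gateway's error message, or null if no error occurred.
     */
    static String simulate(int maxConcurrent, boolean savepointForced) {
        Gateway gateway = new Gateway();
        List<Request> pending = new ArrayList<>();
        Request[] requests = {new Request(8, false), new Request(9, savepointForced)};
        try {
            for (Request request : requests) {
                if (!admit(request, pending.size(), maxConcurrent)) {
                    continue; // held back; this sketch simply skips it instead of queueing it
                }
                pending.add(request); // nothing completes in this sketch, so it stays pending
                gateway.markForCheckpoint(request.checkpointId);
            }
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println("forced savepoint:     " + simulate(1, true));
        System.out.println("non-forced savepoint: " + simulate(1, false));
    }
}
```

Running `main` prints `Cannot mark for checkpoint 9, already marked for checkpoint 8` for the forced case and `null` for the non-forced case, where the limit of 1 holds the savepoint back while checkpoint 8 is pending.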