Yunfeng Zhou created FLINK-28941:
------------------------------------
Summary: Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
Key: FLINK-28941
URL: https://issues.apache.org/jira/browse/FLINK-28941
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.16.0
Reporter: Yunfeng Zhou
When unaligned checkpoints are disabled, savepoints are marked as forced[1], which
means they bypass the maxConcurrentCheckpoint limit[2]. As a result, more than
maxConcurrentCheckpoint checkpoints can be in progress simultaneously.
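A minimal sketch of the bypass described above, assuming that a forced request skips the concurrency check entirely. `ForcedRequestDemo`, `ToyDecider`, `trigger` and `pendingCount` are hypothetical names for illustration only; this is not Flink's `CheckpointRequestDecider` API, and the real class handles more cases (for example, request queueing) that this toy omits.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, simplified model of a checkpoint request decider.
 * Not Flink's CheckpointRequestDecider: it only shows how a "forced"
 * flag that skips the concurrency check can push the number of
 * in-flight checkpoints above the configured limit.
 */
public class ForcedRequestDemo {

    static final class ToyDecider {
        private final int maxConcurrentCheckpoints;
        private final List<Long> pending = new ArrayList<>();
        private long nextId = 1;

        ToyDecider(int maxConcurrentCheckpoints) {
            this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        }

        /** Returns the id of the triggered checkpoint, or -1 if the request was rejected. */
        long trigger(boolean forced) {
            // Assumption: forced requests (here, savepoints with unaligned
            // checkpoints disabled) never consult the concurrency limit.
            if (!forced && pending.size() >= maxConcurrentCheckpoints) {
                return -1;
            }
            long id = nextId++;
            pending.add(id);
            return id;
        }

        int pendingCount() {
            return pending.size();
        }
    }

    public static void main(String[] args) {
        ToyDecider decider = new ToyDecider(1);
        long cp = decider.trigger(false);        // regular checkpoint: accepted
        long rejected = decider.trigger(false);  // second regular checkpoint: rejected by the limit
        long sp = decider.trigger(true);         // forced savepoint: bypasses the limit
        System.out.println(cp + " " + rejected + " " + sp + " pending=" + decider.pendingCount());
    }
}
```

With a limit of 1, the second regular request is rejected while the forced savepoint is still accepted, leaving two checkpoints pending; `main` prints `1 -1 2 pending=2`.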
Letting savepoints bypass the limit is incompatible with OperatorCoordinatorHolder,
which requires at most one pending checkpoint at a time. As a result, exceptions
such as the following may be thrown[3]:
{code:java}
java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8
	at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?]
	at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?]
	at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
	at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
	at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15]
	at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
	at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
	at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
{code}
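The invariant behind the exception above can be modeled with a gateway that remembers at most one marked checkpoint. This is a minimal sketch under stated assumptions: `SinglePendingCheckpointDemo`, `ToyGateway`, `markForCheckpoint` and `checkpointCompleted` are hypothetical names chosen to mirror the error message, not the source of Flink's `SubtaskGatewayImpl`, whose actual checks may differ.

```java
/**
 * Hypothetical sketch of a per-subtask gateway that tolerates at most one
 * pending checkpoint, mirroring the error message in the stack trace.
 * Not Flink's SubtaskGatewayImpl; the checks here are assumptions.
 */
public class SinglePendingCheckpointDemo {

    static final class ToyGateway {
        // -1 means "no checkpoint is currently marked".
        private long markedCheckpointId = -1;

        void markForCheckpoint(long checkpointId) {
            // Only one checkpoint may be marked at a time.
            if (markedCheckpointId != -1) {
                throw new IllegalStateException(String.format(
                        "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                        checkpointId, markedCheckpointId));
            }
            markedCheckpointId = checkpointId;
        }

        void checkpointCompleted(long checkpointId) {
            // Clear the mark once the marked checkpoint finishes.
            if (markedCheckpointId == checkpointId) {
                markedCheckpointId = -1;
            }
        }
    }

    public static void main(String[] args) {
        ToyGateway gateway = new ToyGateway();
        gateway.markForCheckpoint(8);      // regular checkpoint 8 is pending
        try {
            gateway.markForCheckpoint(9);  // forced savepoint 9 arrives before 8 completes
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Marking checkpoint 8 and then, before it completes, checkpoint 9 reproduces the message from the trace; `main` prints `Cannot mark for checkpoint 9, already marked for checkpoint 8`.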
[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
[3] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530