Yunfeng Zhou created FLINK-28941:
------------------------------------

             Summary: Savepoint ignores MaxConcurrentCheckpoint limit in aligned checkpoint case
                 Key: FLINK-28941
                 URL: https://issues.apache.org/jira/browse/FLINK-28941
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Checkpointing
    Affects Versions: 1.16.0
            Reporter: Yunfeng Zhou


When unaligned checkpoints are disabled, savepoints are marked as forced[1]. 
Forced requests bypass the maxConcurrentCheckpoint limit[2], so more than 
maxConcurrentCheckpoint checkpoints can be running simultaneously.
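
The bypass can be illustrated with a minimal sketch. This is a simplified 
model, not Flink's actual CheckpointRequestDecider or CheckpointCoordinator 
code; ForcedRequestSketch, Decider, and tryTrigger are hypothetical names 
introduced here for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified model of the behavior described above.
// It is NOT Flink's implementation; all names are assumptions.
final class ForcedRequestSketch {

    static final class Decider {
        private final int maxConcurrentCheckpoints;
        private final List<Long> pending = new ArrayList<>();

        Decider(int maxConcurrentCheckpoints) {
            this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
        }

        // Returns true if the request is accepted and becomes pending.
        boolean tryTrigger(long checkpointId, boolean forced) {
            // The limit is only enforced for non-forced requests,
            // so a forced savepoint is accepted even when the limit is reached.
            if (!forced && pending.size() >= maxConcurrentCheckpoints) {
                return false;
            }
            pending.add(checkpointId);
            return true;
        }

        int pendingCount() {
            return pending.size();
        }
    }
}
```

In this model, with a limit of 1, a regular checkpoint 8 is accepted and a 
second regular request is rejected, but a forced savepoint 9 is accepted, 
leaving two checkpoints pending at once.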

This behavior is incompatible with OperatorCoordinatorHolder, which requires 
that at most one checkpoint be pending at a time. As a result, exceptions such 
as the following may be thrown[3].


{code:java}
java.lang.IllegalStateException: Cannot mark for checkpoint 9, already marked for checkpoint 8
        at org.apache.flink.runtime.operators.coordination.SubtaskGatewayImpl.markForCheckpoint(SubtaskGatewayImpl.java:185) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinatorInternal$6(OperatorCoordinatorHolder.java:328) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
        at java.util.HashMap.forEach(HashMap.java:1289) ~[?:1.8.0_292]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.checkpointCoordinatorInternal(OperatorCoordinatorHolder.java:327) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
        at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.lambda$checkpointCoordinator$0(OperatorCoordinatorHolder.java:243) ~[flink-runtime-1.16-SNAPSHOT.jar:1.16-SNAPSHOT]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:453) ~[classes/:?]
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:453) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:218) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84) ~[classes/:?]
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168) ~[classes/:?]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [scala-library-2.12.7.jar:?]
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [scala-library-2.12.7.jar:?]
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [scala-library-2.12.7.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [scala-library-2.12.7.jar:?]
        at akka.actor.Actor.aroundReceive(Actor.scala:537) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.actor.Actor.aroundReceive$(Actor.scala:535) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.actor.ActorCell.invoke(ActorCell.scala:548) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.dispatch.Mailbox.run(Mailbox.scala:231) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [akka-actor_2.12-2.6.15.jar:2.6.15]
        at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_292]
        at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) [?:1.8.0_292]
        at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) [?:1.8.0_292]
        at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175) [?:1.8.0_292]
{code}
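
The single-pending-checkpoint requirement behind this exception can be 
sketched as follows. This is a simplified model under stated assumptions, not 
the actual SubtaskGatewayImpl code; SingleCheckpointGatewaySketch, 
NO_CHECKPOINT, and checkpointCompleted are hypothetical names, and only the 
error message text is taken from the trace above.

```java
// Hypothetical, simplified model of a component that tolerates at most one
// in-flight checkpoint. It is NOT Flink's SubtaskGatewayImpl.
final class SingleCheckpointGatewaySketch {
    private static final long NO_CHECKPOINT = -1L;

    // Id of the checkpoint currently marked, or NO_CHECKPOINT if none.
    private long currentCheckpointId = NO_CHECKPOINT;

    void markForCheckpoint(long checkpointId) {
        // A second, different checkpoint while one is still marked is rejected.
        if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId != checkpointId) {
            throw new IllegalStateException(
                    String.format(
                            "Cannot mark for checkpoint %d, already marked for checkpoint %d",
                            checkpointId, currentCheckpointId));
        }
        currentCheckpointId = checkpointId;
    }

    // Assumed hook: clears the mark once the given checkpoint is done.
    void checkpointCompleted(long checkpointId) {
        if (currentCheckpointId == checkpointId) {
            currentCheckpointId = NO_CHECKPOINT;
        }
    }
}
```

In this model, marking checkpoint 8 and then checkpoint 9 before 8 has 
completed fails, which is the situation a forced savepoint can create when it 
is triggered while a regular checkpoint is still pending.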


[1] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointRequestDecider.java#L160-L164
[2] https://github.com/apache/flink/blob/master/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java#L444-L449
[3] https://dev.azure.com/apache-flink/apache-flink/_build/results?buildId=39860&view=logs&j=219f6d90-20a2-5863-7c1b-c80377a1018f&t=20186858-1485-5059-c9c6-446952519524&s=ab6e269b-88b2-5ded-2544-4aa5b1124530



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
