Anton Kalashnikov created FLINK-22379:
-----------------------------------------

             Summary: Introduce a new JobStatus to avoid premature checkpoint triggering
                 Key: FLINK-22379
                 URL: https://issues.apache.org/jira/browse/FLINK-22379
             Project: Flink
          Issue Type: Improvement
          Components: Runtime / Checkpointing
            Reporter: Anton Kalashnikov


Currently, once the JobStatus switches to RUNNING, the CheckpointCoordinator is allowed to trigger checkpoints, which is fine in itself. Unfortunately, the JobStatus switches to RUNNING before the task states (ExecutionState) have even reached SCHEDULED. This leads to several problems, one of which can be seen in the following log:
{noformat}
WARN  org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Failed to trigger checkpoint for job bc943302f92d979824fbc8f4cabc5db3.)
org.apache.flink.runtime.checkpoint.CheckpointException: Checkpoint triggering task Source: EventSource -> Timestamps/Watermarks (1/7) of job bc943302f92d979824fbc8f4cabc5db3 has not being executed at the moment. Aborting checkpoint. Failure reason: Not all required tasks are currently running.
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.checkTasksStarted(DefaultCheckpointPlanCalculator.java:152) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.checkpoint.DefaultCheckpointPlanCalculator.lambda$calculateCheckpointPlan$1(DefaultCheckpointPlanCalculator.java:114) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604) ~[?:1.8.0_272]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.Actor$class.aroundReceive(Actor.scala:517) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) ~[flink-dist_2.11-1.13-SNAPSHOT.jar:1.13-SNAPSHOT]
{noformat}
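To make the ordering problem concrete, here is a minimal Java model of the current behavior. It is a hypothetical simplification, not Flink code: `CurrentJobModel`, `scheduleJob` and `tryTriggerCheckpoint` are illustrative names, and only the state names come from this issue. The job status flips to RUNNING as soon as scheduling starts, so a checkpoint attempt made at that moment fails the "all tasks running" check, just like in the log above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, heavily simplified model of the current behavior.
// These are NOT Flink's real classes; only the state names follow the issue.
final class CurrentJobModel {
    enum JobStatus { CREATED, RUNNING }

    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    JobStatus jobStatus = JobStatus.CREATED;
    final List<ExecutionState> tasks = new ArrayList<>();

    CurrentJobModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            tasks.add(ExecutionState.CREATED);
        }
    }

    // The job status switches to RUNNING immediately, while every task
    // is still CREATED (not even SCHEDULED yet).
    void scheduleJob() {
        jobStatus = JobStatus.RUNNING;
    }

    // Checkpointing is enabled by the job status alone, but the attempt
    // only succeeds if every task is already RUNNING.
    String tryTriggerCheckpoint() {
        if (jobStatus != JobStatus.RUNNING) {
            return "skipped: job not running";
        }
        for (ExecutionState state : tasks) {
            if (state != ExecutionState.RUNNING) {
                return "failed: not all required tasks are currently running";
            }
        }
        return "triggered";
    }
}
```

Right after `scheduleJob()` the model reports a RUNNING job whose checkpoint attempt fails; it only succeeds once every entry in `tasks` has been set to RUNNING.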
To avoid this problem, it is a good idea to introduce a new JobStatus between CREATED and RUNNING (possibly named RESTORING). Then:
 * JobStatus CREATED switches to RESTORING at the point where it currently switches to RUNNING
 * JobStatus RESTORING switches to RUNNING once all tasks have switched their states from INITIALIZING to RUNNING
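The proposed lifecycle can be sketched as a small state machine. This is a hypothetical model under simplifying assumptions (a fixed set of tasks, state changes delivered one at a time), not the actual Flink scheduler; `ProposedJobModel`, `onTaskStateChange` and `canTriggerCheckpoint` are illustrative names. Checkpointing is only allowed once the job reaches RUNNING, which in turn requires every task to be RUNNING.

```java
import java.util.Arrays;

// Hypothetical sketch of the proposed job lifecycle; class, field and
// method names are illustrative only and are not Flink's actual API.
final class ProposedJobModel {
    enum JobStatus { CREATED, RESTORING, RUNNING }

    // The issue also suggests renaming INITIALIZING to RESTORING for tasks.
    enum ExecutionState { CREATED, SCHEDULED, DEPLOYING, INITIALIZING, RUNNING }

    private JobStatus jobStatus = JobStatus.CREATED;
    private final ExecutionState[] tasks;

    ProposedJobModel(int parallelism) {
        tasks = new ExecutionState[parallelism];
        Arrays.fill(tasks, ExecutionState.CREATED);
    }

    JobStatus jobStatus() {
        return jobStatus;
    }

    // CREATED -> RESTORING at the point where the job currently goes to RUNNING.
    void scheduleJob() {
        if (jobStatus != JobStatus.CREATED) {
            throw new IllegalStateException("Job already scheduled: " + jobStatus);
        }
        jobStatus = JobStatus.RESTORING;
    }

    // Called whenever a task reports a new execution state.
    void onTaskStateChange(int taskIndex, ExecutionState newState) {
        tasks[taskIndex] = newState;
        // RESTORING -> RUNNING only once every task has reached RUNNING.
        if (jobStatus == JobStatus.RESTORING
                && Arrays.stream(tasks).allMatch(s -> s == ExecutionState.RUNNING)) {
            jobStatus = JobStatus.RUNNING;
        }
    }

    // Checkpoints are only allowed in RUNNING, so none is attempted while
    // tasks are still being scheduled, deployed or initialized.
    boolean canTriggerCheckpoint() {
        return jobStatus == JobStatus.RUNNING;
    }
}
```

Because RUNNING is only entered after the last task reports RUNNING, a coordinator gated on the job status in this model never sees a RUNNING job whose tasks are still starting up.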


It would also make sense to rename ExecutionState.INITIALIZING to RESTORING so that the job and task states use the same name.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
