[ https://issues.apache.org/jira/browse/FLINK-23770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17399160#comment-17399160 ]

Yun Gao commented on FLINK-23770:
---------------------------------

Hi [~pnowojski], the code is in `StateAssignmentOperation#assignStates`, lines 
105 to 121, where the states are split across the different vertices. 

I think `uidHash` is currently meant for the case where users forget to set a 
uid in the first run, so the operator gets an auto-generated ID. If that job 
then finishes or fails, and users want to start a new job from the savepoint / 
externalized checkpoint of the previous job, they need a way to specify that an 
operator should be recovered from the corresponding operator of the first run. 
They can do this by setting `uidHash` to the operator ID that the operator had 
in the last run, which lets it retrieve its state. After the second run, 
checkpoints use the generated operator ID, so from the third run on users can 
remove the `uidHash` setting. 
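A minimal Java sketch of this three-run flow, under simplifying assumptions: operator IDs are plain strings, a checkpoint is modeled as a map from operator ID to a single operator's state, and `UidHashRestoreSketch`, `OperatorIds` and `restore` are hypothetical names, not Flink classes. Mirroring the behavior described in this comment, the lookup prefers the `uidHash` when it is set and present in the checkpoint, falls back to the generated ID otherwise, and re-keys the restored state by the generated ID.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical, simplified model of restoring one operator's state; not Flink's code. */
final class UidHashRestoreSketch {

    /** IDs of an operator in the current run: always a generated ID, optionally a uidHash. */
    static final class OperatorIds {
        final String generatedId;
        final Optional<String> userDefinedId; // models the value passed to setUidHash, if any

        OperatorIds(String generatedId, String userDefinedId) {
            this.generatedId = generatedId;
            this.userDefinedId = Optional.ofNullable(userDefinedId);
        }
    }

    /**
     * Restores one operator's state from a checkpoint keyed by operator ID.
     * The uidHash wins if it is set and present in the checkpoint; otherwise the
     * generated ID is used. The returned "next checkpoint" is keyed by the
     * generated ID only, so later runs no longer depend on the uidHash.
     */
    static Map<String, String> restore(Map<String, String> checkpoint, OperatorIds ids) {
        String lookupId = ids.userDefinedId
                .filter(checkpoint::containsKey)
                .orElse(ids.generatedId);
        Map<String, String> nextCheckpoint = new HashMap<>();
        String state = checkpoint.get(lookupId);
        if (state != null) {
            nextCheckpoint.put(ids.generatedId, state);
        }
        return nextCheckpoint;
    }

    public static void main(String[] args) {
        // Run 1: no uid set, so the state is stored under an auto-generated ID.
        Map<String, String> run1 = new HashMap<>();
        run1.put("auto-id-1", "count=42");

        // Run 2: a uid is now set (new generated ID) and uidHash points at the old ID.
        Map<String, String> run2 = restore(run1, new OperatorIds("uid-based-id", "auto-id-1"));

        // Run 3: uidHash removed; the state is found under the generated ID alone.
        Map<String, String> run3 = restore(run2, new OperatorIds("uid-based-id", null));

        System.out.println(run3); // prints {uid-based-id=count=42}
    }
}
```

In this model the third run still finds the state because the second run already re-keyed it under `uid-based-id`; dropping `uidHash` already in the second run would miss the state stored under `auto-id-1`.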

> FLIP-147: Unable to recover after source fully finished
> -------------------------------------------------------
>
>                 Key: FLINK-23770
>                 URL: https://issues.apache.org/jira/browse/FLINK-23770
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Runtime / Checkpointing
>    Affects Versions: 1.14.0
>            Reporter: Roman Khachatryan
>            Assignee: Yun Gao
>            Priority: Critical
>              Labels: pull-request-available
>             Fix For: 1.14.0
>
>
> When running one of the IT cases from 
> https://github.com/apache/flink/pull/16773 
> I see the following failure:
> {code}
> 10194 [flink-akka.actor.default-dispatcher-7] INFO  org.apache.flink.runtime.jobmaster.JobMaster [] - Trying to recover from a global failure.
> org.apache.flink.util.FlinkRuntimeException: Can not restore vertex Source: Custom Source -> Timestamps/Watermarks(cbc357ccb763df2852fee8c4fc7d55f2) which contain both finished and unfinished operators
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.calculateIfFinished(CheckpointCoordinator.java:1651) ~[classes/:?]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.lambda$getOrUpdate$0(CheckpointCoordinator.java:1631) ~[classes/:?]
>       at java.util.HashMap.computeIfAbsent(HashMap.java:1127) ~[?:1.8.0_271]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator$VerticesFinishedCache.getOrUpdate(CheckpointCoordinator.java:1629) ~[classes/:?]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.validateFinishedOperators(CheckpointCoordinator.java:1674) ~[classes/:?]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1577) ~[classes/:?]
>       at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateToSubtasks(CheckpointCoordinator.java:1438) ~[classes/:?]
>       at org.apache.flink.runtime.scheduler.SchedulerBase.restoreState(SchedulerBase.java:398) ~[classes/:?]
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.restartTasks(DefaultScheduler.java:317) ~[classes/:?]
>       at org.apache.flink.runtime.scheduler.DefaultScheduler.lambda$null$2(DefaultScheduler.java:287) ~[classes/:?]
>       at java.util.concurrent.CompletableFuture.uniRun(CompletableFuture.java:719) ~[?:1.8.0_271]
>       at java.util.concurrent.CompletableFuture$UniRun.tryFire(CompletableFuture.java:701) ~[?:1.8.0_271]
>       at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456) ~[?:1.8.0_271]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRunAsync$4(AkkaRpcActor.java:455) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:455) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:213) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.actor.Actor.aroundReceive(Actor.scala:537) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.actor.Actor.aroundReceive$(Actor.scala:535) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.actor.ActorCell.invoke(ActorCell.scala:548) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.dispatch.Mailbox.run(Mailbox.scala:231) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:243) [flink-rpc-akka_1bc30f88-029c-4db2-8df5-833082f3d1a5.jar:1.14-SNAPSHOT]
>       at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) [?:1.8.0_271]
>       at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1067) [?:1.8.0_271]
>       at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1703) [?:1.8.0_271]
>       at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:172) [?:1.8.0_271]
> {code}
> The graph has several sources, only one of which is fully finished (i.e. all 
> of its subtasks have finished).
> All sources have setUidHash set.
> I think the latter causes the problem:
> VerticesFinishedCache.checkOperatorFinished uses a hashmap of operator states, 
> keyed by operator ID. It prefers the user-defined ID, falling back to the 
> generated one.
> However, the map always seems to be keyed by the generated ID.
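The suspected key mismatch can be reproduced with a small, self-contained Java model. Everything in it is hypothetical and simplified (`FinishedCheckSketch`, `OperatorIds`, `lookupAll`, the string IDs), and it assumes two things not stated in the report: a missing map entry is treated as "not finished", and only the head of the chain carries a `uidHash`. Under those assumptions, a fully finished vertex reports one unfinished and one finished operator, which matches the "contain both finished and unfinished operators" error above, while vertices that are still running are unaffected. The "fixed" lookup, which uses the generated ID, is one possible fix and not necessarily the one merged for this issue.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical model of the finished-operator check; not Flink's actual classes. */
final class FinishedCheckSketch {

    /** IDs of one operator in a chained vertex; userDefinedId models setUidHash. */
    static final class OperatorIds {
        final String generatedId;
        final Optional<String> userDefinedId;

        OperatorIds(String generatedId, String userDefinedId) {
            this.generatedId = generatedId;
            this.userDefinedId = Optional.ofNullable(userDefinedId);
        }
    }

    /** Buggy: prefers the user-defined ID, but the map is keyed by generated ID. */
    static boolean isFinishedBuggy(Map<String, Boolean> finishedByGeneratedId, OperatorIds ids) {
        String key = ids.userDefinedId.orElse(ids.generatedId);
        // Assumption: a missing entry is treated as "not finished".
        return finishedByGeneratedId.getOrDefault(key, false);
    }

    /** Fixed: looks up with the same key the map was built with. */
    static boolean isFinishedFixed(Map<String, Boolean> finishedByGeneratedId, OperatorIds ids) {
        return finishedByGeneratedId.getOrDefault(ids.generatedId, false);
    }

    /** Collects the finished flag of every operator in the chain. */
    static List<Boolean> lookupAll(
            List<OperatorIds> chain, Map<String, Boolean> finished, boolean useFix) {
        List<Boolean> result = new ArrayList<>();
        for (OperatorIds ids : chain) {
            result.add(useFix ? isFinishedFixed(finished, ids) : isFinishedBuggy(finished, ids));
        }
        return result;
    }

    /** True if every operator finished, false if none did; a mix is rejected. */
    static boolean vertexFinished(List<Boolean> operatorFinished) {
        boolean any = operatorFinished.contains(true);
        boolean all = !operatorFinished.contains(false);
        if (any && !all) {
            throw new IllegalStateException(
                    "vertex contains both finished and unfinished operators");
        }
        return all;
    }

    public static void main(String[] args) {
        // Chain "source -> watermarks": only the chain head has a uidHash.
        List<OperatorIds> chain = Arrays.asList(
                new OperatorIds("gen-source", "user-hash-source"),
                new OperatorIds("gen-watermarks", null));

        // Restored state is keyed by generated ID; the whole chain has finished.
        Map<String, Boolean> finished = new HashMap<>();
        finished.put("gen-source", true);
        finished.put("gen-watermarks", true);

        System.out.println(vertexFinished(lookupAll(chain, finished, true))); // prints true
        try {
            vertexFinished(lookupAll(chain, finished, false));
        } catch (IllegalStateException e) {
            // prints "vertex contains both finished and unfinished operators"
            System.out.println(e.getMessage());
        }
    }
}
```

With the buggy lookup, the source's `user-hash-source` key is never found, so it looks unfinished while the chained watermark operator looks finished. For a vertex whose operators are all still running, both lookups return "not finished", so the mismatch stays hidden there.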



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
