Hi Alexey,

It seems that the KubernetesHAService is working as expected, since all the
checkpoints were cleaned up when the job was canceled. The logs confirm this:
"Found 0 checkpoints in
KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}."

However, it is strange that the CheckpointCoordinator is recovering from a
different savepoint path than the one passed via --fromSavepoint. Could you
share the full JobManager logs? One possible reason, and this is only a guess,
is that the application cluster entrypoint is not creating a new JobGraph from
the specified arguments.
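
To make the mismatch easy to spot in the full logs, here is a minimal sketch
(an illustration only, not part of Flink) that pulls out the savepoint path
passed on the command line and the path the CheckpointCoordinator reports
restoring from. It assumes the logs are available as one JSON object per line
(not wrapped as in the email) with the "message" and "logger_name" fields shown
in the excerpt. The function name savepoint_paths and the sample paths are
hypothetical.

```python
import json
import re

ENTRYPOINT = "org.apache.flink.runtime.entrypoint.ClusterEntrypoint"
COORDINATOR = "org.apache.flink.runtime.checkpoint.CheckpointCoordinator"
# Matches the "Starting job <id> from savepoint <path>" message in the excerpt.
RESTORE_RE = re.compile(r"Starting job \S+ from savepoint (\S+)")


def savepoint_paths(lines):
    """Return (requested, restored) savepoint paths found in JSON log lines."""
    requested = restored = None
    expect_path = False
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # skip anything that is not a JSON record
        if not isinstance(record, dict):
            continue
        # Collapse whitespace: the entrypoint logs each argument indented.
        message = " ".join(str(record.get("message", "")).split())
        logger = record.get("logger_name")
        if logger == ENTRYPOINT:
            if expect_path:
                # The argument value is logged right after the flag itself.
                requested, expect_path = message, False
            elif message == "--fromSavepoint":
                expect_path = True
        elif logger == COORDINATOR:
            match = RESTORE_RE.search(message)
            if match:
                restored = match.group(1)
    return requested, restored


if __name__ == "__main__":
    # Placeholder records shaped like the excerpt; the paths are made up.
    sample = [
        json.dumps({"message": "    --fromSavepoint", "logger_name": ENTRYPOINT}),
        json.dumps({"message": "    wasbs://example/savepoints/savepoint-A",
                    "logger_name": ENTRYPOINT}),
        json.dumps({"message": "Starting job 0000 from savepoint "
                               "wasbs://example/savepoints/savepoint-B "
                               "(allowing non restored state)",
                    "logger_name": COORDINATOR}),
    ]
    requested, restored = savepoint_paths(sample)
    print("requested:", requested)
    print("restored: ", restored)
    print("match:    ", requested == restored)
```

If the two paths differ, that would be consistent with the guess above, but
the full JobManager logs are still needed to confirm where the second path
comes from.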


Best,
Yang

Alexey Trenikhun <yen...@msn.com> wrote on Sat, Feb 27, 2021 at 1:48 AM:

> Hello,
> We have a Flink job running in Kubernetes with Kubernetes HA enabled (the JM
> is deployed as a Job, a single TM as a StatefulSet). We took a savepoint with
> cancel=true. Now we are trying to start the job using --fromSavepoint *A*,
> where *A* is the path we got from taking the savepoint (ClusterEntrypoint
> reports *A* in the log), but it looks like the job for some reason ignores
> the given *A* and actually tries to restore from some path *B*
> (CheckpointCoordinator logs *B*):
>
> *{"ts":"2021-02-26T17:09:52.500Z","message":" Program
> Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-e8a201008f2c","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.501Z","message":"
>  
> --job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:52.502Z","message":"
>  
> 00000000000000000000000000000000","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
>  *
> *...*
>
> *{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from
> KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in
> KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are
> already
> downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during
> restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.183Z","message":"Starting job
> 00000000000000000000000000000000 from savepoint
> wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685
> (allowing non restored
> state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.191Z","message":"0.0.7+g9bb29061\n  build
> 2021-02-21T21:13:31-0800\n  tag: 0.0.0.7\n  id:
> 0.0.0.7\n","logger_name":"com.Fsp","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
> {"ts":"2021-02-26T17:09:59.273Z","message":"Fatal error occurred in the
> cluster
> entrypoint.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-4","level":"ERROR","level_value":40000,"stack_trace":"org.apache.flink.util.FlinkException:
> JobMaster for job 00000000000000000000000000000000 failed.\n\tat
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:890)\n\tat
> org.apache.flink.runtime.dispatcher.Dispatcher.dispatcherJobFailed(Dispatcher.java:465)\n\tat
> org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:444)\n\tat
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:423)\n\tat
> java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
> java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
> java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)\n\tat
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)\n\tat
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat
> akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat
> scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
> scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
> akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
> akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat
> akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat
> akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat
> akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat
> akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat
> akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat
> akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat
> akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat
> akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
> akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\nCaused
> by: org.apache.flink.runtime.client.JobInitializationException: Could not
> instantiate JobManager.\n\tat
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)\n\tat
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\tat
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
> java.lang.Thread.run(Thread.java:748)\nCaused by:
> java.io.FileNotFoundException: Cannot find checkpoint or savepoint
> file/directory
> 'wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685'
> on file system 'wasbs'.\n\tat
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:250)\n\tat
> org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpoint(AbstractFsCheckpointStorageAccess.java:111)\n\tat
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1632)\n\tat
> org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:358)\n\tat
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:288)\n\tat
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:245)\n\tat
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)\n\tat
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)\n\tat
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)\n\tat
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)\n\tat
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)\n\tat
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)\n\tat
> org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)\n\tat
> org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)\n\tat
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)\n\t...
> 4 common frames omitted\n"} *
>
> Any suggestions?
>
> Thanks,
> Alexey
>
