Hi Yang,
Unfortunately, I didn't save the log. I'm trying to reproduce it again, but now
I'm hitting a different error, about an incompatible version of
ImmutableMapSerializer. That is strange: although its serialVersionUID did
change, this serializer is only registered, never used (no state uses Kryo,
and I call disableGenericTypes to ensure this). Could it be that when I call
registerTypeWithKryoSerializer, the serializer becomes part of the JobGraph? If
so, then perhaps it is the same root cause: a new JobGraph is not created.
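The hypothesis can be illustrated with plain Java serialization. This is a minimal model, not Flink code: `JobConfigModel` and `UnusedSerializer` are hypothetical stand-ins for the job configuration and the registered serializer, and it assumes the registration keeps a serializer instance that is Java-serialized together with the job. Under that assumption, the serializer's class name and its serialVersionUID end up in the bytes even though nothing ever calls it, and Java deserialization rejects a stream whose serialVersionUID differs from the local class with an `InvalidClassException`.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java model (hypothetical, not Flink): a registered but unused serializer still travels with the job. */
public class RegisteredSerializerModel {

    // Distinctive value so it is easy to find in the serialized bytes.
    static final long UNUSED_SERIALIZER_UID = 0x1234_5678_9ABC_DEF0L;

    /** Hypothetical serializer: registered, never invoked. */
    static class UnusedSerializer implements Serializable {
        private static final long serialVersionUID = UNUSED_SERIALIZER_UID;
    }

    /** Hypothetical stand-in for the job configuration shipped with the job. */
    static class JobConfigModel implements Serializable {
        private static final long serialVersionUID = 1L;
        // Registration only stores the instance; nothing here ever calls it.
        final Map<String, Serializable> registeredSerializers = new LinkedHashMap<>();

        void register(String typeName, Serializable serializer) {
            registeredSerializers.put(typeName, serializer);
        }
    }

    /** Java-serializes an object graph to bytes. */
    static byte[] serialize(Serializable obj) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** True if the 8-byte big-endian encoding of uid occurs anywhere in data. */
    static boolean containsUid(byte[] data, long uid) {
        byte[] needle = ByteBuffer.allocate(Long.BYTES).putLong(uid).array();
        outer:
        for (int i = 0; i + needle.length <= data.length; i++) {
            for (int j = 0; j < needle.length; j++) {
                if (data[i + j] != needle[j]) {
                    continue outer;
                }
            }
            return true;
        }
        return false;
    }

    /** True if the class name appears in the stream (ASCII names are written byte-for-byte). */
    static boolean containsClassName(byte[] data, Class<?> cls) {
        return new String(data, StandardCharsets.ISO_8859_1).contains(cls.getName());
    }

    public static void main(String[] args) {
        JobConfigModel config = new JobConfigModel();
        config.register("ImmutableMap", new UnusedSerializer());
        byte[] data = serialize(config);
        System.out.println("class name recorded: " + containsClassName(data, UnusedSerializer.class));
        System.out.println("serialVersionUID recorded: " + containsUid(data, UNUSED_SERIALIZER_UID));
    }
}
```

Whether Flink's own job configuration behaves like this model for registerTypeWithKryoSerializer is exactly the open question above; the model only shows the Java serialization mechanics.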

Thanks,
Alexey

________________________________
From: Yang Wang <danrtsey...@gmail.com>
Sent: Sunday, February 28, 2021 10:04 PM
To: Alexey Trenikhun <yen...@msn.com>
Cc: Flink User Mail List <user@flink.apache.org>
Subject: Re: Kubernetes HA - attempting to restore from wrong (non-existing) savepoint

Hi Alexey,

It seems that the KubernetesHAService works well, since all the checkpoints
were cleaned up when the job was canceled. We can also see the related log line
"Found 0 checkpoints in 
KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.".

However, it is a little strange that the CheckpointCoordinator is recovering
from the wrong savepoint path. Could you share the full JobManager logs? One
possible reason I can think of is that the application cluster entrypoint is
not creating a new JobGraph from the specified arguments.

Best,
Yang

On Sat, Feb 27, 2021 at 1:48 AM, Alexey Trenikhun <yen...@msn.com> wrote:
Hello,
We have a Flink job running in Kubernetes with Kubernetes HA enabled (the JM is
deployed as a Job, a single TM as a StatefulSet). We took a savepoint with
cancel=true. Now, when we try to start the job using --fromSavepoint A, where A
is the path we got from taking the savepoint (ClusterEntrypoint reports A in the
log), the job for some reason seems to ignore A and actually tries to restore
from some other path B (CheckpointCoordinator logs B):

{"ts":"2021-02-26T17:09:52.500Z","message":" Program Arguments:","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --configDir","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    /opt/flink/conf","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --fromSavepoint","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-e8a201008f2c","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --job-classname","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    com.App","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.501Z","message":"    --job-id","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:52.502Z","message":"    00000000000000000000000000000000","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"main","level":"INFO","level_value":20000}
...
{"ts":"2021-02-26T17:09:59.176Z","message":"Recovering checkpoints from KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.181Z","message":"Found 0 checkpoints in KubernetesStateHandleStore{configMapName='gsp-00000000000000000000000000000000-jobmanager-leader'}.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"All 0 checkpoints found are already downloaded.","logger_name":"org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"No checkpoint found during restore.","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.183Z","message":"Starting job 00000000000000000000000000000000 from savepoint wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685 (allowing non restored state)","logger_name":"org.apache.flink.runtime.checkpoint.CheckpointCoordinator","thread_name":"cluster-io-thread-4","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.191Z","message":"0.0.7+g9bb29061\n  build 2021-02-21T21:13:31-0800\n  tag: 0.0.0.7\n  id: 0.0.0.7\n","logger_name":"com.Fsp","thread_name":"flink-akka.actor.default-dispatcher-2","level":"INFO","level_value":20000}
{"ts":"2021-02-26T17:09:59.273Z","message":"Fatal error occurred in the cluster entrypoint.","logger_name":"org.apache.flink.runtime.entrypoint.ClusterEntrypoint","thread_name":"flink-akka.actor.default-dispatcher-4","level":"ERROR","level_value":40000,"stack_trace":"org.apache.flink.util.FlinkException: JobMaster for job 00000000000000000000000000000000 failed.\n\tat
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:890)\n\tat
org.apache.flink.runtime.dispatcher.Dispatcher.dispatcherJobFailed(Dispatcher.java:465)\n\tat
org.apache.flink.runtime.dispatcher.Dispatcher.handleDispatcherJobResult(Dispatcher.java:444)\n\tat
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:423)\n\tat
java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:836)\n\tat
java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:811)\n\tat
java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:456)\n\tat
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442)\n\tat
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209)\n\tat
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)\n\tat
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159)\n\tat
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)\n\tat
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)\n\tat
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)\n\tat
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat
akka.actor.Actor.aroundReceive(Actor.scala:517)\n\tat
akka.actor.Actor.aroundReceive$(Actor.scala:515)\n\tat
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)\n\tat
akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)\n\tat
akka.actor.ActorCell.invoke(ActorCell.scala:561)\n\tat
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)\n\tat
akka.dispatch.Mailbox.run(Mailbox.scala:225)\n\tat
akka.dispatch.Mailbox.exec(Mailbox.scala:235)\n\tat
akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)\n\tat
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)\n\tat
akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)\n\tat
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)\n
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not instantiate JobManager.\n\tat
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:494)\n\tat
java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1604)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\n
Caused by: java.io.FileNotFoundException: Cannot find checkpoint or savepoint file/directory 'wasbs://gsp-st...@gspstatewestus2dev.blob.core.windows.net/gsp/savepoints/savepoint-000000-fbcd58f66685' on file system 'wasbs'.\n\tat
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(AbstractFsCheckpointStorageAccess.java:250)\n\tat
org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess.resolveCheckpoint(AbstractFsCheckpointStorageAccess.java:111)\n\tat
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1632)\n\tat
org.apache.flink.runtime.scheduler.SchedulerBase.tryRestoreExecutionGraphFromSavepoint(SchedulerBase.java:358)\n\tat
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:288)\n\tat
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:245)\n\tat
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:133)\n\tat
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:111)\n\tat
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:345)\n\tat
org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:330)\n\tat
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:95)\n\tat
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.createJobMasterService(DefaultJobMasterServiceFactory.java:39)\n\tat
org.apache.flink.runtime.jobmaster.JobManagerRunnerImpl.<init>(JobManagerRunnerImpl.java:162)\n\tat
org.apache.flink.runtime.dispatcher.DefaultJobManagerRunnerFactory.createJobManagerRunner(DefaultJobManagerRunnerFactory.java:86)\n\tat
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:478)\n\t... 4 common frames omitted\n"}
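To make the A/B discrepancy easy to check on each reproduction attempt, a small log scan could compare the two paths. This is a sketch, not part of Flink: it assumes the JSON layout shown above with exactly one record per line (the mail copy is wrapped, the real log file is not), and the class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: compares the --fromSavepoint argument with the path actually restored. */
public class SavepointPathCheck {

    // Assumption: one JSON record per line, with "message" immediately followed by "logger_name".
    private static final Pattern MESSAGE =
            Pattern.compile("\"message\":\"\\s*(.*?)\\s*\",\"logger_name\"");
    // Matches the CheckpointCoordinator line "Starting job <id> from savepoint <path> ...".
    private static final Pattern RESTORE =
            Pattern.compile("Starting job \\S+ from savepoint (\\S+)");

    /** Extracts the trimmed "message" value of one log line, if present. */
    static Optional<String> message(String line) {
        Matcher m = MESSAGE.matcher(line);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    /** The value logged right after the "--fromSavepoint" program argument (path A). */
    static Optional<String> requestedPath(List<String> lines) {
        for (int i = 0; i + 1 < lines.size(); i++) {
            if (message(lines.get(i)).filter("--fromSavepoint"::equals).isPresent()) {
                return message(lines.get(i + 1));
            }
        }
        return Optional.empty();
    }

    /** The savepoint path the CheckpointCoordinator reports on restore (path B). */
    static Optional<String> restoredPath(List<String> lines) {
        for (String line : lines) {
            Optional<String> msg = message(line);
            if (msg.isPresent()) {
                Matcher m = RESTORE.matcher(msg.get());
                if (m.find()) {
                    return Optional.of(m.group(1));
                }
            }
        }
        return Optional.empty();
    }

    /** True when both paths were found and differ, i.e. the requested savepoint was ignored. */
    static boolean mismatch(List<String> lines) {
        Optional<String> requested = requestedPath(lines);
        Optional<String> restored = restoredPath(lines);
        return requested.isPresent() && restored.isPresent()
                && !requested.get().equals(restored.get());
    }

    public static void main(String[] args) throws IOException {
        // Usage: java SavepointPathCheck <jobmanager log file>
        List<String> lines = Files.readAllLines(Paths.get(args[0]));
        System.out.println("requested: " + requestedPath(lines).orElse("<not found>"));
        System.out.println("restored:  " + restoredPath(lines).orElse("<not found>"));
        System.out.println("mismatch:  " + mismatch(lines));
    }
}
```

Against a log like the one above, `mismatch: true` would correspond to the symptom described here: the entrypoint received path A, but the restore used path B.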

Any suggestions?

Thanks,
Alexey

Attachment: job-graph-serialVersionUID.log
