[ https://issues.apache.org/jira/browse/FLINK-25267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Matthias Pohl updated FLINK-25267:
----------------------------------
    Component/s: Runtime / Checkpointing

> Unable to (always) recover using checkpoint in HA setup (both Zookeeper and Kubernetes)
> ---------------------------------------------------------------------------------------
>
> Key: FLINK-25267
> URL: https://issues.apache.org/jira/browse/FLINK-25267
> Project: Flink
> Issue Type: Bug
> Components: Deployment / Kubernetes, Runtime / Checkpointing, Stateful Functions
> Affects Versions: 1.12.1, statefun-3.0.0, statefun-3.1.0, 1.13.2
> Environment: MacOS 11.6, minikube v1.23.2, tried with both Stateful Functions 3.0.0 and Stateful Functions 3.1.0
> Reporter: Kerem Ulutaş
> Priority: Major
>
> My Stateful Functions job is running on Kubernetes (minikube on my local env) and has these settings:
> * Using StateFun v3.1.0
> * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
> * Checkpointing mode is EXACTLY_ONCE
> * State backend is rocksdb and incremental checkpointing is enabled
> When I kill the jobmanager (master) pod, minikube starts another pod, and this new pod fails when it tries to load the last checkpoint:
> {code:java}
> ...
> 2021-12-11 14:25:26,426 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Initializing job myStatefunApp (00000000000000000000000000000000).
> 2021-12-11 14:25:26,443 INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Using restart back off time strategy FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
> 2021-12-11 14:25:26,516 INFO org.apache.flink.runtime.util.ZooKeeperUtils [] - Initialized DefaultCompletedCheckpointStore in 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}' with /checkpoints/00000000000000000000000000000000.
> 2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Running initialization on master for job myStatefunApp > (00000000000000000000000000000000). > 2021-12-11 14:25:26,599 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Successfully ran initialization on master in 0 ms. > 2021-12-11 14:25:26,617 INFO > org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - > Built 1 pipelined regions in 1 ms > 2021-12-11 14:25:26,626 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Using job/cluster config to configure application-defined > state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, > enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, > writeBatchSize=2097152} > 2021-12-11 14:25:26,627 INFO > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - > Using predefined options: DEFAULT. > 2021-12-11 14:25:26,627 INFO > org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - > Using application-defined options factory: > DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}. > 2021-12-11 14:25:26,627 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Using application-defined state backend: > EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, > enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, > writeBatchSize=2097152} > 2021-12-11 14:25:26,631 INFO org.apache.flink.runtime.jobmaster.JobMaster > [] - Checkpoint storage is set to 'filesystem': (checkpoints > "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp") > 2021-12-11 14:25:26,712 INFO > org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - > Recovering checkpoints from > ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}. 
> 2021-12-11 14:25:26,724 INFO > org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - > Found 1 checkpoints in > ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}. > 2021-12-11 14:25:26,725 INFO > org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - > Trying to fetch 1 checkpoints from storage. > 2021-12-11 14:25:26,725 INFO > org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - > Trying to retrieve checkpoint 2. > 2021-12-11 14:25:26,931 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Restoring > job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for > 00000000000000000000000000000000 located at > hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2. > 2021-12-11 14:25:27,012 ERROR > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error > occurred in the cluster entrypoint. > org.apache.flink.util.FlinkException: JobMaster for job > 00000000000000000000000000000000 failed. > at > org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?] > at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) > ~[?:?] 
> at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.actor.Actor.aroundReceive(Actor.scala:517) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.actor.Actor.aroundReceive$(Actor.scala:515) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.actor.ActorCell.invoke(ActorCell.scala:561) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at 
akka.dispatch.Mailbox.run(Mailbox.scala:225) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.dispatch.Mailbox.exec(Mailbox.scala:235) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at > akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) > [flink-dist_2.12-1.13.2.jar:1.13.2] > at > akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) > [flink-dist_2.12-1.13.2.jar:1.13.2] > Caused by: org.apache.flink.runtime.client.JobInitializationException: Could > not start the JobMaster. > at > org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) > ~[?:?] > at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) > ~[?:?] > at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) > ~[?:?] > at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) > ~[?:?] > at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown > Source) ~[?:?] > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > ~[?:?] > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > ~[?:?] > at java.lang.Thread.run(Unknown Source) ~[?:?] > Caused by: java.util.concurrent.CompletionException: > java.lang.IllegalStateException: There is no operator for the state > 18666b435c78ee2416e74bb997b798a7 > at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown > Source) ~[?:?] 
> at java.util.concurrent.CompletableFuture.completeThrowable(Unknown > Source) ~[?:?] > at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) > ~[?:?] > at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) > ~[?:?] > at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown > Source) ~[?:?] > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > ~[?:?] > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > ~[?:?] > at java.lang.Thread.run(Unknown Source) ~[?:?] > Caused by: java.lang.IllegalStateException: There is no operator for the > state 18666b435c78ee2416e74bb997b798a7 > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122) > 
~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at > org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112) > ~[flink-dist_2.12-1.13.2.jar:1.13.2] > at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) > ~[?:?] > at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) > ~[?:?] > at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?] > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown > Source) ~[?:?] > at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) > ~[?:?] > at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) > ~[?:?] > at java.lang.Thread.run(Unknown Source) ~[?:?] > 2021-12-11 14:25:27,017 INFO > org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Shutting > StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. > Diagnostics Cluster entrypoint has been closed externally.. 
> 2021-12-11 14:25:27,021 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting > down rest endpoint. > 2021-12-11 14:25:27,025 INFO org.apache.flink.runtime.blob.BlobServer > [] - Stopped BLOB server at 0.0.0.0:6124 > 2021-12-11 14:25:27,034 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing > cache directory > /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui > 2021-12-11 14:25:27,035 INFO > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - > Stopping DefaultLeaderElectionService. > 2021-12-11 14:25:27,035 INFO > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - > Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'} > 2021-12-11 14:25:27,036 INFO > org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down > complete. > 2021-12-11 14:25:27,036 INFO > org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent > [] - Closing components. > 2021-12-11 14:25:27,037 INFO > org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - > Stopping DefaultLeaderRetrievalService. > 2021-12-11 14:25:27,037 INFO > org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - > Closing > ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}. > 2021-12-11 14:25:27,037 INFO > org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - > Stopping DefaultLeaderRetrievalService. > 2021-12-11 14:25:27,037 INFO > org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - > Closing > ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}. > 2021-12-11 14:25:27,038 INFO > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - > Stopping DefaultLeaderElectionService. 
> 2021-12-11 14:25:27,038 INFO > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - > Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'} > 2021-12-11 14:25:27,039 INFO > org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - > Stopping JobDispatcherLeaderProcess. > 2021-12-11 14:25:27,040 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager > [] - Closing the slot manager. > 2021-12-11 14:25:27,040 INFO > org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager > [] - Suspending the slot manager. > 2021-12-11 14:25:27,041 INFO > org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - > Stopping DefaultLeaderElectionService. > 2021-12-11 14:25:27,041 INFO > org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - > Closing > ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} > {code} > > But somehow, among several restarts, jobmanager can randomly restore job from > the last checkpoint. After I changed log level of Flink to DEBUG, I've > managed to get the difference between an unsuccessful (resulting in above > log) and a successful sequence of events. 
It seems that operators can be assigned different hashes between restarts. Here is the relevant log section for the unsuccessful assignment (I renamed my operators for clarity):
>
> {code:java}
> 2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' {id: 6, parallelism: 1, user function: }
> {code}
> ... and here is the same log section for the successful assignment:
> {code:java}
> 2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG org.apache.flink.streaming.api.graph.StreamGraphHasherV2 [] - Generated hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' {id: 6, parallelism: 1, user function: }
> {code}
> As you can see, in the successful run the hash "18666b435c78ee2416e74bb997b798a7" is generated, so the jobmanager can match an operator to the state loaded from the checkpoint and continue normal operation. Also note that the router operators are assigned different ids in the two runs.
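The following toy sketch illustrates why a changing operator order could produce changing hashes. It is not Flink code. It assumes only that a node's generated hash depends on the node's position in the graph traversal; the real StreamGraphHasherV2 mixes in more inputs, so this does not reproduce the hashes in the logs. The class `PositionalHashSketch` and its methods are hypothetical names chosen for this illustration, and the two node orders are taken from the ids in the two log excerpts above.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (NOT Flink code): a node's hash is derived from its traversal position. */
public class PositionalHashSketch {

    /** Maps each node name to a hash computed from the node's index in the given order. */
    public static Map<String, String> hashesFor(List<String> nodesInTraversalOrder) {
        Map<String, String> hashes = new LinkedHashMap<>();
        for (String node : nodesInTraversalOrder) {
            // Assumption: only the position feeds the hash, not the node name.
            int position = hashes.size();
            hashes.put(node, md5Hex("position=" + position));
        }
        return hashes;
    }

    /** Returns the MD5 digest of the input as a lowercase hex string. */
    public static String md5Hex(String input) {
        try {
            MessageDigest md = MessageDigest.getInstance("MD5");
            byte[] digest = md.digest(input.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    public static void main(String[] args) {
        // Node order by id in the unsuccessful run (ids 4, 5, 6).
        List<String> failedRun = List.of(
                "router (my-ingress-2-in)", "router (my-ingress-1-in)", "router (my-ingress-3-in)");
        // Node order by id in the successful run (ids 4, 5, 6).
        List<String> successfulRun = List.of(
                "router (my-ingress-1-in)", "router (my-ingress-3-in)", "router (my-ingress-2-in)");

        String node = "router (my-ingress-2-in)";
        String hashInFailedRun = hashesFor(failedRun).get(node);
        String hashInSuccessfulRun = hashesFor(successfulRun).get(node);
        System.out.println("same hash across runs: " + hashInFailedRun.equals(hashInSuccessfulRun));
    }
}
```

Under this assumption, the same logical operator ends up with a different hash whenever the graph is built in a different order, and state stored under the old hash then has no matching operator.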
>
> I took a look at the StreamGraphHasherV2 code ([link|https://github.com/apache/flink/blob/release-1.13.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L65]). It makes an explicit attempt to keep the operator order the same between runs, but my Stateful Functions application somehow seems to defeat that attempt.
>
> Since we can't assign operator ids when using Stateful Functions, is there anything I can do to get recovery working correctly? Is this a bug, or am I using a wrong combination of settings?
>
> As a last note, I also posted this question earlier on Stack Overflow; here is the [link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th] to the question.
> Thanks

--
This message was sent by Atlassian Jira
(v8.20.1#820001)