[ 
https://issues.apache.org/jira/browse/FLINK-25267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kerem Ulutaş updated FLINK-25267:
---------------------------------
    Description: 
My Stateful Functions job is running on Kubernetes (minikube on my local env) 
and has these settings:


 * Using StateFun v3.1.0
 * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
 * Checkpointing mode is EXACTLY_ONCE
 * State backend is rocksdb and incremental checkpointing is enabled


When I kill the jobmanager (master) pod, minikube starts another pod and this 
new pod fails when it tries to load last checkpoint:
{code:java}
...
2021-12-11 14:25:26,426 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Initializing job myStatefunApp 
(00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Using restart back off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO  org.apache.flink.runtime.util.ZooKeeperUtils      
           [] - Initialized DefaultCompletedCheckpointStore in 
'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}'
 with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Running initialization on master for job myStatefunApp 
(00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO  
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
1 pipelined regions in 1 ms
2021-12-11 14:25:26,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Using job/cluster config to configure application-defined state 
backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2021-12-11 14:25:26,627 INFO  
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2021-12-11 14:25:26,627 INFO  
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
2021-12-11 14:25:26,627 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Using application-defined state backend: 
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2021-12-11 14:25:26,631 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Checkpoint storage is set to 'filesystem': (checkpoints 
"hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
2021-12-11 14:25:26,712 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
Recovering checkpoints from 
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,724 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 
1 checkpoints in 
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,725 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying 
to fetch 1 checkpoints from storage.
2021-12-11 14:25:26,725 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying 
to retrieve checkpoint 2.
2021-12-11 14:25:26,931 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 
00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 
00000000000000000000000000000000 located at 
hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
2021-12-11 14:25:27,012 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 
00000000000000000000000000000000 failed.
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) 
~[?:?]
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
    at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown 
Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
18666b435c78ee2416e74bb997b798a7
     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state 
18666b435c78ee2416e74bb997b798a7
     at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) 
~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-12-11 14:25:27,017 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. 
Diagnostics Cluster entrypoint has been closed externally..
2021-12-11 14:25:27,021 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2021-12-11 14:25:27,025 INFO  org.apache.flink.runtime.blob.BlobServer          
           [] - Stopped BLOB server at 0.0.0.0:6124
2021-12-11 14:25:27,034 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing 
cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
2021-12-11 14:25:27,035 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,035 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
2021-12-11 14:25:27,036 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down 
complete.
2021-12-11 14:25:27,036 INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Closing 
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-12-11 14:25:27,038 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,038 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO  
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Closing the slot manager.
2021-12-11 14:25:27,040 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Suspending the slot manager.
2021-12-11 14:25:27,041 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} {code}
 

But somehow, among several restarts, jobmanager can randomly restore job from 
the last checkpoint. After I changed log level of Flink to DEBUG, I've managed 
to get the difference between an unsuccessful (resulting in above log) and a 
successful sequence of events. It seems that operators can get assigned 
different hashes between restarts, here is the relevant log section for the 
unsucessful assignment (renamed my operators for clarity):

 
{code:java}
2021-12-11 21:55:14,001 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' 
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' 
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' 
{id: 6, parallelism: 1, user function: } {code}
.. and here is the same log section for the successful assignment:
{code:java}
2021-12-11 21:55:34,543 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' 
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' 
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' 
{id: 6, parallelism: 1, user function: } {code}
As you can see, the hash "18666b435c78ee2416e74bb997b798a7" is generated and 
jobmanager could match the operator for the state loaded from the checkpoint 
and it could continue normal operation. Another thing to note is, router 
operators have different ids assigned between the 2 runs.

 

I took a look at StreamGraphHasherV2 code ([link|#L65]),]) there is an explicit 
attempt to have the operator order the same between different attempts, however 
my Stateful Functions application seems to be able to avoid that attempt.

 

Since we can't assign operator ids when using Stateful Functions, is there 
anything I can do right to get it working correctly? Is this a bug, or am I 
trying it with a wrong combination of settings or something like that?

 

As a last note, I've also posted the same earlier to Stack Overflow, here is 
the 
[link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th]
 to the question.

Thanks

  was:
My Stateful Functions job is running on Kubernetes (minikube on my local env) 
and has these settings:
 * Using StateFun v3.1.0
 * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
 * Checkpointing mode is EXACTLY_ONCE
 * State backend is rocksdb and incremental checkpointing is enabled

When I kill the jobmanager (master) pod, minikube starts another pod and this 
new pod fails when it tries to load last checkpoint:

 
{code:java}
...
2021-12-11 14:25:26,426 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Initializing job myStatefunApp 
(00000000000000000000000000000000).
2021-12-11 14:25:26,443 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Using restart back off time strategy 
FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
2021-12-11 14:25:26,516 INFO  org.apache.flink.runtime.util.ZooKeeperUtils      
           [] - Initialized DefaultCompletedCheckpointStore in 
'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}'
 with /checkpoints/00000000000000000000000000000000.
2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Running initialization on master for job myStatefunApp 
(00000000000000000000000000000000).
2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Successfully ran initialization on master in 0 ms.
2021-12-11 14:25:26,617 INFO  
org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - Built 
1 pipelined regions in 1 ms
2021-12-11 14:25:26,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Using job/cluster config to configure application-defined state 
backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2021-12-11 14:25:26,627 INFO  
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
predefined options: DEFAULT.
2021-12-11 14:25:26,627 INFO  
org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - Using 
application-defined options factory: 
DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
2021-12-11 14:25:26,627 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Using application-defined state backend: 
EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
writeBatchSize=2097152}
2021-12-11 14:25:26,631 INFO  org.apache.flink.runtime.jobmaster.JobMaster      
           [] - Checkpoint storage is set to 'filesystem': (checkpoints 
"hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
2021-12-11 14:25:26,712 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
Recovering checkpoints from 
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,724 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 
1 checkpoints in 
ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
2021-12-11 14:25:26,725 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying 
to fetch 1 checkpoints from storage.
2021-12-11 14:25:26,725 INFO  
org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying 
to retrieve checkpoint 2.
2021-12-11 14:25:26,931 INFO  
org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring job 
00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 
00000000000000000000000000000000 located at 
hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
2021-12-11 14:25:27,012 ERROR 
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
occurred in the cluster entrypoint.
org.apache.flink.util.FlinkException: JobMaster for job 
00000000000000000000000000000000 failed.
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
    at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) 
~[?:?]
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.Actor.aroundReceive(Actor.scala:517) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
[flink-dist_2.12-1.13.2.jar:1.13.2]
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
not start the JobMaster.
    at 
org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown 
Source) ~[?:?]
    at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.util.concurrent.CompletionException: 
java.lang.IllegalStateException: There is no operator for the state 
18666b435c78ee2416e74bb997b798a7
     at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source) 
~[?:?]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
Caused by: java.lang.IllegalStateException: There is no operator for the state 
18666b435c78ee2416e74bb997b798a7
     at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190) 
~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at 
org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
 ~[flink-dist_2.12-1.13.2.jar:1.13.2]
    at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
~[?:?]
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
~[?:?]
    at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
    at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
 Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) ~[?:?]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) ~[?:?]
    at java.lang.Thread.run(Unknown Source) ~[?:?]
2021-12-11 14:25:27,017 INFO  
org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. 
Diagnostics Cluster entrypoint has been closed externally..
2021-12-11 14:25:27,021 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
down rest endpoint.
2021-12-11 14:25:27,025 INFO  org.apache.flink.runtime.blob.BlobServer          
           [] - Stopped BLOB server at 0.0.0.0:6124
2021-12-11 14:25:27,034 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing 
cache directory /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
2021-12-11 14:25:27,035 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,035 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
2021-12-11 14:25:27,036 INFO  
org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down 
complete.
2021-12-11 14:25:27,036 INFO  
org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
 [] - Closing components.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
Stopping DefaultLeaderRetrievalService.
2021-12-11 14:25:27,037 INFO  
org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
Closing 
ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
2021-12-11 14:25:27,038 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,038 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
2021-12-11 14:25:27,039 INFO  
org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
Stopping JobDispatcherLeaderProcess.
2021-12-11 14:25:27,040 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Closing the slot manager.
2021-12-11 14:25:27,040 INFO  
org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager [] 
- Suspending the slot manager.
2021-12-11 14:25:27,041 INFO  
org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
Stopping DefaultLeaderElectionService.
2021-12-11 14:25:27,041 INFO  
org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
Closing 
ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} {code}
But somehow, among several restarts, jobmanager can randomly restore job from 
the last checkpoint. After I changed log level of Flink to DEBUG, I've managed 
to get the difference between an unsuccessful (resulting in above log) and a 
successful sequence of events. It seems that operators can get assigned 
different hashes between restarts, here is the relevant log section for the 
unsucessful assignment (renamed my operators for clarity):

 
{code:java}
2021-12-11 21:55:14,001 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' 
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' 
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:14,001 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' 
{id: 6, parallelism: 1, user function: } {code}
.. and here is the same log section for the successful assignment:
{code:java}
2021-12-11 21:55:34,543 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' 
{id: 5, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' 
{id: 4, parallelism: 1, user function: }
2021-12-11 21:55:34,543 DEBUG 
org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' 
{id: 6, parallelism: 1, user function: } {code}
As you can see, the hash "18666b435c78ee2416e74bb997b798a7" is generated and 
jobmanager could match the operator for the state loaded from the checkpoint 
and it could continue normal operation. Another thing to note is, router 
operators have different ids assigned between the 2 runs.

I took a look at StreamGraphHasherV2 code ([link|#L65]) there is an explicit 
attempt to have the operator order the same between different runs, however my 
Stateful Functions application seems to be able to avoid that attempt.

Since we can't assign operator ids when using Stateful Functions, is there 
anything I can do right to get it working correctly? Is this a bug, or am I 
trying it with a wrong combination of settings or something like that?

As a last note, I've also posted the same earlier to Stack Overflow, here is 
the 
[link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th]
 to the question.

Thanks


> Unable to (always) recover using checkpoint in HA setup (both Zookeeper and 
> Kubernetes)
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-25267
>                 URL: https://issues.apache.org/jira/browse/FLINK-25267
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes, Stateful Functions
>    Affects Versions: 1.12.1, statefun-3.0.0, statefun-3.1.0, 1.13.2
>         Environment: MacOS 11.6, minikube v1.23.2, tried with both Stateful 
> Functions 3.0.0 and Stateful Functions 3.1.0
>            Reporter: Kerem Ulutaş
>            Priority: Major
>
> My Stateful Functions job is running on Kubernetes (minikube on my local env) 
> and has these settings:
>  * Using StateFun v3.1.0
>  * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
>  * Checkpointing mode is EXACTLY_ONCE
>  * State backend is rocksdb and incremental checkpointing is enabled
> When I kill the jobmanager (master) pod, minikube starts another pod and this 
> new pod fails when it tries to load last checkpoint:
> {code:java}
> ...
> 2021-12-11 14:25:26,426 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Initializing job myStatefunApp 
> (00000000000000000000000000000000).
> 2021-12-11 14:25:26,443 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using restart back off time strategy 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
> backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
> 2021-12-11 14:25:26,516 INFO  org.apache.flink.runtime.util.ZooKeeperUtils    
>              [] - Initialized DefaultCompletedCheckpointStore in 
> 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}'
>  with /checkpoints/00000000000000000000000000000000.
> 2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Running initialization on master for job myStatefunApp 
> (00000000000000000000000000000000).
> 2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Successfully ran initialization on master in 0 ms.
> 2021-12-11 14:25:26,617 INFO  
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - 
> Built 1 pipelined regions in 1 ms
> 2021-12-11 14:25:26,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using job/cluster config to configure application-defined 
> state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
> writeBatchSize=2097152}
> 2021-12-11 14:25:26,627 INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Using predefined options: DEFAULT.
> 2021-12-11 14:25:26,627 INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Using application-defined options factory: 
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
> 2021-12-11 14:25:26,627 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using application-defined state backend: 
> EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
> writeBatchSize=2097152}
> 2021-12-11 14:25:26,631 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Checkpoint storage is set to 'filesystem': (checkpoints 
> "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
> 2021-12-11 14:25:26,712 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Recovering checkpoints from 
> ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
> 2021-12-11 14:25:26,724 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Found 1 checkpoints in 
> ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
> 2021-12-11 14:25:26,725 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to fetch 1 checkpoints from storage.
> 2021-12-11 14:25:26,725 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 2.
> 2021-12-11 14:25:26,931 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring 
> job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 
> 00000000000000000000000000000000 located at 
> hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
> 2021-12-11 14:25:27,012 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint.
> org.apache.flink.util.FlinkException: JobMaster for job 
> 00000000000000000000000000000000 failed.
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) 
> ~[?:?]
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.12-1.13.2.jar:1.13.2]
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>     at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: There is no operator for the state 
> 18666b435c78ee2416e74bb997b798a7
>      at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.IllegalStateException: There is no operator for the 
> state 18666b435c78ee2416e74bb997b798a7
>      at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> 2021-12-11 14:25:27,017 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
> StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. 
> Diagnostics Cluster entrypoint has been closed externally..
> 2021-12-11 14:25:27,021 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
> down rest endpoint.
> 2021-12-11 14:25:27,025 INFO  org.apache.flink.runtime.blob.BlobServer        
>              [] - Stopped BLOB server at 0.0.0.0:6124
> 2021-12-11 14:25:27,034 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing 
> cache directory 
> /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
> 2021-12-11 14:25:27,035 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,035 INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
> 2021-12-11 14:25:27,036 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down 
> complete.
> 2021-12-11 14:25:27,036 INFO  
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
>  [] - Closing components.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
> Closing 
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
> Closing 
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
> 2021-12-11 14:25:27,038 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,038 INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
> 2021-12-11 14:25:27,039 INFO  
> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
> Stopping JobDispatcherLeaderProcess.
> 2021-12-11 14:25:27,040 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Closing the slot manager.
> 2021-12-11 14:25:27,040 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Suspending the slot manager.
> 2021-12-11 14:25:27,041 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,041 INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Closing 
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} 
> {code}
>  
> But somehow, among several restarts, jobmanager can randomly restore job from 
> the last checkpoint. After I changed log level of Flink to DEBUG, I've 
> managed to get the difference between an unsuccessful (resulting in above 
> log) and a successful sequence of events. It seems that operators can get 
> assigned different hashes between restarts, here is the relevant log section 
> for the unsucessful assignment (renamed my operators for clarity):
>  
> {code:java}
> 2021-12-11 21:55:14,001 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' 
> {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' 
> {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' 
> {id: 6, parallelism: 1, user function: } {code}
> .. and here is the same log section for the successful assignment:
> {code:java}
> 2021-12-11 21:55:34,543 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' 
> {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' 
> {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' 
> {id: 6, parallelism: 1, user function: } {code}
> As you can see, the hash "18666b435c78ee2416e74bb997b798a7" is generated and 
> jobmanager could match the operator for the state loaded from the checkpoint 
> and it could continue normal operation. Another thing to note is, router 
> operators have different ids assigned between the 2 runs.
>  
> I took a look at StreamGraphHasherV2 code ([link|#L65]),]) there is an 
> explicit attempt to have the operator order the same between different 
> attempts, however my Stateful Functions application seems to be able to avoid 
> that attempt.
>  
> Since we can't assign operator ids when using Stateful Functions, is there 
> anything I can do right to get it working correctly? Is this a bug, or am I 
> trying it with a wrong combination of settings or something like that?
>  
> As a last note, I've also posted the same earlier to Stack Overflow, here is 
> the 
> [link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th]
>  to the question.
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to