[ 
https://issues.apache.org/jira/browse/FLINK-25267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias Pohl updated FLINK-25267:
----------------------------------
    Component/s: Runtime / Checkpointing

> Unable to (always) recover using checkpoint in HA setup (both Zookeeper and 
> Kubernetes)
> ---------------------------------------------------------------------------------------
>
>                 Key: FLINK-25267
>                 URL: https://issues.apache.org/jira/browse/FLINK-25267
>             Project: Flink
>          Issue Type: Bug
>          Components: Deployment / Kubernetes, Runtime / Checkpointing, 
> Stateful Functions
>    Affects Versions: 1.12.1, statefun-3.0.0, statefun-3.1.0, 1.13.2
>         Environment: MacOS 11.6, minikube v1.23.2, tried with both Stateful 
> Functions 3.0.0 and Stateful Functions 3.1.0
>            Reporter: Kerem Ulutaş
>            Priority: Major
>
> My Stateful Functions job is running on Kubernetes (minikube on my local env) 
> and has these settings:
>  * Using StateFun v3.1.0
>  * Checkpoints are stored on HDFS (state.checkpoint-storage: filesystem)
>  * Checkpointing mode is EXACTLY_ONCE
>  * State backend is rocksdb and incremental checkpointing is enabled
> When I kill the jobmanager (master) pod, minikube starts another pod and this 
> new pod fails when it tries to load last checkpoint:
> {code:java}
> ...
> 2021-12-11 14:25:26,426 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Initializing job myStatefunApp 
> (00000000000000000000000000000000).
> 2021-12-11 14:25:26,443 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using restart back off time strategy 
> FixedDelayRestartBackoffTimeStrategy(maxNumberRestartAttempts=2147483647, 
> backoffTimeMS=1000) for myStatefunApp (00000000000000000000000000000000).
> 2021-12-11 14:25:26,516 INFO  org.apache.flink.runtime.util.ZooKeeperUtils    
>              [] - Initialized DefaultCompletedCheckpointStore in 
> 'ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}'
>  with /checkpoints/00000000000000000000000000000000.
> 2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Running initialization on master for job myStatefunApp 
> (00000000000000000000000000000000).
> 2021-12-11 14:25:26,599 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Successfully ran initialization on master in 0 ms.
> 2021-12-11 14:25:26,617 INFO  
> org.apache.flink.runtime.scheduler.adapter.DefaultExecutionTopology [] - 
> Built 1 pipelined regions in 1 ms
> 2021-12-11 14:25:26,626 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using job/cluster config to configure application-defined 
> state backend: EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
> writeBatchSize=2097152}
> 2021-12-11 14:25:26,627 INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Using predefined options: DEFAULT.
> 2021-12-11 14:25:26,627 INFO  
> org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend [] - 
> Using application-defined options factory: 
> DefaultConfigurableOptionsFactory{configuredOptions={state.backend.rocksdb.thread.num=1}}.
> 2021-12-11 14:25:26,627 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Using application-defined state backend: 
> EmbeddedRocksDBStateBackend{, localRocksDbDirectories=null, 
> enableIncrementalCheckpointing=TRUE, numberOfTransferThreads=1, 
> writeBatchSize=2097152}
> 2021-12-11 14:25:26,631 INFO  org.apache.flink.runtime.jobmaster.JobMaster    
>              [] - Checkpoint storage is set to 'filesystem': (checkpoints 
> "hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp")
> 2021-12-11 14:25:26,712 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Recovering checkpoints from 
> ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
> 2021-12-11 14:25:26,724 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Found 1 checkpoints in 
> ZooKeeperStateHandleStore{namespace='statefun_zk_recovery/my-statefun-app/checkpoints/00000000000000000000000000000000'}.
> 2021-12-11 14:25:26,725 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to fetch 1 checkpoints from storage.
> 2021-12-11 14:25:26,725 INFO  
> org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - 
> Trying to retrieve checkpoint 2.
> 2021-12-11 14:25:26,931 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator    [] - Restoring 
> job 00000000000000000000000000000000 from Checkpoint 2 @ 1639232587220 for 
> 00000000000000000000000000000000 located at 
> hdfs://hdfs-namenode:8020/tmp/statefun_checkpoints/myStatefunApp/00000000000000000000000000000000/chk-2.
> 2021-12-11 14:25:27,012 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Fatal error 
> occurred in the cluster entrypoint.
> org.apache.flink.util.FlinkException: JobMaster for job 
> 00000000000000000000000000000000 failed.
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at java.util.concurrent.CompletableFuture.uniHandle(Unknown Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$Completion.run(Unknown Source) 
> ~[?:?]
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
> [flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  [flink-dist_2.12-1.13.2.jar:1.13.2]
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could 
> not start the JobMaster.
>     at 
> org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.util.concurrent.CompletionException: 
> java.lang.IllegalStateException: There is no operator for the state 
> 18666b435c78ee2416e74bb997b798a7
>      at java.util.concurrent.CompletableFuture.encodeThrowable(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(Unknown 
> Source) ~[?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> Caused by: java.lang.IllegalStateException: There is no operator for the 
> state 18666b435c78ee2416e74bb997b798a7
>      at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.checkStateMappingCompleteness(StateAssignmentOperation.java:712)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.checkpoint.StateAssignmentOperation.assignStates(StateAssignmentOperation.java:100)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1562)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317) 
> ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at 
> org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>  ~[flink-dist_2.12-1.13.2.jar:1.13.2]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.FutureTask.run(Unknown Source) ~[?:?]
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown
>  Source) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) 
> ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) 
> ~[?:?]
>     at java.lang.Thread.run(Unknown Source) ~[?:?]
> 2021-12-11 14:25:27,017 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint        [] - Shutting 
> StatefulFunctionsClusterEntryPoint down with application status UNKNOWN. 
> Diagnostics Cluster entrypoint has been closed externally..
> 2021-12-11 14:25:27,021 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shutting 
> down rest endpoint.
> 2021-12-11 14:25:27,025 INFO  org.apache.flink.runtime.blob.BlobServer        
>              [] - Stopped BLOB server at 0.0.0.0:6124
> 2021-12-11 14:25:27,034 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Removing 
> cache directory 
> /tmp/flink-web-6c2dafc9-bb7d-489a-9e2d-cf78e3f19b67/flink-web-ui
> 2021-12-11 14:25:27,035 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,035 INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/rest_server_lock'}
> 2021-12-11 14:25:27,036 INFO  
> org.apache.flink.runtime.jobmaster.MiniDispatcherRestEndpoint [] - Shut down 
> complete.
> 2021-12-11 14:25:27,036 INFO  
> org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponent
>  [] - Closing components.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
> Closing 
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/dispatcher_lock'}.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.DefaultLeaderRetrievalService [] - 
> Stopping DefaultLeaderRetrievalService.
> 2021-12-11 14:25:27,037 INFO  
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - 
> Closing 
> ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/resource_manager_lock'}.
> 2021-12-11 14:25:27,038 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,038 INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/dispatcher_lock'}
> 2021-12-11 14:25:27,039 INFO  
> org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcess [] - 
> Stopping JobDispatcherLeaderProcess.
> 2021-12-11 14:25:27,040 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Closing the slot manager.
> 2021-12-11 14:25:27,040 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.DeclarativeSlotManager 
> [] - Suspending the slot manager.
> 2021-12-11 14:25:27,041 INFO  
> org.apache.flink.runtime.leaderelection.DefaultLeaderElectionService [] - 
> Stopping DefaultLeaderElectionService.
> 2021-12-11 14:25:27,041 INFO  
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - 
> Closing 
> ZooKeeperLeaderElectionDriver{leaderPath='/leader/resource_manager_lock'} 
> {code}
>  
> But somehow, among several restarts, jobmanager can randomly restore job from 
> the last checkpoint. After I changed log level of Flink to DEBUG, I've 
> managed to get the difference between an unsuccessful (resulting in above 
> log) and a successful sequence of events. It seems that operators can get 
> assigned different hashes between restarts, here is the relevant log section 
> for the unsucessful assignment (renamed my operators for clarity):
>  
> {code:java}
> 2021-12-11 21:55:14,001 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '32d5ca33c915e65563a5c7f4d62703ad' for node 'router (my-ingress-1-in)-5' 
> {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '33b86fe798648d648b237ddfc986200d' for node 'router (my-ingress-2-in)-4' 
> {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:14,001 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash 'bd4c3fa1570bbcf606f2dabddd61ed7f' for node 'router (my-ingress-3-in)-6' 
> {id: 6, parallelism: 1, user function: } {code}
> .. and here is the same log section for the successful assignment:
> {code:java}
> 2021-12-11 21:55:34,543 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash 'a1448ecf31ac98d2215c38bfd119abe0' for node 'router (my-ingress-3-in)-5' 
> {id: 5, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '05037ff96baea131d9cf1390846efd98' for node 'router (my-ingress-1-in)-4' 
> {id: 4, parallelism: 1, user function: }
> 2021-12-11 21:55:34,543 DEBUG 
> org.apache.flink.streaming.api.graph.StreamGraphHasherV2     [] - Generated 
> hash '18666b435c78ee2416e74bb997b798a7' for node 'router (my-ingress-2-in)-6' 
> {id: 6, parallelism: 1, user function: } {code}
> As you can see, the hash "18666b435c78ee2416e74bb997b798a7" is generated and 
> jobmanager could match the operator for the state loaded from the checkpoint 
> and it could continue normal operation. Another thing to note is, router 
> operators have different ids assigned between the 2 runs.
>  
> I took a look at StreamGraphHasherV2 code 
> ([link|https://github.com/apache/flink/blob/release-1.13.2/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java#L65])
>  there is an explicit attempt to have the operator order the same between 
> different attempts, however my Stateful Functions application seems to be 
> able to avoid that attempt.
>  
> Since we can't assign operator ids when using Stateful Functions, is there 
> anything I can do right to get it working correctly? Is this a bug, or am I 
> trying it with a wrong combination of settings or something like that?
>  
> As a last note, I've also posted the same earlier to Stack Overflow, here is 
> the 
> [link|https://stackoverflow.com/questions/70316498/flink-statefun-high-availability-exception-java-lang-illegalstateexception-th]
>  to the question.
> Thanks



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to