[ https://issues.apache.org/jira/browse/FLINK-24543?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aitozi updated FLINK-24543:
---------------------------
    Summary: Zookeeper connection issue causes inconsistent state in Flink  (was: jar)

> Zookeeper connection issue causes inconsistent state in Flink
> --------------------------------------------------------------
>
> Key: FLINK-24543
> URL: https://issues.apache.org/jira/browse/FLINK-24543
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Coordination
> Affects Versions: 1.14.0, 1.13.3
> Reporter: Jun Qin
> Assignee: David Morávek
> Priority: Blocker
> Labels: pull-request-available
>
> Env: Flink 1.13.2 with ZooKeeper HA
> Here is what happened:
> {code:bash}
> # checkpoint 1116 was triggered
> 2021-10-09 00:16:49,555 [Checkpoint Timer] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Triggering checkpoint 1116 (type=CHECKPOINT) @ 1633738609538 for job a8a4fb85b681a897ba118db64333c9e5.
> # a few seconds later, the ZooKeeper connection was suspended (it turned out to be a disk issue on the ZooKeeper side that caused slow fsync and commit)
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2021-10-09 00:16:58,563 [Curator-ConnectionStateManager-0] WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper suspended. The contender LeaderContender: DefaultDispatcherRunner no longer participates in the leader election.
> # the job switched to SUSPENDED
> 2021-10-09 00:16:58,564 [flink-akka.actor.default-dispatcher-61] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Disconnect job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5 from the resource manager.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-92] INFO org.apache.flink.runtime.resourcemanager.StandaloneResourceManager [] - Registering job manager b79b79fe513fb5f47e7bf447b7d94...@akka.tcp://flink@flink-...-jobmanager:50010/user/rpc/jobmanager_3 for job a8a4fb85b681a897ba118db64333c9e5.
> 2021-10-09 00:16:58,565 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.jobmaster.JobMaster [] - Stopping the JobMaster for job Flink ...(a8a4fb85b681a897ba118db64333c9e5).
> 2021-10-09 00:16:58,567 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job Flink ... (a8a4fb85b681a897ba118db64333c9e5) switched from state RUNNING to SUSPENDED.
> 2021-10-09 00:16:58,570 [flink-akka.actor.default-dispatcher-86] INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalDriver [] - Closing ZookeeperLeaderRetrievalDriver{retrievalPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}.
> 2021-10-09 00:16:58,667 [flink-akka.actor.default-dispatcher-92] INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [] - Job a8a4fb85b681a897ba118db64333c9e5 reached terminal state SUSPENDED.
> # ZooKeeper connection restored
> 2021-10-09 00:17:08,225 [Curator-ConnectionStateManager-0] INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Connection to ZooKeeper was reconnected. Leader election can be restarted.
> # received the checkpoint acknowledgement and tried to finalize it, but adding it to ZooKeeper failed with KeeperException$NodeExistsException
> 2021-10-09 00:17:14,382 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Source: ... (1/5) (09d25852e3e206d6b7fe0d6bc965870f) switched from RUNNING to CANCELING.
> 2021-10-09 00:17:14,382 [jobmanager-future-thread-1] WARN org.apache.flink.runtime.jobmaster.JobMaster [] - Error while processing AcknowledgeCheckpoint message
> org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete the pending checkpoint 1116. Failure reason: Failure to finalize checkpoint.
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1227)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.receiveAcknowledgeMessage(CheckpointCoordinator.java:1072)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$acknowledgeCheckpoint$1(ExecutionGraphHandler.java:89)
>     at org.apache.flink.runtime.scheduler.ExecutionGraphHandler.lambda$processCheckpointCoordinatorMessage$3(ExecutionGraphHandler.java:119)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) [?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) [?:?]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]
>     at java.lang.Thread.run(Thread.java:834) [?:?]
> Caused by: org.apache.flink.runtime.persistence.StateHandleStore$AlreadyExistException: ZooKeeper node /0000000000000001116 already exists.
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.lambda$addAndLock$0(ZooKeeperStateHandleStore.java:179)
>     at java.util.Optional.map(Optional.java:265) ~[?:?]
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:177)
>     at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:182)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1209)
>     ... 9 more
> Caused by: org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = NodeExists
>     at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.KeeperException.create(KeeperException.java:122) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.multiInternal(ZooKeeper.java:1015) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.zookeeper3.org.apache.zookeeper.ZooKeeper.multi(ZooKeeper.java:919) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.doOperation(CuratorTransactionImpl.java:197) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.access$000(CuratorTransactionImpl.java:37) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:130) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl$2.call(CuratorTransactionImpl.java:126) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.curator4.org.apache.curator.connection.StandardConnectionHandlingPolicy.callWithRetry(StandardConnectionHandlingPolicy.java:64) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.curator4.org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:100) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.shaded.curator4.org.apache.curator.framework.imps.CuratorTransactionImpl.commit(CuratorTransactionImpl.java:123) ~[flink-shaded-zookeeper-3.4.14.jar:3.4.14-13.0-stream1]
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.writeStoreHandleTransactionally(ZooKeeperStateHandleStore.java:204)
>     at org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore.addAndLock(ZooKeeperStateHandleStore.java:165)
>     at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.addCheckpoint(DefaultCompletedCheckpointStore.java:182)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.completePendingCheckpoint(CheckpointCoordinator.java:1209)
>     ... 9 more
> # checkpoint coordinator was stopping
> 2021-10-09 00:17:14,385 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] - Stopping checkpoint coordinator for job a8a4fb85b681a897ba118db64333c9e5.
> 2021-10-09 00:17:14,401 [flink-akka.actor.default-dispatcher-90] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph [] - Job a8a4fb85b681a897ba118db64333c9e5 has been suspended.
> # clean up
> 2021-10-09 00:17:14,403 [AkkaRpcService-Supervisor-Termination-Future-Executor-thread-1] INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionDriver [] - Closing ZooKeeperLeaderElectionDriver{leaderPath='/leader/a8a4fb85b681a897ba118db64333c9e5/job_manager_lock'}
> 2021-10-09 00:17:14,404 [cluster-io-thread-2] INFO org.apache.flink.runtime.jobmanager.DefaultJobGraphStore [] - Released job graph a8a4fb85b681a897ba118db64333c9e5 from ZooKeeperStateHandleStore{namespace='flink/flink-.../jobgraphs'}.
> # however, during recovery, checkpoint 1116 was found in ZooKeeper, but its metadata file /mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 had already been cleaned up because of the earlier KeeperException$NodeExistsException
> 2021-10-09 00:18:18,678 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Recovering checkpoints from ZooKeeperStateHandleStore{namespace='flink/flink-.../checkpoints/a8a4fb85b681a897ba118db64333c9e5'}.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Found 4 checkpoints in ZooKeeperStateHandleStore{namespace='flink/flink-.../checkpoints/a8a4fb85b681a897ba118db64333c9e5'}.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to fetch 4 checkpoints from storage.
> 2021-10-09 00:18:18,686 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1113.
> 2021-10-09 00:18:18,689 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1114.
> 2021-10-09 00:18:18,691 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1115.
> 2021-10-09 00:18:18,693 [jobmanager-future-thread-1] INFO org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore [] - Trying to retrieve checkpoint 1116.
> 2021-10-09 00:18:18,700 [flink-akka.actor.default-dispatcher-18] ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [] - Fatal error occurred in the cluster entrypoint.
> org.apache.flink.util.FlinkException: JobMaster for job a8a4fb85b681a897ba118db64333c9e5 failed.
>     at org.apache.flink.runtime.dispatcher.Dispatcher.jobMasterFailed(Dispatcher.java:873)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.jobManagerRunnerFailed(Dispatcher.java:459)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.handleJobManagerRunnerResult(Dispatcher.java:436)
>     at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$runJob$3(Dispatcher.java:415)
>     at java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniHandle.tryFire(CompletableFuture.java:907) ~[?:?]
>     at java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:478) ~[?:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:440)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:208)
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
>     at akka.actor.Actor.aroundReceive(Actor.scala:517)
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515)
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225)
>     at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
>     at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
>     at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
>     at java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859) ~[?:?]
>     at java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837) ~[?:?]
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506) ~[?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 1116 from state handle under /0000000000000001116. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314) ~[?:?]
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319) ~[?:?]
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.lang.RuntimeException: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 1116 from state handle under /0000000000000001116. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>     at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:316)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: org.apache.flink.util.FlinkException: Could not retrieve checkpoint 1116 from state handle under /0000000000000001116. This indicates that the retrieved state handle is broken. Try cleaning the state handle store.
>     at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:309)
>     at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> Caused by: java.io.FileNotFoundException: /mnt/flink/.flink/ha/flink-.../completedCheckpoint42683d1121c7 (No such file or directory)
>     at java.io.FileInputStream.open0(Native Method) ~[?:?]
>     at java.io.FileInputStream.open(FileInputStream.java:219) ~[?:?]
>     at java.io.FileInputStream.<init>(FileInputStream.java:157) ~[?:?]
>     at org.apache.flink.core.fs.local.LocalDataInputStream.<init>(LocalDataInputStream.java:50)
>     at org.apache.flink.core.fs.local.LocalFileSystem.open(LocalFileSystem.java:134)
>     at org.apache.flink.runtime.state.filesystem.FileStateHandle.openInputStream(FileStateHandle.java:68)
>     at org.apache.flink.runtime.state.RetrievableStreamStateHandle.openInputStream(RetrievableStreamStateHandle.java:66)
>     at org.apache.flink.runtime.state.RetrievableStreamStateHandle.retrieveState(RetrievableStreamStateHandle.java:58)
>     at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.retrieveCompletedCheckpoint(DefaultCompletedCheckpointStore.java:298)
>     at org.apache.flink.runtime.checkpoint.DefaultCompletedCheckpointStore.recover(DefaultCompletedCheckpointStore.java:151)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreLatestCheckpointedStateInternal(CheckpointCoordinator.java:1513)
>     at org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreInitialCheckpointIfPresent(CheckpointCoordinator.java:1476)
>     at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:134)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:342)
>     at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:190)
>     at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:122)
>     at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:132)
>     at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:110)
>     at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:340)
>     at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:317)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:107)
>     at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
>     at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
>     at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) ~[?:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[?:?]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[?:?]
>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[?:?]
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[?:?]
>     at java.lang.Thread.run(Thread.java:834) ~[?:?]
> {code}

--
This message was sent by Atlassian Jira (v8.3.4#803005)
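
For readers following the log, the reported end state (a ZooKeeper node for checkpoint 1116 whose metadata file no longer exists) can be reproduced in a toy model. The sketch below is an illustration only, not Flink or Curator code: `FlakyStore`, `add_checkpoint`, and the pointer name `completedCheckpoint-demo` are hypothetical. It also assumes that the first create was committed while its reply was lost, so that a retry hit NodeExists; that is one plausible explanation, and the logs above do not confirm it.

```python
class NodeExists(Exception):
    """Stand-in for KeeperException$NodeExistsException."""


class FlakyStore:
    """Hypothetical in-memory stand-in for a ZooKeeper-backed handle store."""

    def __init__(self):
        self.nodes = {}             # node path -> pointer to a metadata file
        self.drop_next_ack = False  # simulate a reply lost on a suspended connection

    def _create(self, path, pointer):
        if path in self.nodes:
            raise NodeExists(path)
        self.nodes[path] = pointer

    def create_with_retry(self, path, pointer):
        self._create(path, pointer)
        if self.drop_next_ack:
            # ASSUMPTION: the write was committed, but the client never saw
            # the reply, so it retries the same create and gets NodeExists.
            self.drop_next_ack = False
            self._create(path, pointer)


def add_checkpoint(store, files, path, pointer, metadata):
    files[pointer] = metadata                    # 1. write the metadata file
    try:
        store.create_with_retry(path, pointer)   # 2. publish the pointer
    except NodeExists:
        # 3. treat the error as "nothing was stored" and discard the file,
        #    although the node from the first attempt is still there
        del files[pointer]
        raise


store = FlakyStore()
files = {}
store.drop_next_ack = True
try:
    add_checkpoint(store, files, "/0000000000000001116",
                   "completedCheckpoint-demo", b"metadata")
except NodeExists:
    pass

# Nodes whose metadata file is gone: recovery would fail on these.
dangling = [path for path, ptr in store.nodes.items() if ptr not in files]
print(dangling)
```

In this model the node for checkpoint 1116 survives while the file it points to has been deleted, which matches the FileNotFoundException seen during recovery. The point of the sketch is that "node already exists" is ambiguous after a connection problem, so discarding the metadata file on that error is only safe if the node is known not to reference it.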