[ https://issues.apache.org/jira/browse/FLINK-11225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16729531#comment-16729531 ]
TisonKun commented on FLINK-11225:
----------------------------------

Hi [~dongtingting8...@163.com], this issue is likely a duplicate of FLINK-10011. At that time [~till.rohrmann] sent a [pr|https://github.com/apache/flink/pull/6587] as a fix, which was based on the assumption that the JobGraph is released faster than it can be recovered and brought into a globally terminal state by another JobManager/JobMaster. That assumption does not hold in your situation, so we run into the same problem. FLINK-10333 is a follow-up issue to this. The community is rethinking the ZK-based store to avoid this whole class of race conditions. Mainly, we would likely communicate with ZK using a session id, so that a Dispatcher that is no longer the leader cannot modify the SubmittedJobGraphStore.

> Error state of addedJobGraphs when Dispatcher with concurrent revoking and granting leadership
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11225
>                 URL: https://issues.apache.org/jira/browse/FLINK-11225
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, Distributed Coordination
>    Affects Versions: 1.6.2
>         Environment: flink 1.6.2 on yarn
>            Reporter: dongtingting
>            Priority: Major
>
> The Dispatcher can be revoked leadership and immediately granted leadership again in some cases, for example when the AppMaster goes through a long full GC pause. This can cause Dispatcher#revokeLeadership and Dispatcher#grantLeadership to run concurrently. ZooKeeperSubmittedJobGraphStore#recoverJobGraph may then happen before #releaseJobGraph, and addedJobGraphs in the ZooKeeperSubmittedJobGraphStore does not contain the running job. Later, when we cancel the running job, the job graph cannot be removed from ZooKeeper at $basedir/jobgraphs/$job_id. If the AppMaster restarts, it will recover the cancelled job.
> In this case, the appmaster log:
>
> 2018-12-08 21:12:03,729 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn [main-SendThread(******:2181)] - *Client session timed out*, have not heard from server in 40082ms for sessionid 0x1657682ceee6082, closing socket connection and attempting reconnect
> 2018-12-08 21:12:03,978 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager [main-EventThread] - State change: SUSPENDED
> 2018-12-08 21:12:03,980 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2018-12-08 21:12:03,980 WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. The contender [http://***:***|http://%2A%2A%2A%2A%2A%2A/] no longer participates in the leader election.
> 2018-12-08 21:12:03,980 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
> 2018-12-08 21:12:03,981 WARN org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [Curator-ConnectionStateManager-0] - Connection to ZooKeeper suspended. The contender akka.tcp://fl...@bjm7-lc453.jxq:44815/user/resourcemanager no longer participates in the leader election.
> 2018-12-08 21:12:03,982 INFO org.apache.flink.runtime.jobmaster.JobManagerRunner [Curator-ConnectionStateManager-0] - JobManager for job job_*** (2a16bfa299b56432e1141df3b1361fbc) was *revoked* *leadership* at akka.tcp://flink@****:***/user/jobmanager_0.
> 2018-12-08 21:12:03,984 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [flink-akka.actor.default-dispatcher-186] - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2018-12-08 21:12:03,986 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [Curator-ConnectionStateManager-0] - [http://****:***|http://%2A%2A%2A%2A%2A%2A%2A/] lost leadership
> 2018-12-08 21:12:03,986 INFO org.apache.flink.yarn.YarnResourceManager [flink-akka.actor.default-dispatcher-287] - ResourceManager akka.tcp://flink@****:***/user/resourcemanager was *revoked* *leadership*. Clearing fencing token.
> 2018-12-08 21:12:03,986 INFO org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService [flink-akka.actor.default-dispatcher-287] - Stopping ZooKeeperLeaderRetrievalService /leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock.
> 2018-12-08 21:12:03,990 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [flink-akka.actor.default-dispatcher-281] - *Dispatcher* akka.tcp://fl...@bjm7-lc453.jxq:44815/user/dispatcher was *revoked leadership.*
> 2018-12-08 21:12:03,990 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [flink-akka.actor.default-dispatcher-281] - Stopping all currently running jobs of dispatcher akka.tcp://flink@****:***/user/dispatcher.
> 2018-12-08 21:12:04,181 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn [main-SendThread(10.54.33.12:2181)] - Session establishment complete on server ****/****:2181, sessionid = 0x1657682ceee6082, negotiated timeout = 60000
> 2018-12-08 21:12:04,181 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager [main-EventThread] - State change: *RECONNECTED*
> 2018-12-08 21:12:04,188 INFO org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [main-EventThread] - [http://bjm7-lc453.jxq:45684|http://bjm7-lc453.jxq:45684/] *was granted leadership* with leaderSessionID=43f8a4f4-d3a6-48fb-afef-1f2f03ad5626
> 2018-12-08 21:12:04,188 INFO org.apache.flink.yarn.YarnResourceManager [flink-akka.actor.default-dispatcher-281] - ResourceManager akka.tcp://fl...@bjm7-lc453.jxq:44815/user/resourcemanager was granted leadership with fencing token acacde8ee4e115851f872189e7064971
> 2018-12-08 21:12:04,188 INFO org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager [flink-akka.actor.default-dispatcher-281] - Starting the SlotManager.
> 2018-12-08 21:12:04,190 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [flink-akka.actor.default-dispatcher-218] - Dispatcher akka.tcp://flink@****:***/user/dispatcher was granted leadership with fencing token 8fc420d6-5526-41b0-ac0f-881437c55919
> 2018-12-08 21:12:04,190 INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher [flink-akka.actor.default-dispatcher-276] - *Recovering all persisted jobs*.
> 2018-12-08 21:12:04,624 INFO org.apache.flink.runtime.jobmaster.JobMaster [flink-akka.actor.default-dispatcher-186] - Stopping the JobMaster for job job_***(2a16bfa299b56432e1141df3b1361fbc).
> 2018-12-08 21:12:04,648 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore [flink-akka.actor.default-dispatcher-276] - {color:#ff0000}Recovered SubmittedJobGraph(2a16bfa299b56432e1141df3b1361fbc, null){color}.
> 2018-12-08 21:12:04,665 INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPool [flink-akka.actor.default-dispatcher-242] - Stopping SlotPool.
> 2018-12-08 21:12:04,673 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService [flink-akka.actor.default-dispatcher-186] - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService\{leaderPath='/leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock'}.
> 2018-12-08 21:12:08,283 INFO org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore [flink-akka.actor.default-dispatcher-218] - {color:#ff0000}Released locks of job graph 2a16bfa299b56432e1141df3b1361fbc{color} from ZooKeeper.
>
> Here, the ZooKeeperSubmittedJobGraphStore does not contain the running job 2a16bfa299b56432e1141df3b1361fbc. Later the job is cancelled, but when the appmaster restarts, it tries to recover the cancelled job and fails. appmaster log:
>
> 2018-12-08 22:02:30,160 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint [flink-akka.actor.default-dispatcher-670] - Fatal error occurred in the cluster entrypoint.
> java.lang.RuntimeException: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not set up JobManager
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
> at org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
> at org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
> at org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 more
> Caused by: java.lang.Exception: Cannot set up the user code libraries: File does not exist: /home/flink/data/flink/state/**/***/zk/**/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
> at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:134)
> ... 10 more
> Caused by: java.io.FileNotFoundException: File does not exist: /home/flink/data/flink/state/***/****/zk/***/blob/job_{color:#ff0000}2a16bfa299b56432e1141df3b1361fbc{color}/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
> at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
> at org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
> at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1199)
> at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1189)
> at org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:275)
> at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:242)
> at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:235)
> at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1487)
> at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
> at org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
> at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
> at org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
> at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:120)
> at org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:37)
> at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:102)
> at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:84)
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:493)
> at org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
> at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
> at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
> at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
> at org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:131)
> ... 10 more
> Caused by: org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File does not exist: /home/flink/data/flink/state/**/***/zk/***/blob/{color:#ff0000}job_2a16bfa299b56432e1141df3b1361fbc{color}/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
> at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
> at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
> at org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
> at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
> at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
> at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
> at java.security.AccessController.doPrivileged(Native Method)
> at javax.security.auth.Subject.doAs(Subject.java:422)
> at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
> at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at org.apache.hadoop.ipc.Client.call(Client.java:1470)
> at org.apache.hadoop.ipc.Client.call(Client.java:1401)
> at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
> at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
> at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
> at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
> at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
> at org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1209)
> ... 31 more

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
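The session-id fencing idea from the comment can be sketched in a few lines of Java. This is a minimal, hypothetical illustration, not Flink's actual SubmittedJobGraphStore API: the class FencedJobGraphStore and its methods putJobGraph/releaseJobGraph are made up for the example. The point is that a mutation issued under a stale leader session (e.g. a delayed releaseJobGraph from the old Dispatcher) is rejected instead of racing with the new leader's recovery.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

public class FencedStoreDemo {

    // Hypothetical store: every mutation carries the leader session id it was
    // issued under, and mutations from a session that is no longer the current
    // leader session are rejected.
    static class FencedJobGraphStore {
        private final Map<String, String> jobGraphs = new ConcurrentHashMap<>();
        private volatile UUID leaderSession;

        synchronized void grantLeadership(UUID session) {
            leaderSession = session;
        }

        synchronized boolean putJobGraph(UUID session, String jobId, String graph) {
            if (!session.equals(leaderSession)) {
                return false; // stale leader: mutation rejected
            }
            jobGraphs.put(jobId, graph);
            return true;
        }

        synchronized boolean releaseJobGraph(UUID session, String jobId) {
            if (!session.equals(leaderSession)) {
                return false; // stale leader: the late release cannot remove the job
            }
            jobGraphs.remove(jobId);
            return true;
        }

        boolean contains(String jobId) {
            return jobGraphs.containsKey(jobId);
        }
    }

    public static void main(String[] args) {
        FencedJobGraphStore store = new FencedJobGraphStore();
        String jobId = "2a16bfa299b56432e1141df3b1361fbc";

        // Old leader session submits the job.
        UUID oldSession = UUID.randomUUID();
        store.grantLeadership(oldSession);
        store.putJobGraph(oldSession, jobId, "graph");

        // Leadership flips to a new session before the old leader's
        // asynchronous releaseJobGraph has run (the race in this issue).
        UUID newSession = UUID.randomUUID();
        store.grantLeadership(newSession);

        // The stale release is rejected, so the new leader still sees the job.
        boolean released = store.releaseJobGraph(oldSession, jobId);
        System.out.println("stale release accepted: " + released);        // false
        System.out.println("store still contains job: " + store.contains(jobId)); // true
    }
}
```

With such a guard, the interleaving in the logs above (recoverJobGraph by the new session, then releaseJobGraph by the old one) would leave the job graph in place, because the late release would no longer be accepted.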