[ 
https://issues.apache.org/jira/browse/FLINK-11225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16729531#comment-16729531
 ] 

TisonKun commented on FLINK-11225:
----------------------------------

Hi [~dongtingting8...@163.com], this issue is likely duplicated by FLINK-10011. 
At that time [~till.rohrmann] sent a 
[pr|https://github.com/apache/flink/pull/6587] as a fix, which based on an 
assumption that the JobGraph can be faster released than recovered and brought 
into a globally terminal state by another JobManager/JobMaster.

This is not the case in your situation, so we run into the same problem. 
FLINK-10333 is a follow-up issue to this. The community is rethinking ZK based 
store to avoid all of this kind of race conditions. Mainly we might be likely 
to communicate with ZK with a session id to avoid modifying 
SubmittedJobGraphStore even if Dispatcher is not leader.

> Error state of addedJobGraphs when Dispatcher with concurrent revoking and 
> granting leadership
> ----------------------------------------------------------------------------------------------
>
>                 Key: FLINK-11225
>                 URL: https://issues.apache.org/jira/browse/FLINK-11225
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, Distributed Coordination
>    Affects Versions: 1.6.2
>         Environment: flink 1.6.2  on yarn 
>            Reporter: dongtingting
>            Priority: Major
>
> Dispatcher was revoked leadership and immediately grant leadership in some 
> cases like appmaster go through a long time of full gc。 This can lead to 
> Dispatcher.'revokeLeadership' and 'grantLeadership' concurrently run。Then 
> ZooKeeperSubmittedJobGraphStore may 'recoverJobGraph' happen before 
> 'releaseJobGraph',and  addedJobGraphs in ZooKeeperSubmittedJobGraphStore do 
> not contain the running job。 Later when we cancle the running job, cant not 
> remove jobgraph from zk $basedir/jobgraphs/$job_id. If appmaster restart it 
> will recover the cancled job.
>  case, appmaster log:
> 2018-12-08 21:12:03,729 INFO 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(******:2181)]
>  - *Client session timed out*, have not heard from server in 40082ms for 
> sessionid 0x1657682ceee6082, closing socket connection and attempting 
> reconnect
>  2018-12-08 21:12:03,978 INFO 
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread]
>  - State change: SUSPENDED
> 2018-12-08 21:12:03,980 WARN 
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0]
>  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
>  2018-12-08 21:12:03,980 WARN 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0]
>  - Connection to ZooKeeper suspended. The contender 
> [http://***:***|http://%2A%2A%2A%2A%2A%2A/] no longer participates in the 
> leader election.
>  2018-12-08 21:12:03,980 WARN 
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[Curator-ConnectionStateManager-0]
>  - Connection to ZooKeeper suspended. Can no longer retrieve the leader from 
> ZooKeeper.
>  2018-12-08 21:12:03,981 WARN 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[Curator-ConnectionStateManager-0]
>  - Connection to ZooKeeper suspended. The contender 
> akka.tcp://fl...@bjm7-lc453.jxq:44815/user/resourcemanager no longer 
> participates in the leader election.
>  2018-12-08 21:12:03,982 INFO 
> org.apache.flink.runtime.jobmaster.JobManagerRunner 
> [Curator-ConnectionStateManager-0] - JobManager for job job_*** 
> (2a16bfa299b56432e1141df3b1361fbc) was *revoked* *leadership* at 
> akka.tcp://flink@****:***/user/jobmanager_0.
>  2018-12-08 21:12:03,984 INFO 
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-186]
>  - Stopping ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
> 2018-12-08 21:12:03,986 INFO 
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint 
> [Curator-ConnectionStateManager-0] - 
> [http://****:***|http://%2A%2A%2A%2A%2A%2A%2A/] lost leadership
>  2018-12-08 21:12:03,986 INFO org.apache.flink.yarn.YarnResourceManager 
> [flink-akka.actor.default-dispatcher-287] - ResourceManager 
> akka.tcp://flink@****:***/user/resourcemanager was *revoked* *leadership*. 
> Clearing fencing token.
>  2018-12-08 21:12:03,986 INFO 
> org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService[flink-akka.actor.default-dispatcher-287]
>  - Stopping ZooKeeperLeaderRetrievalService 
> /leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock.
> 2018-12-08 21:12:03,990 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher    
> [flink-akka.actor.default-dispatcher-281]  - *Dispatcher* 
> akka.tcp://fl...@bjm7-lc453.jxq:44815/user/dispatcher was *revoked 
> leadership.*
> 2018-12-08 21:12:03,990 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher    
> [flink-akka.actor.default-dispatcher-281]  - Stopping all currently running 
> jobs of dispatcher akka.tcp://flink@****:***/user/dispatcher.
> 2018-12-08 21:12:04,181 INFO 
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn[main-SendThread(10.54.33.12:2181)]
>  - Session establishment complete on server ****/****:2181, sessionid = 
> 0x1657682ceee6082, negotiated timeout = 60000
>  2018-12-08 21:12:04,181 INFO 
> org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager[main-EventThread]
>  - State change: *RECONNECTED*
> 2018-12-08 21:12:04,188 INFO 
> org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint [main-EventThread] 
> - [http://bjm7-lc453.jxq:45684|http://bjm7-lc453.jxq:45684/] *was granted 
> leadership* with leaderSessionID=43f8a4f4-d3a6-48fb-afef-1f2f03ad5626
>  2018-12-08 21:12:04,188 INFO org.apache.flink.yarn.YarnResourceManager 
> [flink-akka.actor.default-dispatcher-281] - ResourceManager 
> akka.tcp://fl...@bjm7-lc453.jxq:44815/user/resourcemanager was granted 
> leadership with fencing token acacde8ee4e115851f872189e7064971
>  2018-12-08 21:12:04,188 INFO 
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager[flink-akka.actor.default-dispatcher-281]
>  - Starting the SlotManager.
>  2018-12-08 21:12:04,190 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher 
> [flink-akka.actor.default-dispatcher-218] - Dispatcher 
> akka.tcp://flink@****:***/user/dispatcher was granted leadership with fencing 
> token 8fc420d6-5526-41b0-ac0f-881437c55919
>  2018-12-08 21:12:04,190 INFO 
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher 
> [flink-akka.actor.default-dispatcher-276] - *Recovering all persisted jobs*.
> 2018-12-08 21:12:04,624 INFO org.apache.flink.runtime.jobmaster.JobMaster 
> [flink-akka.actor.default-dispatcher-186] - Stopping the JobMaster for job 
> job_***(2a16bfa299b56432e1141df3b1361fbc).
>  2018-12-08 21:12:04,648 INFO 
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-276]
>  -{color:#ff0000} Recovered 
> SubmittedJobGraph(2a16bfa299b56432e1141df3b1361fbc, null){color}.
>  2018-12-08 21:12:04,665 INFO 
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool 
> [flink-akka.actor.default-dispatcher-242] - Stopping SlotPool.
>  2018-12-08 21:12:04,673 INFO 
> org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService[flink-akka.actor.default-dispatcher-186]
>  - Stopping ZooKeeperLeaderElectionService 
> ZooKeeperLeaderElectionService\{leaderPath='/leader/2a16bfa299b56432e1141df3b1361fbc/job_manager_lock'}.
> 2018-12-08 21:12:08,283 INFO  
> org.apache.flink.runtime.jobmanager.ZooKeeperSubmittedJobGraphStore[flink-akka.actor.default-dispatcher-218]
>   -{color:#ff0000} Released locks of job graph 
> 2a16bfa299b56432e1141df3b1361fbc{color} from ZooKeeper.
>  
> here, ZooKeeperSubmittedJobGraphStore do not contain the running job 
> 2a16bfa299b56432e1141df3b1361fbc
> later the job is cancled ,but when appmaster restart, it will try to recover 
> the cancled job and fail.  appmaster  log:
>  
> 2018-12-08 22:02:30,160 ERROR 
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint 
> [flink-akka.actor.default-dispatcher-670] - Fatal error occurred in the 
> cluster entrypoint.
>  java.lang.RuntimeException: 
> org.apache.flink.runtime.client.JobExecutionException: Could not set up 
> JobManager
>  at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
>  at 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
>  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
>  at 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
>  at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>  at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>  at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>  at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>  Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>  at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:176)
>  at 
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
>  at 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
>  at 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
>  ... 7 more
>  Caused by: java.lang.Exception: Cannot set up the user code libraries: File 
> does not exist: 
> /home/flink/data/flink/state/**/***/zk/**/blob/job_2a16bfa299b56432e1141df3b1361fbc/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
>  at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
>  at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
>  at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
>  at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
>  at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>  at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
>  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
>  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
>  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:134)
>  ... 10 more
>  Caused by: java.io.FileNotFoundException: File does not exist: 
> /home/flink/data/flink/state/***/****/zk/***/blob/job_{color:#ff0000}2a16bfa299b56432e1141df3b1361fbc{color}/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
>  at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
>  at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
>  at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
>  at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
>  at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>  at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
>  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
>  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
>  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>  at 
> sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>  at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>  at 
> org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:106)
>  at 
> org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:73)
>  at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1211)
>  at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1199)
>  at org.apache.hadoop.hdfs.DFSClient.getLocatedBlocks(DFSClient.java:1189)
>  at 
> org.apache.hadoop.hdfs.DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength(DFSInputStream.java:275)
>  at org.apache.hadoop.hdfs.DFSInputStream.openInfo(DFSInputStream.java:242)
>  at org.apache.hadoop.hdfs.DFSInputStream.<init>(DFSInputStream.java:235)
>  at org.apache.hadoop.hdfs.DFSClient.open(DFSClient.java:1487)
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:302)
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem$3.doCall(DistributedFileSystem.java:298)
>  at 
> org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
>  at 
> org.apache.hadoop.hdfs.DistributedFileSystem.open(DistributedFileSystem.java:298)
>  at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:766)
>  at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:120)
>  at 
> org.apache.flink.runtime.fs.hdfs.HadoopFileSystem.open(HadoopFileSystem.java:37)
>  at 
> org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:102)
>  at 
> org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:84)
>  at 
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:493)
>  at 
> org.apache.flink.runtime.blob.BlobServer.getFileInternal(BlobServer.java:444)
>  at org.apache.flink.runtime.blob.BlobServer.getFile(BlobServer.java:417)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>  at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:91)
>  at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.<init>(JobManagerRunner.java:131)
>  ... 10 more
>  Caused by: 
> org.apache.hadoop.ipc.RemoteException(java.io.FileNotFoundException): File 
> does not exist: 
> /home/flink/data/flink/state/**/***/zk/***/blob/{color:#ff0000}job_2a16bfa299b56432e1141df3b1361fbc{color}/blob_p-ce59b177934b5091b6aa0f244265465fce2a6b9b-32b9e2c3e16da7bc6e27339e5bd19bef
> at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66)
>  at 
> org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:2028)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1998)
>  at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1911)
>  at 
> org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:572)
>  at 
> org.apache.hadoop.hdfs.server.namenode.AuthorizationProviderProxyClientProtocol.getBlockLocations(AuthorizationProviderProxyClientProtocol.java:89)
>  at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:365)
>  at 
> org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:617)
>  at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:1076)
>  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2250)
>  at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2246)
>  at java.security.AccessController.doPrivileged(Native Method)
>  at javax.security.auth.Subject.doAs(Subject.java:422)
>  at 
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1796)
>  at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2244)
> at org.apache.hadoop.ipc.Client.call(Client.java:1470)
>  at org.apache.hadoop.ipc.Client.call(Client.java:1401)
>  at 
> org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:232)
>  at com.sun.proxy.$Proxy9.getBlockLocations(Unknown Source)
>  at 
> org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.getBlockLocations(ClientNamenodeProtocolTranslatorPB.java:254)
>  at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>  at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>  at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>  at java.lang.reflect.Method.invoke(Method.java:498)
>  at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
>  at 
> org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
>  at com.sun.proxy.$Proxy10.getBlockLocations(Unknown Source)
>  at 
> org.apache.hadoop.hdfs.DFSClient.callGetBlockLocations(DFSClient.java:1209)
>  ... 31 more
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to