[ 
https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-4182:
------------------------------------
    Priority: Blocker  (was: Major)

> HA recovery not working properly under ApplicationMaster failures.
> ------------------------------------------------------------------
>
>                 Key: FLINK-4182
>                 URL: https://issues.apache.org/jira/browse/FLINK-4182
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, State Backends, Checkpointing
>    Affects Versions: 1.0.3
>            Reporter: Stefan Richter
>            Priority: Blocker
>
> When the TaskManager and ApplicationMaster are killed at random, a job 
> sometimes does not recover properly in HA mode.
> The symptoms vary. For example, in one case the job dies with the following 
> exception:
> {code}
>  The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program 
> execution failed: Cannot set up the user code libraries: Cannot get library 
> with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
>       at 
> org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
>       at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>       at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
>       at 
> da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
>       at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
>       at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
>       at 
> org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
>       at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
>       at 
> org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
>       at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set 
> up the user code libraries: Cannot get library with hash 
> 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>       at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at 
> org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
>       at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
>       at 
> org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
>       at 
> scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
>       at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
>       at 
> org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot get library with hash 
> 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
>       at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:257)
>       at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:116)
>       at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:88)
>       at 
> org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1084)
>       ... 26 more
> Caused by: java.io.IOException: Failed to copy from blob store.
>       at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:358)
>       at 
> org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:248)
>       ... 29 more
> Caused by: java.io.IOException: 
> gs:///flink/recovery/blob/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0 
> does not exist.
>       at 
> org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:121)
>       at 
> org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:93)
>       at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:355)
>       ... 30 more
> {code}
> In other cases, I noticed inconsistencies in the results when testing with a 
> streaming state machine job and a Kafka source. My guess is that value state 
> is not restored properly, because all invalid transactions in the log start 
> from the initial state, which is the default value for the value state.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
