[ https://issues.apache.org/jira/browse/FLINK-4182?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Aljoscha Krettek updated FLINK-4182:
------------------------------------
    Priority: Blocker  (was: Major)

> HA recovery not working properly under ApplicationMaster failures.
> ------------------------------------------------------------------
>
>                 Key: FLINK-4182
>                 URL: https://issues.apache.org/jira/browse/FLINK-4182
>             Project: Flink
>          Issue Type: Bug
>          Components: Distributed Coordination, State Backends, Checkpointing
>    Affects Versions: 1.0.3
>            Reporter: Stefan Richter
>            Priority: Blocker
>
> When TaskManagers and the ApplicationMaster are killed at random, a job sometimes
> does not recover properly in HA mode.
> The symptoms vary. For example, in one case the job dies with the following
> exception:
> {code}
> The program finished with the following exception:
> org.apache.flink.client.program.ProgramInvocationException: The program execution failed: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:413)
> 	at org.apache.flink.yarn.YarnClusterClient.submitJob(YarnClusterClient.java:208)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:389)
> 	at org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
> 	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1381)
> 	at da.testing.StreamingStateMachineJob.main(StreamingStateMachineJob.java:61)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:606)
> 	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:509)
> 	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:403)
> 	at org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:331)
> 	at org.apache.flink.client.CliFrontend.executeProgram(CliFrontend.java:738)
> 	at org.apache.flink.client.CliFrontend.run(CliFrontend.java:251)
> 	at org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:966)
> 	at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1009)
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Cannot set up the user code libraries: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1089)
> 	at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$handleMessage$1.applyOrElse(JobManager.scala:506)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.yarn.YarnJobManager$$anonfun$handleYarnMessage$1.applyOrElse(YarnJobManager.scala:105)
> 	at scala.PartialFunction$OrElse.apply(PartialFunction.scala:162)
> 	at org.apache.flink.runtime.LeaderSessionMessageFilter$$anonfun$receive$1.applyOrElse(LeaderSessionMessageFilter.scala:36)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> 	at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:33)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.apply(LogMessages.scala:28)
> 	at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> 	at org.apache.flink.runtime.LogMessages$$anon$1.applyOrElse(LogMessages.scala:28)
> 	at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> 	at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:118)
> 	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> 	at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> 	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
> 	at akka.dispatch.Mailbox.run(Mailbox.scala:221)
> 	at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
> 	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.pollAndExecAll(ForkJoinPool.java:1253)
> 	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1346)
> 	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> 	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: java.io.IOException: Cannot get library with hash 7fafffe9595cd06aff213b81b5da7b1682e1d6b0
> 	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:257)
> 	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:116)
> 	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerJob(BlobLibraryCacheManager.java:88)
> 	at org.apache.flink.runtime.jobmanager.JobManager.org$apache$flink$runtime$jobmanager$JobManager$$submitJob(JobManager.scala:1084)
> 	... 26 more
> Caused by: java.io.IOException: Failed to copy from blob store.
> 	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:358)
> 	at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerReferenceToBlobKeyAndGetURL(BlobLibraryCacheManager.java:248)
> 	... 29 more
> Caused by: java.io.IOException: gs:///flink/recovery/blob/cache/blob_7fafffe9595cd06aff213b81b5da7b1682e1d6b0 does not exist.
> 	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:121)
> 	at org.apache.flink.runtime.blob.FileSystemBlobStore.get(FileSystemBlobStore.java:93)
> 	at org.apache.flink.runtime.blob.BlobServer.getURL(BlobServer.java:355)
> 	... 30 more
> {code}
> In other cases, I noticed inconsistent results when testing with a streaming
> state machine job that reads from a Kafka source. My guess is that value state
> is not restored properly: all invalid transactions in the log start from the
> initial state, which is the default value of the value state.
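The reasoning in the last paragraph rests on one observation: if keyed value state is lost during recovery instead of restored, every read falls back to the state's default, so the first event after recovery is evaluated against the initial state. The following is a minimal, self-contained simulation in plain Java with no Flink dependency. It is only an illustration under assumptions: `StateLossSketch`, the `State` enum, and the transition rules are hypothetical stand-ins for the test job's state machine, and a `HashMap` read with `getOrDefault` stands in for a `ValueState` that returns its default value when nothing is stored for the current key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical simulation: a HashMap stands in for Flink's keyed ValueState,
// and getOrDefault(...) mimics a ValueState that returns its default value
// when no state has been stored (or restored) for the current key.
public class StateLossSketch {

    enum State { INITIAL, STARTED, FINISHED }

    // Hypothetical transition table for a tiny state machine:
    // INITIAL --start--> STARTED --finish--> FINISHED
    static State transition(State from, String event) {
        if (from == State.INITIAL && event.equals("start")) return State.STARTED;
        if (from == State.STARTED && event.equals("finish")) return State.FINISHED;
        return null; // no valid transition for this (state, event) pair
    }

    // Processes one event for a key. Returns the state the event was evaluated
    // against if the transition is invalid, or null if it is valid.
    static State process(Map<String, State> valueState, String key, String event) {
        State current = valueState.getOrDefault(key, State.INITIAL);
        State next = transition(current, event);
        if (next == null) {
            return current;
        }
        valueState.put(key, next);
        return null;
    }

    static String describe(State invalidFrom) {
        return invalidFrom == null ? "valid" : "invalid from " + invalidFrom;
    }

    public static void main(String[] args) {
        Map<String, State> state = new HashMap<>();
        process(state, "a", "start"); // key "a": INITIAL -> STARTED

        // Correct recovery: the state is restored, so "finish" is valid.
        Map<String, State> restored = new HashMap<>(state);
        System.out.println("restored: " + describe(process(restored, "a", "finish")));

        // Faulty recovery: the state is lost, so "finish" is evaluated against
        // the default INITIAL state and flagged as an invalid transition.
        Map<String, State> lost = new HashMap<>();
        System.out.println("lost: " + describe(process(lost, "a", "finish")));
    }
}
```

Running `main` prints `restored: valid` followed by `lost: invalid from INITIAL`. Corrupted state would be expected to produce invalid transitions from arbitrary states, whereas lost state makes every invalid transition originate from the default, which is the pattern described in the report.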