[ 
https://issues.apache.org/jira/browse/FLINK-3021?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15008420#comment-15008420
 ] 

Till Rohrmann commented on FLINK-3021:
--------------------------------------

The problem is that the {{StreamingJobGraphGenerator}} sets the input format 
for the {{JobVertex}} on the client side. As a result, the input format travels 
as part of the job submission message, and Akka deserializes it when the job 
arrives at the {{JobManager}}. However, the {{InputFormat}} can contain user 
code classes and should therefore be deserialized with the user code class 
loader. This is usually done in the {{initializeOnMaster}} method, which is 
called by the {{JobManager}}.
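
To illustrate the general idea, here is a minimal sketch in plain Java of 
deserializing an object with an explicitly chosen class loader instead of the 
default one. It is not the actual Flink code or the fix for this issue: the 
names {{UserCodeDeserializer}} and {{ClassLoaderObjectInputStream}} are 
hypothetical, and the sketch assumes the object is shipped as a byte array so 
that the transport layer never has to resolve user classes itself.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;

/** Hypothetical helper for illustration only; not part of Flink. */
final class UserCodeDeserializer {

    private UserCodeDeserializer() {}

    /** Serializes an object to bytes with plain Java serialization. */
    static byte[] serialize(Object value) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(value);
        }
        return buffer.toByteArray();
    }

    /** Deserializes bytes, resolving classes through the given class loader. */
    static Object deserialize(byte[] bytes, ClassLoader userCodeClassLoader)
            throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ClassLoaderObjectInputStream(
                new ByteArrayInputStream(bytes), userCodeClassLoader)) {
            return in.readObject();
        }
    }

    /** ObjectInputStream that looks classes up in a caller-supplied class loader. */
    private static final class ClassLoaderObjectInputStream extends ObjectInputStream {

        private final ClassLoader classLoader;

        ClassLoaderObjectInputStream(InputStream in, ClassLoader classLoader)
                throws IOException {
            super(in);
            this.classLoader = classLoader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Try the supplied (user code) class loader first.
                return Class.forName(desc.getName(), false, classLoader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive types.
                return super.resolveClass(desc);
            }
        }
    }
}
```

With such a helper, the receiving side would call 
{{UserCodeDeserializer.deserialize(bytes, userCodeClassLoader)}} only once the 
user code class loader for the job is available, rather than relying on the 
class loader that the messaging layer uses.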

> Job submission times out due to classloading issue on JobManager
> ----------------------------------------------------------------
>
>                 Key: FLINK-3021
>                 URL: https://issues.apache.org/jira/browse/FLINK-3021
>             Project: Flink
>          Issue Type: Bug
>          Components: JobManager
>    Affects Versions: 0.10.0
>            Reporter: Robert Metzger
>            Assignee: Till Rohrmann
>            Priority: Critical
>
> A user reported the following issue when submitting a very simple job using 
> the {{DataStream}} API:
> {code}
> Caused by: org.apache.flink.runtime.client.JobExecutionException: 
> Communication with JobManager failed: Job submission to the JobManager timed 
> out.
>       at 
> org.apache.flink.runtime.client.JobClient.submitJobAndWait(JobClient.java:141)
>       at org.apache.flink.client.program.Client.runBlocking(Client.java:368)
>       ... 13 more
> Caused by: 
> org.apache.flink.runtime.client.JobClientActorSubmissionTimeoutException: Job 
> submission to the JobManager timed out.
>       at 
> org.apache.flink.runtime.client.JobClientActor.handleMessage(JobClientActor.java:255)
>       at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.handleLeaderSessionID(FlinkUntypedActor.java:88)
>       at 
> org.apache.flink.runtime.akka.FlinkUntypedActor.onReceive(FlinkUntypedActor.java:68)
>       at 
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:167)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> {code}
> The problem is that Akka cannot deserialize the job submit message on the 
> JobManager. From the logs, the issue becomes apparent:
> {code}
> 22:14:12,964 DEBUG akka.serialization.Serialization(akka://flink)             
>    - Using serializer[akka.serialization.JavaSerializer] for message 
> [akka.actor.ActorIdentity]
> 22:14:12,995 DEBUG akka.serialization.Serialization(akka://flink)             
>    - Using serializer[akka.serialization.JavaSerializer] for message 
> [java.lang.Integer]
> 22:14:13,007 DEBUG org.apache.flink.runtime.blob.BlobServerConnection         
>    - Received PUT request for content addressable BLOB
> 22:14:13,134 ERROR akka.remote.EndpointWriter                                 
>    - AssociationError [akka.tcp://[email protected]:6123] <- 
> [akka.tcp://[email protected]:58424]: Error [com.dataartisans.SimpleEntity] [
> java.lang.ClassNotFoundException: com.dataartisans.SimpleEntity
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>       at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>       at java.security.AccessController.doPrivileged(Native Method)
>       at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>       at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
>       at java.lang.Class.forName0(Native Method)
>       at java.lang.Class.forName(Class.java:274)
>       at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:625)
>       at 
> akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
>       at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
>       at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
>       at java.io.ObjectInputStream.readClass(ObjectInputStream.java:1483)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1333)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at java.util.HashMap.readObject(HashMap.java:1180)
>       at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>       at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>       at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>       at java.lang.reflect.Method.invoke(Method.java:606)
>       at 
> java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
>       at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
>       at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
>       at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
>       at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>       at 
> akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
>       at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>       at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
>       at 
> akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>       at scala.util.Try$.apply(Try.scala:161)
>       at akka.serialization.Serialization.deserialize(Serialization.scala:98)
>       at 
> akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
>       at 
> akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:58)
>       at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:58)
>       at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:76)
>       at 
> akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:937)
>       at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
>       at akka.remote.EndpointActor.aroundReceive(Endpoint.scala:415)
>       at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
>       at akka.actor.ActorCell.invoke(ActorCell.scala:487)
>       at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254)
>       at akka.dispatch.Mailbox.run(Mailbox.scala:221)
>       at akka.dispatch.Mailbox.exec(Mailbox.scala:231)
>       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>       at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>       at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> ]
> 22:14:13,137 WARN  akka.remote.ReliableDeliverySupervisor                     
>    - Association with remote system [akka.tcp://[email protected]:58424] has 
> failed, address is now gated for [5000] ms. Reason is: 
> [com.dataartisans.SimpleEntity].
> 22:14:13,142 DEBUG akka.remote.EndpointWriter                                 
>    - Disassociated [akka.tcp://[email protected]:6123] <- 
> [akka.tcp://[email protected]:58424]
> {code}
> I suspect the issue is that the job is using an AvroInputFormat which holds a 
> reference to the POJO.



