It works with "spark.executor.extraClassPath": no exceptions in this case, and
I’m getting the expected results.
But to me it limits/complicates the usage of Akka-based receivers a lot. Do you
think it should be considered a bug?

Even if it’s not, can it be fixed/worked around by some classloading magic in
either Spark or application code?
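
For illustration, one shape an application-side workaround could take: send the
messages over Akka as plain byte arrays, then deserialize them inside the
receiver with an explicitly chosen class loader. This is only a sketch in plain
Java (so it needs nothing but the JDK), not something Spark or Akka provides.
The names LoaderAwareCodec, LoaderAwareInputStream, toBytes and fromBytes are
made up for the example. Which class loader actually sees the uploaded jar on an
executor is an assumption I have not verified, so the loader is passed in as a
parameter.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical helper for illustration; not part of Spark or Akka. */
final class LoaderAwareCodec {

    /** ObjectInputStream that asks a caller-supplied class loader first. */
    static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                // Look the class up in the supplied loader without initializing it.
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the JDK default lookup (it also handles primitive types).
                return super.resolveClass(desc);
            }
        }
    }

    /** Sender side: turn the message into bytes with standard Java serialization. */
    static byte[] toBytes(Serializable message) {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
            out.writeObject(message);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    /** Receiver side: rebuild the message, resolving classes via the given loader. */
    static Object fromBytes(byte[] bytes, ClassLoader loader) {
        try (ObjectInputStream in =
                 new LoaderAwareInputStream(new ByteArrayInputStream(bytes), loader)) {
            return in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Class not visible to the given loader", e);
        }
    }
}
```

The sending actor would call LoaderAwareCodec.toBytes(message), and the receiver
would call LoaderAwareCodec.fromBytes(bytes, loader) before storing the result.
A natural first candidate for the loader is
Thread.currentThread().getContextClassLoader(), but whether that one includes
the jar passed through setJars on the actor's thread would need to be checked.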


From: Tathagata Das [mailto:[email protected]]
Sent: Friday, August 29, 2014 7:21 PM
To: Anton Brazhnyk
Cc: [email protected]
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

Can you try adding the JAR to the class path of the executors directly, by 
setting the config "spark.executor.extraClassPath" in the SparkConf. See 
Configuration page - 
http://spark.apache.org/docs/latest/configuration.html#runtime-environment

I think what you guessed is correct. The Akka actor system is not aware of the
classes that are dynamically added when the custom jar is attached with setJars.

TD
On Fri, Aug 29, 2014 at 6:44 PM, Anton Brazhnyk <[email protected]> wrote:
Just checked it with 1.0.2.
Still the same exception.

From: Anton Brazhnyk [mailto:[email protected]]
Sent: Wednesday, August 27, 2014 6:46 PM
To: Tathagata Das
Cc: [email protected]
Subject: RE: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

Sorry for the delayed answer; I was on vacation.
As I said, I was using a modified version of the launcher from the example.
The only modification is setting the Spark master URL in the code so that the
run-example script is not needed.
The launcher itself was in the attached zip (attaching it once more) as the
ActorWordCount object.

From: Tathagata Das [mailto:[email protected]]
Sent: Tuesday, August 05, 2014 11:32 PM
To: Anton Brazhnyk
Cc: [email protected]
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

How are you launching/submitting the program? Using spark-submit? Or some other 
script (can you provide that)?

TD

On Tue, Aug 5, 2014 at 6:52 PM, Anton Brazhnyk <[email protected]> wrote:
Went through it once again so that only the modification in question is left.
Still the same exception.
I hope sources as a zip file (instead of GitHub) can still be tolerated. :)

Here is the stacktrace generated with these sources:
14/08/05 18:45:54 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407289554800
14/08/05 18:45:54 ERROR Remoting: org.apache.spark.examples.streaming.CustomMessage
java.lang.ClassNotFoundException: org.apache.spark.examples.streaming.CustomMessage
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:270)
        at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
        at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
        at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
        at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
        at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
        at scala.util.Try$.apply(Try.scala:161)
        at akka.serialization.Serialization.deserialize(Serialization.scala:98)
        at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
        at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
        at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
        at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

-----Original Message-----
From: Tathagata Das [mailto:[email protected]]
Sent: Tuesday, August 05, 2014 5:42 PM
To: Anton Brazhnyk
Cc: [email protected]
Subject: Re: [Streaming] Akka-based receiver with messages defined in uploaded 
jar

Can you show us the modified version? The reason could very well be what you
suggest, but I want to understand what conditions lead to this.

TD

On Tue, Aug 5, 2014 at 3:55 PM, Anton Brazhnyk <[email protected]> wrote:
> Greetings,
>
> I modified the ActorWordCount example a little so that it uses a simple case
> class as the message for Streaming instead of a primitive string.
>
> I also modified the launch code to not use the run-example script, but to set
> the Spark master in the code and attach the jar (setJars(…)) with all the
> classes, including the new case class. It runs fine in local[*] mode
> but fails with ClassNotFoundException in a standalone cluster (stacktrace
> follows).
>
> I assume it’s a classloader problem and Akka remoting just doesn’t
> know about the classes coming to the executor from the attached jar.
> Am I right?
>
> I guess I could pass primitive values around and do my own
> (de)serialization, but maybe there is a better way?
>
> What’s the correct way to build a custom Akka-based receiver that uses
> non-primitive messages?
>
> Here is the log excerpt with stacktrace:
>
> 14/08/04 20:59:41 DEBUG RecurringTimer: Callback for BlockGenerator called at time 1407211181800
> 14/08/04 20:59:41 ERROR Remoting: com.genesys.gpe.analytics.akka.messages.SubscribeAck
> java.lang.ClassNotFoundException: com.genesys.gpe.analytics.akka.messages.SubscribeAck
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
>         at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
>         at java.security.AccessController.doPrivileged(Native Method)
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>         at java.lang.Class.forName0(Native Method)
>         at java.lang.Class.forName(Class.java:270)
>         at java.io.ObjectInputStream.resolveClass(ObjectInputStream.java:623)
>         at akka.util.ClassLoaderObjectInputStream.resolveClass(ClassLoaderObjectInputStream.scala:19)
>         at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1610)
>         at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1515)
>         at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1769)
>         at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1348)
>         at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
>         at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136)
>         at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>         at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136)
>         at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104)
>         at scala.util.Try$.apply(Try.scala:161)
>         at akka.serialization.Serialization.deserialize(Serialization.scala:98)
>         at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23)
>         at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55)
>         at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55)
>         at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73)
>         at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> WBR,
>
> Anton

