It turns out Kryo doesn't play well with protobuf. Out of the box I see:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
extra_ (com.foo.bar.MyMessage)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
        ...

Maybe I can fix this, not sure.
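If I do end up pursuing Kryo, my first guess at a workaround would be to register a custom serializer that round-trips the message through protobuf's own wire format, so Kryo's FieldSerializer never has to reflect over the generated class. Something like this untested sketch (the registrator and serializer class names are mine; assumes protobuf 2.5 generated code):

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.foo.bar.MyMessage;
import org.apache.spark.serializer.KryoRegistrator;

// Hypothetical registrator; would be wired up via:
//   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
//   conf.set("spark.kryo.registrator", "com.foo.bar.MyKryoRegistrator");
public class MyKryoRegistrator implements KryoRegistrator {

  // Round-trip the message through protobuf's wire format so Kryo
  // never reflects over the generated class's fields.
  public static class MyMessageSerializer extends Serializer<MyMessage> {
    @Override
    public void write(Kryo kryo, Output output, MyMessage msg) {
      byte[] bytes = msg.toByteArray();
      output.writeInt(bytes.length, true);
      output.writeBytes(bytes);
    }

    @Override
    public MyMessage read(Kryo kryo, Input input, Class<MyMessage> type) {
      byte[] bytes = input.readBytes(input.readInt(true));
      try {
        return MyMessage.parseFrom(bytes);
      } catch (Exception e) {
        throw new RuntimeException("Could not parse MyMessage", e);
      }
    }
  }

  @Override
  public void registerClasses(Kryo kryo) {
    kryo.register(MyMessage.class, new MyMessageSerializer());
  }
}

Of course that only helps if the executor can load com.foo.bar.MyMessage in the first place, which looks like the real issue below.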
I dug a bit deeper into the Java serialization issue and I believe there's a bug in Spark introduced by a recent change in v1.1. FYI, the full stack trace I see in my app (sorry, the one in the first email was truncated) is:

java.lang.RuntimeException: Unable to find proto buffer class
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)

So the Executor appears to be using the right classloader here in Spark 1.1:
https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/executor/Executor.scala#L159
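(For context: my understanding is that the usual trick for making plain Java deserialization honor a specific classloader is to override ObjectInputStream.resolveClass, roughly like the sketch below. This is just the general pattern with names I made up, not Spark's actual code; it's the behavior I'd expect somewhere along this deserialization path.)

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

class LoaderAwareDeserializer {
  // Resolve classes against an explicit loader instead of the default
  // "nearest caller" loader, which may not be the one that ever saw my uber.jar.
  static Object deserialize(byte[] bytes, final ClassLoader loader)
      throws IOException, ClassNotFoundException {
    ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes)) {
      @Override
      protected Class<?> resolveClass(ObjectStreamClass desc)
          throws IOException, ClassNotFoundException {
        return Class.forName(desc.getName(), false, loader);
      }
    };
    return in.readObject();
  }
}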
I added some code to print the available classes in the initializer of the map() function, and I *do* see my protobuf message class printed, but *only* for the current thread's classloader and *not* for SparkEnv's classloader:

try {
    ClassLoader cl = Thread.currentThread().getContextClassLoader();
    // Also tried: SparkEnv.getThreadLocal().getClass().getClassLoader();
    // Peek at the loader's loaded classes via reflection
    // (needs java.lang.reflect.Field and java.util.Vector).
    Field f = ClassLoader.class.getDeclaredField("classes");
    f.setAccessible(true);
    Vector<Class> classes = (Vector<Class>) f.get(cl);
    for (int i = 0; i < classes.size(); ++i) {
        Class c = classes.get(i);
        String name = c.toString();
        if (name.indexOf("myPackage") != -1) {
            System.err.println("class: " + name);
        }
    }
} catch (NoSuchFieldException e1) {
    e1.printStackTrace();
} catch (IllegalAccessException e1) {
    e1.printStackTrace();
}

So my code is available to the classloader, and there's perhaps something wrong with the classloader that ParallelCollectionPartition.readObject() is using. I note that if I run spark-submit with --driver-class-path uber.jar, then the error goes away (for local master only, though; for a remote master the error returns).

I think the root problem might be related to this change:
https://github.com/apache/spark/commit/cc3648774e9a744850107bb187f2828d447e0a48#diff-7b43397a89d8249663cbd13374a48db0R42

That change did not appear to touch ParallelCollectionRDD, which I believe is using the root classloader (which would explain why --driver-class-path fixes the problem):
https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L74

If uber.jar is on the classpath, then the root classloader would have the code, hence why --driver-class-path fixes the bug.
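In the meantime, the workaround I'm leaning towards (just a sketch; the helper class name is mine and the List<MyMessage> comes from elsewhere in my app) is to parallelize the serialized protobuf bytes rather than the message objects, so ParallelCollectionPartition only ever has to Java-deserialize byte[], and to call parseFrom() inside the closure, where the context classloader does see my classes:

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import com.foo.bar.MyMessage;

class ProtoRddHelper {
  static JavaRDD<MyMessage> toRdd(JavaSparkContext sc, List<MyMessage> messages) {
    // Driver side: ship only byte[] through the ParallelCollectionRDD.
    List<byte[]> raw = new ArrayList<byte[]>(messages.size());
    for (MyMessage m : messages) {
      raw.add(m.toByteArray());
    }
    JavaRDD<byte[]> rawRdd = sc.parallelize(raw);

    // Executor side: parse inside the closure, after uber.jar has been
    // added to the task's classloader.
    return rawRdd.map(new Function<byte[], MyMessage>() {
      @Override
      public MyMessage call(byte[] bytes) throws Exception {
        return MyMessage.parseFrom(bytes);
      }
    });
  }
}

That only sidesteps the parallelize() path, of course; if the messages get shuffled later, presumably the same serialization issue could show up again elsewhere.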
On Thu, Sep 18, 2014 at 5:42 PM, Paul Wais <pw...@yelp.com> wrote:
> hmmmmmm would using Kryo help me here?
>
> On Thursday, September 18, 2014, Paul Wais <pw...@yelp.com> wrote:
>>
>> Ah, can one NOT create an RDD of any arbitrary Serializable type? It
>> looks like I might be getting bitten by the same
>> "java.io.ObjectInputStream uses root class loader only" bugs mentioned
>> in:
>>
>> * http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
>> * https://github.com/apache/spark/pull/181
>> * http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
>> * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I
>>
>> On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais <pw...@yelp.com> wrote:
>> > Well, it looks like Spark is just not loading my code into the
>> > driver/executors.... E.g.:
>> >
>> > // bars is a JavaRDD<MyMessage>
>> > List<String> foo = bars.map(
>> >     new Function<MyMessage, String>() {
>> >
>> >       // instance initializer block: debug-print the classpath and
>> >       // which jar GeneratedMessageLite was loaded from
>> >       {
>> >         System.err.println("classpath: " +
>> >             System.getProperty("java.class.path"));
>> >
>> >         CodeSource src = com.google.protobuf.GeneratedMessageLite.class
>> >             .getProtectionDomain().getCodeSource();
>> >         if (src != null) {
>> >           URL jar = src.getLocation();
>> >           System.err.println(
>> >               "com.google.protobuf.GeneratedMessageLite from jar: " + jar.toString());
>> >         }
>> >       }
>> >
>> >       @Override
>> >       public String call(MyMessage v1) throws Exception {
>> >         return v1.getString();
>> >       }
>> >     }).collect();
>> >
>> > prints:
>> >
>> > classpath: ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
>> > com.google.protobuf.GeneratedMessageLite from jar: file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
>> >
>> > I do see after those lines:
>> >
>> > 14/09/18 23:28:09 INFO Executor: Adding
>> > file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class loader
>> >
>> > This is with:
>> >
>> > spark-submit --master local --class MyClass --jars uber.jar uber.jar
>> >
>> > My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
>> > come from there. I'm using Spark 1.1 and Hadoop 2.3; Hadoop 2.3
>> > should use protobuf 2.5 [1] and even shade it properly. I read claims
>> > on this list that Spark shades protobuf correctly since 0.9.?, and
>> > looking through the pom.xml on GitHub it looks like Spark includes
>> > protobuf 2.5 in the hadoop-2.3 profile.
>> >
>> > I guess I'm still at "What's the deal with getting Spark to distribute
>> > and load code from my jar correctly?"
>> >
>> > [1] http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml
>> >
>> > On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais <pw...@yelp.com> wrote:
>> >> Dear List,
>> >>
>> >> I'm writing an application where I have RDDs of protobuf messages.
>> >> When I run the app via bin/spark-submit with --master local
>> >> --driver-class-path path/to/my/uber.jar, Spark is able to
>> >> ser/deserialize the messages correctly.
>> >>
>> >> However, if I run WITHOUT --driver-class-path path/to/my/uber.jar, or I
>> >> try --master spark://my.master:7077, then I run into errors that make
>> >> it look like my protobuf message classes are not on the classpath:
>> >>
>> >> Exception in thread "main" org.apache.spark.SparkException: Job
>> >> aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
>> >> recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
>> >> java.lang.RuntimeException: Unable to find proto buffer class
>> >>         com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
>> >>         sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>         sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>         sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>         java.lang.reflect.Method.invoke(Method.java:606)
>> >>         java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
>> >>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
>> >>         ...
>> >>
>> >> Why do I need --driver-class-path in the local scenario?
>> >> And how can I ensure my classes are on the classpath no matter how
>> >> my app is submitted via bin/spark-submit (e.g. --master
>> >> spark://my.master:7077)? I've tried poking through the shell scripts
>> >> and SparkSubmit.scala, and unfortunately I haven't been able to grok
>> >> exactly what Spark is doing with the remote/local JVMs.
>> >>
>> >> Cheers,
>> >> -Paul