It turns out Kryo doesn't play well with protobuf. Out of the box I see:

com.esotericsoftware.kryo.KryoException: java.lang.UnsupportedOperationException
Serialization trace:
extra_ (com.foo.bar.MyMessage)
        com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:626)
...

Maybe I can fix this with a custom serializer, but I'm not sure yet.
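
One thing I might try (an untested sketch: the registrator class name is made
up, MyMessage is the generated class from my trace, and I haven't confirmed
this avoids the error) is registering a custom Kryo serializer that
round-trips the message through protobuf's own wire format, so Kryo's
FieldSerializer never touches internals like extra_:

import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.Serializer;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
import com.foo.bar.MyMessage;
import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.spark.serializer.KryoRegistrator;

// Sketch: serialize the protobuf message via toByteArray()/parseFrom() instead
// of letting Kryo reflect over the generated class's fields.
public class MyRegistrator implements KryoRegistrator {

  public static class MyMessageSerializer extends Serializer<MyMessage> {
    @Override
    public void write(Kryo kryo, Output output, MyMessage msg) {
      byte[] bytes = msg.toByteArray();
      output.writeInt(bytes.length, true);
      output.writeBytes(bytes);
    }

    @Override
    public MyMessage read(Kryo kryo, Input input, Class<MyMessage> type) {
      int length = input.readInt(true);
      try {
        return MyMessage.parseFrom(input.readBytes(length));
      } catch (InvalidProtocolBufferException e) {
        throw new RuntimeException(e);
      }
    }
  }

  @Override
  public void registerClasses(Kryo kryo) {
    kryo.register(MyMessage.class, new MyMessageSerializer());
  }
}

If that pans out, I'd turn it on with spark.serializer set to
org.apache.spark.serializer.KryoSerializer and spark.kryo.registrator set to
the registrator class above.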




I dug a bit deeper into the Java serialization issue, and I believe
there's a bug in Spark introduced by a recent change in v1.1.


FYI, the full stack trace I see in my app (sorry, the one in the first
email was truncated) is:

java.lang.RuntimeException: Unable to find proto buffer class
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1706)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1344)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.defaultReadObject(ObjectInputStream.java:500)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:74)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)


So the Executor appears to be using the right classloader here in Spark 1.1:

https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/executor/Executor.scala#L159


I added some code to print the classes available in the initializer of
my map() function, and I *do* see my protobuf message class printed,
but *only* for the current thread's context classloader and *not* for
SparkEnv's classloader.

try {
  // Which classes has this loader already loaded?  The "classes" field is an
  // internal detail of java.lang.ClassLoader, hence the reflection.
  ClassLoader cl = Thread.currentThread().getContextClassLoader();
  // ClassLoader cl = SparkEnv.getThreadLocal().getClass().getClassLoader();
  Field f = ClassLoader.class.getDeclaredField("classes");
  f.setAccessible(true);

  @SuppressWarnings("unchecked")
  Vector<Class<?>> classes = (Vector<Class<?>>) f.get(cl);
  for (int i = 0; i < classes.size(); ++i) {
    String name = classes.get(i).toString();
    if (name.contains("myPackage")) {
      System.err.println("class: " + name);
    }
  }
} catch (NoSuchFieldException e1) {
  e1.printStackTrace();
} catch (IllegalAccessException e1) {
  e1.printStackTrace();
}



So my code is available to the context classloader, and there's perhaps
something wrong with the classloader that
ParallelCollectionPartition.readObject() is using.  I note that if I
run spark-submit with --driver-class-path uber.jar, then the error
goes away (for a local master only, though; for a remote master the
error returns).
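
A quick way to double-check that (just a sketch; the class name is the one
from my stack trace) is to ask both the context classloader and the
system/root classloader directly whether they can resolve the generated class:

// Which loaders can actually see the generated protobuf class?
ClassLoader context = Thread.currentThread().getContextClassLoader();
ClassLoader system = ClassLoader.getSystemClassLoader();
for (ClassLoader cl : new ClassLoader[] { context, system }) {
  try {
    Class.forName("com.foo.bar.MyMessage", false, cl);
    System.err.println(cl + " CAN load com.foo.bar.MyMessage");
  } catch (ClassNotFoundException e) {
    System.err.println(cl + " can NOT load com.foo.bar.MyMessage");
  }
}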


I think the root problem might be related to this change:
https://github.com/apache/spark/commit/cc3648774e9a744850107bb187f2828d447e0a48#diff-7b43397a89d8249663cbd13374a48db0R42

That change did not appear to touch ParallelCollectionRDD, which I
believe is still using the root classloader (which would explain why
--driver-class-path fixes the problem):
https://github.com/apache/spark/blob/2f9b2bd7844ee8393dc9c319f4fefedf95f5e460/core/src/main/scala/org/apache/spark/rdd/ParallelCollectionRDD.scala#L74

If uber.jar is on the driver classpath, then the root classloader would
have the code, which is why --driver-class-path works around the bug.
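
For what it's worth, the usual fix for this class of problem (a sketch of the
general pattern, not Spark's actual code) is to deserialize with an
ObjectInputStream whose resolveClass() consults the thread's context
classloader before falling back to the default:

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectStreamClass;

// Resolve classes against the context classloader (which has uber.jar)
// rather than only the JVM's root/application classloader.
public class ContextClassLoaderObjectInputStream extends ObjectInputStream {
  public ContextClassLoaderObjectInputStream(InputStream in) throws IOException {
    super(in);
  }

  @Override
  protected Class<?> resolveClass(ObjectStreamClass desc)
      throws IOException, ClassNotFoundException {
    ClassLoader loader = Thread.currentThread().getContextClassLoader();
    try {
      return Class.forName(desc.getName(), false, loader);
    } catch (ClassNotFoundException e) {
      // Fall back to default resolution (primitives, JDK classes, etc.).
      return super.resolveClass(desc);
    }
  }
}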




On Thu, Sep 18, 2014 at 5:42 PM, Paul Wais <pw...@yelp.com> wrote:
> hmmmmmm would using Kryo help me here?
>
>
> On Thursday, September 18, 2014, Paul Wais <pw...@yelp.com> wrote:
>>
>> Ah, can one NOT create an RDD of any arbitrary Serializable type?  It
>> looks like I might be getting bitten by the same
>> "java.io.ObjectInputStream uses root class loader only" bugs mentioned
>> in:
>>
>> * http://apache-spark-user-list.1001560.n3.nabble.com/java-lang-ClassNotFoundException-td3259.html
>> * https://github.com/apache/spark/pull/181
>> * http://mail-archives.apache.org/mod_mbox/spark-user/201311.mbox/%3c7f6aa9e820f55d4a96946a87e086ef4a4bcdf...@eagh-erfpmbx41.erf.thomson.com%3E
>> * https://groups.google.com/forum/#!topic/spark-users/Q66UOeA2u-I
>>
>>
>>
>>
>> On Thu, Sep 18, 2014 at 4:51 PM, Paul Wais <pw...@yelp.com> wrote:
>> > Well, it looks like Spark is just not loading my code into the
>> > driver/executors.... E.g.:
>> >
>> > // The instance-initializer block below prints the executor's classpath and
>> > // the jar that GeneratedMessageLite was loaded from; call() just maps each
>> > // message to a string.  (bars is a JavaRDD<MyMessage>.)
>> > List<String> foo = bars.map(
>> >     new Function<MyMessage, String>() {
>> >     {
>> >         System.err.println("classpath: " +
>> >             System.getProperty("java.class.path"));
>> >
>> >         CodeSource src = com.google.protobuf.GeneratedMessageLite.class
>> >             .getProtectionDomain().getCodeSource();
>> >         if (src != null) {
>> >             URL jar = src.getLocation();
>> >             System.err.println(
>> >                 "com.google.protobuf.GeneratedMessageLite from jar: " + jar);
>> >         }
>> >     }
>> >
>> >     @Override
>> >     public String call(MyMessage v1) throws Exception {
>> >         return v1.getString();
>> >     }
>> > }).collect();
>> >
>> > prints:
>> > classpath: ::/opt/spark/conf:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar:/opt/spark/lib/datanucleus-api-jdo-3.2.1.jar:/opt/spark/lib/datanucleus-rdbms-3.2.1.jar:/opt/spark/lib/datanucleus-core-3.2.2.jar
>> > com.google.protobuf.GeneratedMessageLite from jar: file:/opt/spark/lib/spark-assembly-1.1.0-hadoop2.3.0.jar
>> >
>> > I do see after those lines:
>> > 14/09/18 23:28:09 INFO Executor: Adding file:/tmp/spark-cc147338-183f-46f6-b698-5b897e808a08/uber.jar to class loader
>> >
>> >
>> > This is with:
>> >
>> > spark-submit --master local --class MyClass --jars uber.jar uber.jar
>> >
>> >
>> > My uber.jar has protobuf 2.5; I expected GeneratedMessageLite would
>> > come from there.  I'm using Spark 1.1 and Hadoop 2.3; Hadoop 2.3
>> > should use protobuf 2.5 [1] and even shade it properly.  I've read claims
>> > on this list that Spark has shaded protobuf correctly since 0.9.?, and
>> > looking through the pom.xml on GitHub it looks like Spark includes
>> > protobuf 2.5 in the hadoop-2.3 profile.
>> >
>> >
>> > I guess I'm still at "What's the deal with getting Spark to distribute
>> > and load code from my jar correctly?"
>> >
>> >
>> > [1]
>> > http://svn.apache.org/repos/asf/hadoop/common/branches/branch-2.3.0/hadoop-project/pom.xml
>> >
>> > On Thu, Sep 18, 2014 at 1:06 AM, Paul Wais <pw...@yelp.com> wrote:
>> >> Dear List,
>> >>
>> >> I'm writing an application where I have RDDs of protobuf messages.
>> >> When I run the app via bin/spark-submit with --master local
>> >> --driver-class-path path/to/my/uber.jar, Spark is able to
>> >> serialize/deserialize the messages correctly.
>> >>
>> >> However, if I run WITHOUT --driver-class-path path/to/my/uber.jar, or I
>> >> try --master spark://my.master:7077, then I run into errors that make
>> >> it look like my protobuf message classes are not on the classpath:
>> >>
>> >> Exception in thread "main" org.apache.spark.SparkException: Job
>> >> aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most
>> >> recent failure: Lost task 0.0 in stage 1.0 (TID 0, localhost):
>> >> java.lang.RuntimeException: Unable to find proto buffer class
>> >>         com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
>> >>         sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>> >>         sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>> >>         sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>> >>         java.lang.reflect.Method.invoke(Method.java:606)
>> >>         java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
>> >>         java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
>> >>         ...
>> >>
>> >> Why do I need --driver-class-path in the local scenario?  And how can
>> >> I ensure my classes are on the classpath no matter how my app is
>> >> submitted via bin/spark-submit (e.g. --master spark://my.master:7077)?
>> >> I've tried poking through the shell scripts and SparkSubmit.scala, and
>> >> unfortunately I haven't been able to grok exactly what Spark is doing
>> >> with the remote/local JVMs.
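>> >>
>> >> One thing I still plan to try (purely a sketch, and I haven't verified
>> >> it helps in this case) is shipping the jar explicitly from the driver
>> >> via SparkConf instead of relying on spark-submit flags; "MyApp" and the
>> >> jar path below are placeholders:
>> >>
>> >> SparkConf conf = new SparkConf()
>> >>     .setAppName("MyApp")
>> >>     .setJars(new String[] { "path/to/my/uber.jar" });  // shipped to executors
>> >> JavaSparkContext sc = new JavaSparkContext(conf);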
>> >>
>> >> Cheers,
>> >> -Paul
