Well, it looks like this is indeed a protobuf issue.  Poked a little more
with Kryo.  Since protobuf messages are serializable, I tried just making
Kryo use the JavaSerializer for my messages.  The resulting stack trace
made it look like protobuf's GeneratedMessageLite is actually using the
classloader that loaded it, which I believe would be the root loader
rather than the loader Spark uses for user code:

 * https://code.google.com/p/protobuf/source/browse/trunk/java/src/main/java/com/google/protobuf/GeneratedMessageLite.java?r=425#775
 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l186
 * http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/ClassLoader.java#l1529
 * See note: http://hg.openjdk.java.net/jdk7u/jdk7u6/jdk/file/8c2c5d63a17e/src/share/classes/java/lang/Class.java#l220

So I guess protobuf's Java serialization is sensitive to the classloader.
I wonder if Kenton ever saw this one coming :)  I do have a solution,
though (see way below).
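To make the failure mode concrete: the generated code's readResolve ends up in the one-argument Class.forName(name), which resolves against the *defining* loader of the calling class, not the thread context loader that frameworks like Spark install for classes shipped in the user jar.  A stdlib-only sketch of the distinction (the class name here is just for illustration):

```java
public class LoaderDemo {
    public static void main(String[] args) throws Exception {
        // The one-arg Class.forName resolves against the caller's defining
        // loader -- for GeneratedMessageLite, whatever loader loaded the
        // protobuf jar itself.
        ClassLoader defining = LoaderDemo.class.getClassLoader();

        // Frameworks that ship user code dynamically typically install a
        // different loader here; in a plain `java` launch they coincide.
        ClassLoader context = Thread.currentThread().getContextClassLoader();
        System.out.println("defining: " + defining);
        System.out.println("context:  " + context);

        // A loader-aware lookup passes the loader explicitly:
        Class<?> c = Class.forName("java.lang.String", true, context);
        System.out.println(c.getName()); // prints "java.lang.String"
    }
}
```

If protobuf used the three-argument form with the context loader, classes in a Spark-shipped jar would be findable even though the protobuf jar lives in a parent loader.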


Here's the code and stack trace:

SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("myapp");
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
sparkConf.set("spark.kryo.registrator", "MyKryoRegistrator");

...

public class MyKryoRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(MyProtoMessage.class, new JavaSerializer());
    }
}

...

14/09/19 05:39:12 ERROR Executor: Exception in task 2.0 in stage 1.0 (TID 2)
com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:42)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:34)
        at com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
        at org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
        at org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
        at org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:744)
Caused by: java.lang.RuntimeException: Unable to find proto buffer class
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:775)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:606)
        at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1104)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1807)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        at com.esotericsoftware.kryo.serializers.JavaSerializer.read(JavaSerializer.java:40)
        ... 31 more
Caused by: java.lang.ClassNotFoundException: MyProtos$MyProto
        at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
        at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
        at java.security.AccessController.doPrivileged(Native Method)
        at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
        at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
        at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
        at java.lang.Class.forName0(Native Method)
        at java.lang.Class.forName(Class.java:190)
        at com.google.protobuf.GeneratedMessageLite$SerializedForm.readResolve(GeneratedMessageLite.java:768)
        ... 40 more
14/09/19 05:39:12 WARN TaskSetManager: Lost task 1.0 in stage 1.0 (TID 1, localhost): com.esotericsoftware.kryo.KryoException: Error during Java deserialization.
        [same stack trace as above]

I was able to work around this by writing a Kryo Serializer that just uses
protobuf's delimited binary message stream protocol.  This is super brittle
(the protobuf lib could change, the protobuf generated code could change,
and it would fail only at runtime), but it appears to work:



public class MyKryoRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.register(MyProtobufMessage.class,
                      new ProtobufSerializer<MyProtobufMessage>());
    }

    public static class ProtobufSerializer<T extends com.google.protobuf.GeneratedMessage>
            extends Serializer<T> {

        public void write(Kryo kryo, Output output, T msg) {
            try {
                // writeDelimitedTo prefixes the message with its length,
                // so consecutive messages can share one stream.
                msg.writeDelimitedTo(output);
            } catch (Exception ex) {
                throw new KryoException("Error during protobuf serialization.", ex);
            }
        }

        @SuppressWarnings("unchecked")
        public T read(Kryo kryo, Input input, Class<T> type) {
            try {
                // Invoke the generated static parseDelimitedFrom(InputStream)
                // reflectively, since it is not declared on a shared supertype.
                return (T) type.getDeclaredMethod("parseDelimitedFrom",
                        java.io.InputStream.class).invoke(null, input);
            } catch (Exception ex) {
                throw new KryoException("Error during protobuf deserialization.", ex);
            }
        }
    }
}
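For intuition about what writeDelimitedTo/parseDelimitedFrom actually put on the wire (a base-128 varint length prefix followed by the message bytes), here is a stdlib-only sketch; the class and method names are made up for illustration:

```java
import java.io.*;

// Stdlib-only sketch of protobuf's delimited framing: each message is
// preceded by its length encoded as a base-128 varint, so many messages
// can be written back-to-back on one stream.
public class DelimitedFraming {

    static void writeDelimited(OutputStream out, byte[] msg) throws IOException {
        int n = msg.length;
        while ((n & ~0x7F) != 0) {      // emit 7 bits at a time, low first
            out.write((n & 0x7F) | 0x80);
            n >>>= 7;
        }
        out.write(n);
        out.write(msg);
    }

    static byte[] readDelimited(InputStream in) throws IOException {
        int n = 0, shift = 0, b;
        do {
            b = in.read();
            if (b < 0) throw new EOFException("truncated varint");
            n |= (b & 0x7F) << shift;
            shift += 7;
        } while ((b & 0x80) != 0);
        byte[] msg = new byte[n];
        new DataInputStream(in).readFully(msg);
        return msg;
    }

    public static void main(String[] args) throws Exception {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        writeDelimited(buf, "hello".getBytes("UTF-8"));
        byte[] round = readDelimited(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(new String(round, "UTF-8")); // prints "hello"
    }
}
```

Crucially, this path never touches Java serialization, so the classloader-sensitive readResolve above is never invoked.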



Note that one needs to use this wrapper/serializer for *all* protobuf
classes used in the app, not just the pb classes that appear in RDDs.
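One way to avoid hand-registering every message class might be Kryo's default-serializer mapping for a supertype.  This is an untested sketch on my part: it assumes ProtobufSerializer (from above) has a public no-arg constructor, which Kryo needs in order to instantiate serializers registered by class rather than by instance:

```java
// Hypothetical sketch: map the whole GeneratedMessage hierarchy to the
// delimited-stream serializer instead of registering each class by hand.
public class MyKryoRegistrator implements KryoRegistrator {
    public void registerClasses(Kryo kryo) {
        kryo.addDefaultSerializer(com.google.protobuf.GeneratedMessage.class,
                                  ProtobufSerializer.class);
    }
}
```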


The only other real alternative to the above hack would be to ship my
uber.jar to all my worker machines and somehow make Spark load it... that
would be pretty ugly and a decent amount of work/complexity even with the
help of Mesos, libswarm/Docker, etc.  (I'm actually already running this
Spark app in a cluster of Docker containers, but the assets in those
containers are static across app runs/recompiles, and it doesn't make
sense to get into the nitty-gritty of pushing assets per-build just yet.)
Spark's code-shipping features are certainly crucial for its REPL, but it
might be worth adding some notes to the docs for app developers detailing
how Spark ships code, when it serializes things, etc.  It's not at all
obvious that Spark would interact this badly with protobuf, especially
since protobuf is technically one of its dependencies :)
