Using
<dependency>
  <groupId>com.esotericsoftware</groupId>
  <artifactId>kryo-shaded</artifactId>
  <version>3.0.0</version>
</dependency>
instead of
<dependency>
  <groupId>com.esotericsoftware.kryo</groupId>
  <artifactId>kryo</artifactId>
  <version>2.24.0</version>
</dependency>
fixed this.
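The stack trace below shows Spark reaching Kryo through Twitter's chill library, so a Kryo jar that does not match what chill expects is a plausible explanation for why swapping the dependency helped. One way to check which jar actually supplies Kryo at runtime is to ask the JVM. `ClassOrigin` is a hypothetical helper, not part of Spark or Kryo; it uses only standard reflection, and the implementation version is only reported if the jar's manifest declares one.

```java
import java.net.URL;
import java.security.CodeSource;

/** Hypothetical helper: reports which jar (if any) a class is loaded from. */
class ClassOrigin {
    static String describe(String className) {
        Class<?> cls;
        try {
            // initialize = false: only locate the class, do not run its static initializers
            cls = Class.forName(className, false, ClassOrigin.class.getClassLoader());
        } catch (ClassNotFoundException e) {
            return "not on classpath";
        }
        CodeSource source = cls.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            // Classes loaded by the bootstrap loader (the JDK itself) have no code source
            return "bootstrap/JDK";
        }
        URL location = source.getLocation();
        Package pkg = cls.getPackage();
        // Only present if the jar manifest sets Implementation-Version
        String version = (pkg == null) ? null : pkg.getImplementationVersion();
        return location + (version == null ? "" : " (version " + version + ")");
    }
}
```

Calling `ClassOrigin.describe("com.esotericsoftware.kryo.Kryo")` on the driver and inside a task would show whether both sides load Kryo from the same jar.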
On 2014-12-03 18:15, Robin Keunen wrote:
Hi all,
I am having trouble using Kryo, and since I am new to this kind of
serialization, I am not sure where to look. Can someone please help
me? :-)
Here is my custom class:
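import com.esotericsoftware.kryo.Kryo;
import com.esotericsoftware.kryo.KryoSerializable;
import com.esotericsoftware.kryo.io.Input;
import com.esotericsoftware.kryo.io.Output;
// SLF4J assumed, based on the {} placeholders in the log calls
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DummyClass implements KryoSerializable {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(DummyClass.class);

    int value;

    // No-arg constructor: Kryo uses it to create the instance before calling read()
    public DummyClass() {
    }

    public DummyClass(int value) {
        LOGGER.info("hey I'm dum {}!", value);
        this.value = value;
    }

    public int getValue() {
        return value;
    }

    public void setValue(int value) {
        this.value = value;
    }

    // Serialize the single field...
    @Override
    public void write(Kryo kryo, Output output) {
        output.writeInt(value);
    }

    // ...and read it back in the same order
    @Override
    public void read(Kryo kryo, Input input) {
        this.value = input.readInt();
    }
}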
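`KryoSerializable` asks the class to encode its own fields: `read()` must consume exactly what `write()` produced, in the same order, and the object being filled in is first created through the no-arg constructor. The same contract can be sketched without Kryo, using `java.io.DataOutputStream`/`DataInputStream` as a stand-in for Kryo's `Output`/`Input`. `Point` is a hypothetical class with two fields, chosen to make the ordering requirement visible; this is not Kryo's API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical stand-in for a KryoSerializable class: it writes and reads its own fields. */
class Point {
    int x;
    int y;

    Point() { }  // empty instance, filled in by read()

    Point(int x, int y) {
        this.x = x;
        this.y = y;
    }

    void write(DataOutputStream out) throws IOException {
        out.writeInt(x);  // the field order here...
        out.writeInt(y);
    }

    void read(DataInputStream in) throws IOException {
        this.x = in.readInt();  // ...must match the field order here
        this.y = in.readInt();
    }

    /** Encodes p to bytes and decodes a fresh copy from them. */
    static Point roundTrip(Point p) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                p.write(out);
            }
            Point copy = new Point();  // created via the no-arg constructor, like Kryo does
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.read(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that the logging constructor is never invoked on the reading side; only the no-arg constructor runs before `read()`.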
Here is my registrator:
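// Package matches the spark.kryo.registrator setting in the Spark code
package org.roke.main;

import com.esotericsoftware.kryo.Kryo;
import org.apache.spark.serializer.KryoRegistrator;

public class MyKryoRegistrator implements KryoRegistrator {
    @Override
    public void registerClasses(Kryo kryo) {
        kryo.register(DummyClass.class);
    }
}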
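As far as I know, Spark does not receive the registrator object itself: it reads the class name from `spark.kryo.registrator` and instantiates it by reflection wherever a Kryo instance is created, executors included. The class therefore needs a public no-arg constructor and must be on the executors' classpath (which is what `setJars` is for). The sketch below mimics that loading step; `Registrator`, `ExampleRegistrator` and `RegistratorLoader` are hypothetical, and a plain `List` stands in for the `Kryo` instance.

```java
import java.util.List;

/** Hypothetical stand-in for Spark's KryoRegistrator; a List plays the role of Kryo. */
interface Registrator {
    void registerClasses(List<Class<?>> registry);
}

/** Example registrator: needs a public no-arg constructor to be created by name. */
class ExampleRegistrator implements Registrator {
    public ExampleRegistrator() { }

    @Override
    public void registerClasses(List<Class<?>> registry) {
        registry.add(String.class);
    }
}

class RegistratorLoader {
    /** Creates a registrator from a fully qualified class name, as a config value would give it. */
    static Registrator load(String className) {
        try {
            // Spark uses the executor's class loader here; this sketch uses its own loader
            Class<?> cls = Class.forName(className, true, RegistratorLoader.class.getClassLoader());
            if (!Registrator.class.isAssignableFrom(cls)) {
                throw new IllegalStateException(className + " is not a Registrator");
            }
            return (Registrator) cls.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Missing class, missing no-arg constructor, or constructor failure
            throw new IllegalStateException("Cannot instantiate registrator " + className, e);
        }
    }
}
```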
And the Spark code:
import java.util.Arrays;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

// appName, master, jars and LOGGER are defined elsewhere in the application
SparkConf sparkConf = new SparkConf()
        .setAppName(appName)
        .setMaster(master)
        .setJars(jars)
        .set("spark.serializer",
                "org.apache.spark.serializer.KryoSerializer")
        .set("spark.kryo.registrator", "org.roke.main.MyKryoRegistrator");
JavaSparkContext sparkContext = new JavaSparkContext(sparkConf);

List<DummyClass> dummyClasses = Arrays.asList(
        new DummyClass(1),
        new DummyClass(2),
        new DummyClass(3),
        new DummyClass(4)
);
JavaRDD<DummyClass> rdd = sparkContext.parallelize(dummyClasses);
for (DummyClass dummyClass : rdd.collect()) {
    LOGGER.info("driver collected {}", dummyClass);
}
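`parallelize()` cuts the driver-side list into slices, and each slice is shipped to an executor inside a task; the trace below fails exactly while an executor decodes such a slice (`ParallelCollectionPartition.readObject`). The sketch shows only the slicing step, assuming the boundary formula Spark's `ParallelCollectionRDD` uses for ordinary collections (slice `i` covers indices `i*n/k` up to `(i+1)*n/k`). `Slicer` is hypothetical, not Spark API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of splitting a driver-side list into numSlices partitions. */
class Slicer {
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        if (numSlices < 1) {
            throw new IllegalArgumentException("numSlices must be positive");
        }
        List<List<T>> slices = new ArrayList<>();
        long length = data.size();  // long avoids overflow in i * length
        for (int i = 0; i < numSlices; i++) {
            // Same formula for every boundary, so slice sizes differ by at most one
            int start = (int) ((i * length) / numSlices);
            int end = (int) (((i + 1) * length) / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }
}
```

With the four `DummyClass` instances and two slices, each task would carry two objects; it is these slice contents that get serialized with Kryo on the driver and deserialized on the executor.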
The program fails with the following NullPointerException:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 6, 10.21.6.68): java.lang.NullPointerException:
        com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:36)
        com.twitter.chill.WrappedArraySerializer.read(WrappedArraySerializer.scala:21)
        com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        org.apache.spark.serializer.KryoDeserializationStream.readObject(KryoSerializer.scala:133)
        org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
        org.apache.spark.rdd.ParallelCollectionPartition$$anonfun$readObject$1.apply(ParallelCollectionRDD.scala:80)
        org.apache.spark.util.Utils$.deserializeViaNestedStream(Utils.scala:123)
        org.apache.spark.rdd.ParallelCollectionPartition.readObject(ParallelCollectionRDD.scala:80)
        sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        java.lang.reflect.Method.invoke(Method.java:606)
        java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1017)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1893)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
        java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
        java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
        java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
        java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
        org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:62)
        org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:87)
        org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:159)
        java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        java.lang.Thread.run(Thread.java:745)
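Read bottom-up, the trace shows two serializers stacked: the executor Java-deserializes the task (`JavaSerializerInstance.deserialize`), Java serialization calls the custom `readObject` of `ParallelCollectionPartition`, and that method hands the slice's contents to the configured Kryo serializer through a nested stream (`Utils.deserializeViaNestedStream`), where chill's `WrappedArraySerializer` throws. The pattern of a Java-serializable object whose `readObject` delegates its payload to a second encoding can be sketched as follows; `Holder` is hypothetical, and hand-written ints stand in for the Kryo payload.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical holder: Java serialization outside, a custom encoding for its payload inside. */
class Holder implements Serializable {
    private static final long serialVersionUID = 1L;

    transient int[] values;  // skipped by default Java serialization, encoded by hand below

    Holder(int[] values) {
        this.values = values;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Nested encoding (stand-in for Kryo): length prefix, then the raw ints
        out.writeInt(values.length);
        for (int v : values) {
            out.writeInt(v);
        }
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        // A failure in this nested decoding surfaces inside ObjectInputStream frames
        int n = in.readInt();
        values = new int[n];
        for (int i = 0; i < n; i++) {
            values[i] = in.readInt();
        }
    }

    /** Java-serializes h and reads a fresh copy back. */
    static Holder roundTrip(Holder h) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(h);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Holder) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

This is why the NullPointerException, although raised by Kryo/chill code, appears beneath `ObjectInputStream` and `JavaSerializerInstance` frames.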
--
Robin Keunen
Software Engineer
robin.keu...@lampiris.be
www.lampiris.be