Hi,

Just sending this again to see if others have had this issue.

I recently switched to using Kryo serialization and I've been running into errors with the mutable.LinkedHashMap class.

If I don't register the mutable.LinkedHashMap class, I get the ArrayStoreException shown below.
If I do register the class, then when the LinkedHashMap is collected on the driver, it does not contain any elements.

Here is the snippet of code I used:

import scala.collection.mutable
import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf()
  .setMaster("local[*]")
  .setAppName("Sample")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))

// Each element of the RDD is a LinkedHashMap with two entries
val collect = sc.parallelize(0 to 10)
  .map(p => new mutable.LinkedHashMap[String, String]() ++=
    Array(("hello", "bonjour"), ("good", "bueno")))

val mapSideSizes = collect.map(p => p.size).collect()(0)
val driverSideSizes = collect.collect()(0).size

println("The sizes before collect : " + mapSideSizes)
println("The sizes after collect : " + driverSideSizes)

** The following only occurs if I do not register the mutable.LinkedHashMap class **
16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result
java.lang.ArrayStoreException: scala.collection.mutable.HashMap
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
        at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
        at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
        at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
        at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
        at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
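
In the meantime, the workaround I'm considering is to avoid sending the LinkedHashMap through Kryo at all: ship the entries as an Array of pairs and rebuild the map on the driver. A rough sketch of what I have in mind (the entries/rebuilt names are just illustrative, and it assumes all I need on the driver is the entries in insertion order):

import scala.collection.mutable

// Ship the entries as an Array of (key, value) pairs so the LinkedHashMap itself
// never goes through Kryo, then rebuild it on the driver to keep insertion order.
val entries = sc.parallelize(0 to 10)
  .map(_ => mutable.LinkedHashMap("hello" -> "bonjour", "good" -> "bueno").toArray)
  .collect()

val rebuilt = entries.map(pairs => mutable.LinkedHashMap(pairs: _*))
println("Rebuilt driver-side size : " + rebuilt(0).size)

The downside is rebuilding every map on the driver, but it sidesteps the LinkedHashMap serializer entirely.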

I hope this is a known issue and/or I'm missing something important in my setup.
Appreciate any help or advice!

Best,

Rahul Palamuttam
