Thanks Renato.

I forgot to reply all last time. I apologize for the rather confusing
example.
All the snippet did was the following (a cleaned-up sketch is below the
list):
1. Make an RDD of LinkedHashMaps, each of size 2.
2. On the worker side, get the size of each map (via a map(hash =>
hash.size)), producing an RDD[Int].
3. On the driver, call collect on that RDD[Int] of map sizes, giving you
an Array[Int].
4. On the driver, call collect on the RDD[LinkedHashMap], giving you an
Array[LinkedHashMap].
5. Compare the size of any map in the Array[LinkedHashMap] with any size
value in the Array[Int] (within each array they're all the same).
6. The sizes differ, because the elements of the LinkedHashMap were never
copied over when it was deserialized on the driver.
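
For reference, a minimal sketch of those steps (this assumes sc is a
SparkContext already configured with the Kryo serializer and the class
registration shown in the quoted code further down; variable names are mine):

import scala.collection.mutable

// 1. RDD of LinkedHashMaps, each holding two entries
val maps = sc.parallelize(0 to 10)
  .map(_ => mutable.LinkedHashMap("hello" -> "bonjour", "good" -> "bueno"))

// 2./3. sizes computed on the workers, then collected
val workerSideSize = maps.map(_.size).collect()(0)   // 2

// 4./5. the maps themselves collected, size checked on the driver
val driverSideSize = maps.collect()(0).size          // 0 -- the entries never made it across

println("worker side: " + workerSideSize + ", driver side: " + driverSideSize)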

Anyway, I think I've tracked down the issue, and it doesn't seem to be a
Spark or Kryo issue.

For those whom it concerns: LinkedHashMap has this serialization issue
because its firstEntry and lastEntry members are declared transient.
Take a look here:
https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L62

Because they are transient, those fields are not serialized, so after
deserialization firstEntry is null. The iterator on LinkedHashMap needs
firstEntry to walk the map, which is why the collected maps come back empty:
https://github.com/scala/scala/blob/v2.11.8/src/library/scala/collection/mutable/LinkedHashMap.scala#L94-L100
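
One workaround that should sidestep this (my own sketch against the Kryo 3.x
API that Spark bundles, not something from the docs, so treat it as untested)
is to register a custom serializer that writes the entries out explicitly and
rebuilds the map on read, so the transient firstEntry/lastEntry links get
reconstructed by += on the deserializing side:

import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.spark.serializer.KryoRegistrator
import scala.collection.mutable

// Serializes a LinkedHashMap[String, String] entry by entry instead of
// field by field, so the transient linked-list structure is rebuilt on read.
class StringLinkedHashMapSerializer extends Serializer[mutable.LinkedHashMap[String, String]] {
  override def write(kryo: Kryo, out: Output, map: mutable.LinkedHashMap[String, String]): Unit = {
    out.writeInt(map.size)
    map.foreach { case (k, v) => out.writeString(k); out.writeString(v) }
  }

  override def read(kryo: Kryo, in: Input,
                    clazz: Class[mutable.LinkedHashMap[String, String]]): mutable.LinkedHashMap[String, String] = {
    val result = new mutable.LinkedHashMap[String, String]()
    val n = in.readInt()
    (0 until n).foreach(_ => result += (in.readString() -> in.readString()))
    result
  }
}

// Registered via the spark.kryo.registrator setting instead of registerKryoClasses
class MyKryoRegistrator extends KryoRegistrator {
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[mutable.LinkedHashMap[String, String]],
      new StringLinkedHashMapSerializer)
  }
}

The simpler dodge, of course, is to map each LinkedHashMap to a plain Seq of
pairs before collect() and rebuild the map on the driver.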

I wonder why these two variables were made transient.

Best,
Rahul Palamuttam


On Thu, Aug 25, 2016 at 11:13 PM, Renato Marroquín Mogrovejo <
renatoj.marroq...@gmail.com> wrote:

> Hi Rahul,
>
> You have probably already figured this one out, but anyway...
> You need to register the classes that you'll be using with Kryo because it
> does not support all Serializable types and requires you to register the
> classes you’ll use in the program in advance. So when you don't register
> the class, Kryo doesn't know how to serialize/deserialize it.
>
>
> Best,
>
> Renato M.
>
> 2016-08-22 17:12 GMT+02:00 Rahul Palamuttam <rahulpala...@gmail.com>:
>
>> Hi,
>>
>> Just sending this again to see if others have had this issue.
>>
>> I recently switched to using kryo serialization and I've been running
>> into errors
>> with the mutable.LinkedHashMap class.
>>
>> If I don't register the mutable.LinkedHashMap class then I get an
>> ArrayStoreException seen below.
>> If I do register the class, then when the LinkedHashMap is collected on
>> the driver, it does not contain any elements.
>>
>> Here is the snippet of code I used :
>>
>> val sc = new SparkContext(new SparkConf()
>>   .setMaster("local[*]")
>>   .setAppName("Sample")
>>   .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>>   .registerKryoClasses(Array(classOf[mutable.LinkedHashMap[String, String]])))
>>
>> val collect = sc.parallelize(0 to 10)
>>   .map(p => new mutable.LinkedHashMap[String, String]() ++= Array(("hello", "bonjour"), ("good", "bueno")))
>>
>> val mapSideSizes = collect.map(p => p.size).collect()(0)
>> val driverSideSizes = collect.collect()(0).size
>>
>> println("The sizes before collect : " + mapSideSizes)
>> println("The sizes after collect : " + driverSideSizes)
>>
>>
>> ** The following only occurs if I did not register the
>> mutable.LinkedHashMap class **
>> 16/08/20 18:10:38 ERROR TaskResultGetter: Exception while getting task result
>> java.lang.ArrayStoreException: scala.collection.mutable.HashMap
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:338)
>> at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.read(DefaultArraySerializers.java:293)
>> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
>> at org.apache.spark.serializer.KryoSerializerInstance.deserialize(KryoSerializer.scala:311)
>> at org.apache.spark.scheduler.DirectTaskResult.value(TaskResult.scala:97)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply$mcV$sp(TaskResultGetter.scala:60)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2$$anonfun$run$1.apply(TaskResultGetter.scala:51)
>> at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1741)
>> at org.apache.spark.scheduler.TaskResultGetter$$anon$2.run(TaskResultGetter.scala:50)
>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> at java.lang.Thread.run(Thread.java:745)
>>
>> I hope this is a known issue and/or I'm missing something important in my
>> setup.
>> Appreciate any help or advice!
>>
>> Best,
>>
>> Rahul Palamuttam
>>
>
>
