Suppose my mappers are functions (defs) that internally call other classes, create objects, and do different things inside. (They could even be classes that extend Foo => Bar and do the processing in their apply method, but let's ignore that case for now.)
Spark supports only Java serialization for closures: it forces every class referenced inside a closure to implement Serializable, and it coughs up errors if you try to make it use Kryo for closures. But one cannot expect every 3rd-party library to have all of its classes extend Serializable!

Here's a workaround that I thought I'd share in case anyone comes across this problem: you simply serialize the objects with Kryo before passing them into the closure, and deserialize them afterwards. This approach works even if your classes aren't Serializable, because it uses Kryo behind the scenes. All you need is a little currying. ;)

Here's an example of how I did it:

    def genMapper(kryoWrapper: KryoSerializationWrapper[Foo => Bar])(foo: Foo): Bar = {
      kryoWrapper.value.apply(foo)
    }

    val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
    rdd.flatMap(mapper).collectAsMap()

    class Blah(abc: ABC) extends (Foo => Bar) {
      def apply(foo: Foo): Bar = {
        // This is the real function
      }
    }

Feel free to make Blah as complicated as you want: a class, a companion object, nested classes, references to multiple 3rd-party libs.

KryoSerializationWrapper refers to this wrapper from amplab/shark:
<https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala>

Don't you think it would be a good idea to have something like this inside the framework itself? :)
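For anyone who doesn't want to pull in the Shark dependency, here's a minimal sketch of the idea behind such a wrapper. The names and the bare `new Kryo()` setup are my own illustration, not the exact Shark code, and it assumes a Kryo version where class registration is optional (the default in Kryo 2.x). The trick is that the wrapper itself is Java-serializable, but it stores the wrapped value as Kryo-encoded bytes, so the value's own classes never need to implement Serializable:

    import java.io.{ObjectInputStream, ObjectOutputStream}

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}

    // Java-serializable shell around a value whose classes need not be
    // Serializable: writeObject/readObject re-encode the value with Kryo.
    class KryoSerializationWrapper[T](@transient private var _value: T)
        extends Serializable {

      def value: T = _value

      // Invoked by Java serialization when the closure is shipped:
      // encode the wrapped value with Kryo, length-prefixed.
      private def writeObject(out: ObjectOutputStream): Unit = {
        val buffer = new Output(4096, -1) // growable buffer, no upper bound
        new Kryo().writeClassAndObject(buffer, _value)
        val bytes = buffer.toBytes
        out.writeInt(bytes.length)
        out.write(bytes)
      }

      // Invoked on the worker side: decode the bytes back into the value.
      private def readObject(in: ObjectInputStream): Unit = {
        val bytes = new Array[Byte](in.readInt())
        in.readFully(bytes)
        _value = new Kryo().readClassAndObject(new Input(bytes)).asInstanceOf[T]
      }
    }

    object KryoSerializationWrapper {
      def apply[T](value: T): KryoSerializationWrapper[T] =
        new KryoSerializationWrapper(value)
    }

In a real job you'd probably want to construct the Kryo instance the way Spark's own KryoSerializer does, so that your registered classes and registrators still apply, but the shape above is all the trick needs.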