Suppose my mappers are functions (def) that internally call other classes,
create objects, and do different things inside. (They could even be classes
that extend (Foo => Bar) and do the processing in their apply method - but
let's ignore that case for now.)

Spark supports only Java serialization for closures: it forces every class
used inside a closure to implement Serializable, and it coughs up errors if
you try to configure Kryo for closure serialization. But one cannot expect
every 3rd-party library to have all of its classes extend Serializable!

Here's a workaround that I thought I'd share in case anyone comes across
this problem:

You simply need to serialize the objects before passing them through the
closure, and deserialize them afterwards. This approach just works, even if
your classes aren't Serializable, because it uses Kryo behind the scenes. All
you need is some curry. ;) Here's an example of how I did it:

def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
             (foo: Foo): Bar = {
  kryoWrapper.value.apply(foo)
}

val mapper = genMapper(KryoSerializationWrapper(new Blah(abc)))
rdd.flatMap(mapper).collectAsMap()

class Blah(abc: ABC) extends (Foo => Bar) {
  def apply(foo: Foo): Bar = { /* this is the real function */ }
}
Feel free to make Blah as complicated as you want: a class with a companion
object, nested classes, references to multiple 3rd-party libs.

KryoSerializationWrapper refers to this wrapper from amplab/shark:
<https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala>
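
For anyone who doesn't want to pull in shark, here's a minimal sketch of the
idea behind such a wrapper, written against the Kryo 3/4 API (the real shark
class differs in the details): the wrapper itself is Java-serializable, but it
carries the wrapped value as a Kryo-encoded byte array, so the wrapped classes
never need to implement Serializable.

import java.io.ByteArrayOutputStream

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.objenesis.strategy.StdInstantiatorStrategy

class KryoSerializationWrapper[T <: AnyRef](@transient private var obj: T)
  extends Serializable {

  private var bytes: Array[Byte] = _

  private def newKryo(): Kryo = {
    val kryo = new Kryo()
    // Allow classes without no-arg constructors, e.g. Blah(abc: ABC).
    kryo.setInstantiatorStrategy(
      new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy()))
    kryo
  }

  /** Lazily decodes the wrapped value on first access (on the executor). */
  def value: T = {
    if (obj == null) {
      obj = newKryo().readClassAndObject(new Input(bytes)).asInstanceOf[T]
    }
    obj
  }

  // Java serialization hook: encode the value with Kryo before the wrapper
  // itself is written out with the default mechanism.
  private def writeObject(out: java.io.ObjectOutputStream): Unit = {
    val baos = new ByteArrayOutputStream()
    val output = new Output(baos)
    newKryo().writeClassAndObject(output, obj)
    output.close()
    bytes = baos.toByteArray
    out.defaultWriteObject()
  }

  private def readObject(in: java.io.ObjectInputStream): Unit = {
    in.defaultReadObject()   // restores `bytes`; `obj` stays null until value is called
  }
}

object KryoSerializationWrapper {
  def apply[T <: AnyRef](obj: T): KryoSerializationWrapper[T] =
    new KryoSerializationWrapper(obj)
}

Because value caches the decoded object, each task pays the Kryo decoding
cost once when the closure is deserialized, not once per record.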

Don't you think it's a good idea to have something like this inside the
framework itself? :)



