This is an interesting approach, Nilesh! Someone will correct me if I'm wrong, but I don't think this could go into ClosureCleaner as a default behavior (since Kryo apparently breaks on some classes that depend on custom Java serializers, as has come up on the list recently). But it does seem like having a function in Spark that did this for closures more transparently (to be called explicitly by clients in problem cases) could be pretty useful.
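For anyone following along, here is a rough sketch of the idea behind such a wrapper, using the Kryo library directly. This is not Shark's KryoSerializationWrapper and not an existing Spark API: the name KryoWrapped and the Kryo settings are assumptions made for illustration. The wrapper is itself Java-serializable, so it can ride inside a closure, but it writes its payload as Kryo bytes and rebuilds it on the receiving side.

```scala
import java.io.{ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

import com.esotericsoftware.kryo.Kryo
import com.esotericsoftware.kryo.io.{Input, Output}
import org.objenesis.strategy.StdInstantiatorStrategy

// Hypothetical wrapper (illustrative name, not part of Spark or Shark).
// Java serialization only ever sees the byte array; the payload is transient.
class KryoWrapped[T] extends Serializable {
  @transient private var payload: T = _
  private var bytes: Array[Byte] = _

  def value: T = payload

  // Called by Java serialization: encode the payload with Kryo first.
  private def writeObject(out: ObjectOutputStream): Unit = {
    val buffer = new ByteArrayOutputStream()
    val output = new Output(buffer)
    KryoWrapped.newKryo().writeClassAndObject(output, payload)
    output.close()
    bytes = buffer.toByteArray
    out.defaultWriteObject()
  }

  // Called by Java deserialization: decode the payload from the Kryo bytes.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val input = new Input(bytes)
    payload = KryoWrapped.newKryo().readClassAndObject(input).asInstanceOf[T]
    input.close()
  }
}

object KryoWrapped {
  def apply[T](value: T): KryoWrapped[T] = {
    val wrapped = new KryoWrapped[T]
    wrapped.payload = value
    wrapped
  }

  // Assumed settings: no class registration required, and Objenesis-based
  // instantiation so classes without a no-arg constructor can be rebuilt.
  private def newKryo(): Kryo = {
    val kryo = new Kryo()
    kryo.setRegistrationRequired(false)
    kryo.setInstantiatorStrategy(new StdInstantiatorStrategy())
    kryo
  }
}
```

A client would build the wrapper on the driver with KryoWrapped(new Blah(abc)) and call .value inside the task. Creating a new Kryo instance per call keeps the sketch simple but is slow; a real version would presumably reuse the serializer Spark is configured with. Plain Kryo may also need extra serializers for some Scala types.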
best,
wb

----- Original Message -----
> From: "Nilesh" <nil...@nileshc.com>
> To: d...@spark.incubator.apache.org
> Sent: Saturday, May 24, 2014 10:32:57 AM
> Subject: Kryo serialization for closures: a workaround
>
> Suppose my mappers can be functions (def) that internally call other classes
> and create objects and do different things inside. (Or they can even be
> classes that extend (Foo) => Bar and do the processing in their apply method
> - but let's ignore this case for now.)
>
> Spark supports only Java serialization for closures. It forces all the
> classes inside a closure to implement Serializable, and it coughs up errors
> when forced to use Kryo for closures. But one cannot expect all third-party
> libraries to have all classes extend Serializable!
>
> Here's a workaround that I thought I'd share in case anyone comes across
> this problem:
>
> You simply need to serialize the objects before passing them through the
> closure, and deserialize them afterwards. This approach just works, even if
> your classes aren't Serializable, because it uses Kryo behind the scenes.
> All you need is some curry. ;) Here's an example of how I did it:
>
> def genMapper(kryoWrapper: KryoSerializationWrapper[(Foo => Bar)])
>              (foo: Foo): Bar = {
>   kryoWrapper.value.apply(foo)
> }
>
> val mapper = genMapper(KryoSerializationWrapper(new Blah(abc))) _
> rdd.flatMap(mapper).collectAsMap()
>
> class Blah(abc: ABC) extends (Foo => Bar) {
>   def apply(foo: Foo): Bar = {
>     // This is the real function
>   }
> }
>
> Feel free to make Blah as complicated as you want: class, companion object,
> nested classes, references to multiple third-party libs.
>
> KryoSerializationWrapper refers to this wrapper from amplab/shark
> <https://github.com/amplab/shark/blob/master/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala>
>
> Don't you think it's a good idea to have something like this inside the
> framework itself? :)
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Kryo-serialization-for-closures-a-workaround-tp6787.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.