Below is code that reproduces the issue with a standard MLlib class. Apparently 
the problem can occur even within the same Spark instance, without a restart.

import java.io._

import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.classification.NaiveBayesModel
import org.apache.spark.mllib.util.MLUtils

val data = MLUtils.loadLibSVMFile(sc, "hdfs://myserver:9000/data/mnist.scale")
val nb = NaiveBayes.train(data)
// RDD map works fine
val predictionAndLabels = data.map(lp =>
  (nb.predict(lp.features), lp.label))

// serialize the model to file and immediately load it
val oos = new ObjectOutputStream(new FileOutputStream("/home/myuser/nb.bin"))
oos.writeObject(nb)
oos.close()
val ois = new ObjectInputStream(new FileInputStream("/home/myuser/nb.bin"))
val nbSerialized = ois.readObject().asInstanceOf[NaiveBayesModel]
ois.close()
// RDD map fails
val predictionAndLabels = data.map(lp =>
  (nbSerialized.predict(lp.features), lp.label))
org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1453)
        at org.apache.spark.rdd.RDD.map(RDD.scala:273)


From: Akhil Das [mailto:ak...@sigmoidanalytics.com]
Sent: Sunday, March 08, 2015 3:17 AM
To: Ulanov, Alexander
Cc: dev
Subject: Re: Loading previously serialized object to Spark

Can you paste the complete code?

Thanks
Best Regards

On Sat, Mar 7, 2015 at 2:25 AM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi,

I've implemented a class MyClass in MLlib that performs some operation on 
LabeledPoint. MyClass extends Serializable, so I can map this operation over an 
RDD[LabeledPoint], e.g. data.map(lp => MyClass.operate(lp)). I write this 
object to a file with ObjectOutputStream.writeObject. Then I stop and restart 
Spark. I load the object back from the file with 
ObjectInputStream.readObject.asInstanceOf[MyClass]. When I try to map the same 
operation over an RDD, Spark throws a "Task not serializable" exception:
org.apache.spark.SparkException: Task not serializable
        at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
        at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
        at org.apache.spark.SparkContext.clean(SparkContext.scala:1453)
        at org.apache.spark.rdd.RDD.map(RDD.scala:273)

Could you suggest why it throws this exception, given that MyClass is 
serializable by definition?
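
For concreteness, here is a minimal sketch of the round trip described above; 
the body of MyClass, the operate logic, and the file path are placeholders:

import java.io._
import org.apache.spark.mllib.regression.LabeledPoint

// Hypothetical stand-in for the class described above.
class MyClass extends Serializable {
  def operate(lp: LabeledPoint): Double = lp.label // placeholder logic
}

val out = new ObjectOutputStream(new FileOutputStream("/tmp/myclass.bin"))
out.writeObject(new MyClass)
out.close()

// ... stop and restart Spark, then reload data: RDD[LabeledPoint] ...

val in = new ObjectInputStream(new FileInputStream("/tmp/myclass.bin"))
val restored = in.readObject().asInstanceOf[MyClass]
in.close()

// This map is the step that throws "Task not serializable":
val mapped = data.map(lp => restored.operate(lp))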

Best regards, Alexander
