If you want to stick with Java serialization and need to serialize a non-Serializable object, your best options are probably either to create a Serializable subclass of it, or to wrap it in a class of your own that implements its own writeObject/readObject methods (see http://stackoverflow.com/questions/6163872/how-to-serialize-a-non-serializable-in-java).
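As a rough illustration of the wrapper approach, here is a minimal sketch. ThirdPartyString is a made-up stand-in for a class you cannot change (something like com.wcohen.ss.BasicStringWrapper), and SerializableHolder and WrapperDemo are likewise hypothetical names. It assumes the wrapped object can be rebuilt from a single String, which may not hold for every third-party class.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical stand-in for a third-party class that is NOT Serializable
// and that we cannot modify.
final class ThirdPartyString(val text: String)

// Serializable wrapper: the wrapped object is transient, and only the state
// needed to rebuild it is written to the stream.
class SerializableHolder(initial: ThirdPartyString) extends Serializable {
  @transient private var wrapped: ThirdPartyString = initial

  def get: ThirdPartyString = wrapped

  // Hook invoked by Java serialization when this wrapper is written.
  private def writeObject(out: ObjectOutputStream): Unit = {
    out.defaultWriteObject()
    out.writeObject(wrapped.text)
  }

  // Hook invoked by Java serialization when this wrapper is read back.
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    wrapped = new ThirdPartyString(in.readObject().asInstanceOf[String])
  }
}

object WrapperDemo {
  // Serialize to a byte array and deserialize again, as a quick local check.
  def roundTrip[T <: AnyRef](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    val copy = roundTrip(new SerializableHolder(new ThirdPartyString("hello")))
    println(copy.get.text) // prints "hello"
  }
}
```

In a Spark job, the closure would then capture the wrapper rather than the raw third-party object, and call get on the executor side.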
Otherwise you can use Kryo to register custom serializers for other people's objects.

On Mon, Jun 30, 2014 at 1:52 PM, Sameer Tilak <ssti...@live.com> wrote:

> Hi everyone,
> I was able to solve this issue. For now I changed the library code and
> added the following to the class com.wcohen.ss.BasicStringWrapper:
>
> public class BasicStringWrapper implements Serializable
>
> However, I am still curious to know how to get around the issue when you
> don't have access to the code and you are using a 3rd party jar.
>
>
> ------------------------------
> From: ssti...@live.com
> To: u...@spark.incubator.apache.org
> Subject: Serialization of objects
> Date: Thu, 26 Jun 2014 09:30:31 -0700
>
> Hi everyone,
>
> Aaron, thanks for your help so far. I am trying to serialize objects that
> I instantiate from a 3rd party library, namely instances of
> com.wcohen.ss.Jaccard and com.wcohen.ss.BasicStringWrapper. However, I am
> having problems with serialization. I am using (or at least trying to use)
> Kryo for serialization, but I am still facing the serialization issue. I get
> "org.apache.spark.SparkException: Job aborted due to stage failure: Task not
> serializable: java.io.NotSerializableException:
> com.wcohen.ss.BasicStringWrapper". Any help with this will be great.
>
> Scala code:
>
> package approxstrmatch
>
> import com.wcohen.ss.BasicStringWrapper;
> import com.wcohen.ss.Jaccard;
>
> import java.util.Iterator;
>
> import org.apache.spark.SparkContext
> import org.apache.spark.SparkContext._
> import org.apache.spark.SparkConf
>
> import org.apache.spark.rdd;
> import org.apache.spark.rdd.RDD;
>
> import com.esotericsoftware.kryo.Kryo
> import org.apache.spark.serializer.KryoRegistrator
>
> class MyRegistrator extends KryoRegistrator {
>   override def registerClasses(kryo: Kryo) {
>     kryo.register(classOf[approxstrmatch.JaccardScore])
>     kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])
>     kryo.register(classOf[com.wcohen.ss.Jaccard])
>   }
> }
>
> class JaccardScore {
>
>   val mjc = new Jaccard() with Serializable
>   val conf = new SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")
>   conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
>   conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
>
>   val sc = new SparkContext(conf)
>
>   def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String]) {
>     val jc_ = this.mjc
>
>     var i: Int = 0
>     for (sentence <- sourcerdd.toLocalIterator) {
>       val str1 = new BasicStringWrapper (sentence)
>       var scorevector = destrdd.map(x => jc_.score(str1, new BasicStringWrapper(x)))
>       val fileName = new String("/apps/software/scala-approsstrmatch-sentence" + i)
>       scorevector.saveAsTextFile(fileName)
>       i += 1
>     }
>
>   }
>
> Here is the script:
>
> val distFile = sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt");
> val srcFile = sc.textFile("hdfs://serverip:54310/data/dummy/test.txt");
> val score = new approxstrmatch.JaccardScore()
> score.calculateScoreSecond(srcFile, distFile)
>
> O/P:
>
> 14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at textFile at <console>:12), which has no missing parents
> 14/06/25 12:32:05 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile at <console>:12)
> 14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> 14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
> 14/06/25 12:32:05 INFO TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms
> 14/06/25 12:32:05 INFO Executor: Running task ID 0
> 14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/approxstrmatch.jar with timestamp 1403724701564
> 14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/approxstrmatch.jar to /tmp/fetchFileTemp8194323811657370518.tmp
> 14/06/25 12:32:05 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar to class loader
> 14/06/25 12:32:05 INFO Executor: Fetching http://serverip:47417/jars/secondstring-20140618.jar with timestamp 1403724701562
> 14/06/25 12:32:05 INFO Utils: Fetching http://serverip:47417/jars/secondstring-20140618.jar to /tmp/fetchFileTemp8711755318201511766.tmp
> 14/06/25 12:32:06 INFO Executor: Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar to class loader
> 14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 locally
> 14/06/25 12:32:06 INFO HadoopRDD: Input split: hdfs://serverip:54310/data/dummy/test.txt:0+140
> 14/06/25 12:32:06 INFO Executor: Serialized size of result for 0 is 717
> 14/06/25 12:32:06 INFO Executor: Sending result for 0 directly to driver
> 14/06/25 12:32:06 INFO Executor: Finished task ID 0
> 14/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on localhost (progress: 1/1)
> 14/06/25 12:32:06 INFO DAGScheduler: Completed ResultTask(0, 0)
> 14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
> 14/06/25 12:32:06 INFO DAGScheduler: Stage 0 (apply at Iterator.scala:371) finished in 0.242 s
> 14/06/25 12:32:06 INFO SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 s
> 14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 1
> 14/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at JaccardScore.scala:52
> 14/06/25 12:32:06 INFO DAGScheduler: Got job 1 (saveAsTextFile at JaccardScore.scala:52) with 2 output partitions (allowLocal=false)
> 14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 1(saveAsTextFile at JaccardScore.scala:52)
> 14/06/25 12:32:06 INFO DAGScheduler: Parents of final stage: List()
> 14/06/25 12:32:06 INFO DAGScheduler: Missing parents: List()
> 14/06/25 12:32:06 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[5] at saveAsTextFile at JaccardScore.scala:52), which has no missing parents
> 14/06/25 12:32:06 INFO DAGScheduler: Failed to run saveAsTextFile at JaccardScore.scala:52
> org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
>     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)