Hi everyone,
Aaron, thanks for your help so far. I am trying to serialize objects that I 
instantiate from a 3rd party library namely instances of com.wcohen.ss.Jaccard, 
and com.wcohen.ss.BasicStringWrapper. However, I am having problems with 
serialization. I am (at least trying to) using Kryo for serialization. I  am 
still facing the serialization issue. I get "org.apache.spark.SparkException: 
Job aborted due to stage failure: Task not serializable: 
java.io.NotSerializableException: com.wcohen.ss.BasicStringWrapper" Any help 
with this will be great.  Scala code:
package approxstrmatch
import com.wcohen.ss.BasicStringWrapper;import com.wcohen.ss.Jaccard;
import java.util.Iterator;
import org.apache.spark.SparkContextimport 
org.apache.spark.SparkContext._import org.apache.spark.SparkConf
import org.apache.spark.rdd;import org.apache.spark.rdd.RDD;
import com.esotericsoftware.kryo.Kryoimport 
org.apache.spark.serializer.KryoRegistrator
class MyRegistrator extends KryoRegistrator {  override def 
registerClasses(kryo: Kryo) {    
kryo.register(classOf[approxstrmatch.JaccardScore])    
kryo.register(classOf[com.wcohen.ss.BasicStringWrapper])    
kryo.register(classOf[com.wcohen.ss.Jaccard])
  }}
class JaccardScore  {
  val mjc = new Jaccard()  with Serializable  val conf = new 
SparkConf().setMaster("spark://pzxnvm2018:7077").setAppName("ApproxStrMatch")  
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")  
conf.set("spark.kryo.registrator", "approxstrmatch.MyRegistrator")
  val sc = new SparkContext(conf)
  def calculateScoreSecond (sourcerdd: RDD[String], destrdd: RDD[String])  {  
val jc_ = this.mjc
  var i: Int = 0  for (sentence <- sourcerdd.toLocalIterator)   {    val str1 = 
new BasicStringWrapper (sentence)        var scorevector = destrdd.map(x => 
jc_.score(str1, new BasicStringWrapper(x)))        val fileName = new 
String("/apps/software/scala-approsstrmatch-sentence" + i)        
scorevector.saveAsTextFile(fileName)        i += 1   }
  }
Here is the script: val distFile = 
sc.textFile("hdfs://serverip:54310/data/dummy/sample.txt"); val srcFile = 
sc.textFile("hdfs://serverip:54310/data/dummy/test.txt"); val score = new 
approxstrmatch.JaccardScore() score.calculateScoreSecond(srcFile, distFile) 
O/P:
14/06/25 12:32:05 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at 
textFile at <console>:12), which has no missing parents14/06/25 12:32:05 INFO 
DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[3] at textFile 
at <console>:12)14/06/25 12:32:05 INFO TaskSchedulerImpl: Adding task set 0.0 
with 1 tasks14/06/25 12:32:05 INFO TaskSetManager: Starting task 0.0:0 as TID 0 
on executor localhost: localhost (PROCESS_LOCAL)14/06/25 12:32:05 INFO 
TaskSetManager: Serialized task 0.0:0 as 1879 bytes in 4 ms14/06/25 12:32:05 
INFO Executor: Running task ID 014/06/25 12:32:05 INFO Executor: Fetching 
http://serverip:47417/jars/approxstrmatch.jar with timestamp 
140372470156414/06/25 12:32:05 INFO Utils: Fetching 
http://serverip:47417/jars/approxstrmatch.jar to 
/tmp/fetchFileTemp8194323811657370518.tmp14/06/25 12:32:05 INFO Executor: 
Adding file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/approxstrmatch.jar 
to class loader14/06/25 12:32:05 INFO Executor: Fetching 
http://serverip:47417/jars/secondstring-20140618.jar with timestamp 
140372470156214/06/25 12:32:05 INFO Utils: Fetching 
http://serverip:47417/jars/secondstring-20140618.jar to 
/tmp/fetchFileTemp8711755318201511766.tmp14/06/25 12:32:06 INFO Executor: 
Adding 
file:/tmp/spark-397828b5-3e0e-4bb4-b56b-58895eb4d6df/secondstring-20140618.jar 
to class loader14/06/25 12:32:06 INFO BlockManager: Found block broadcast_1 
locally14/06/25 12:32:06 INFO HadoopRDD: Input split: 
hdfs://serverip:54310/data/dummy/test.txt:0+14014/06/25 12:32:06 INFO Executor: 
Serialized size of result for 0 is 71714/06/25 12:32:06 INFO Executor: Sending 
result for 0 directly to driver14/06/25 12:32:06 INFO Executor: Finished task 
ID 014/06/25 12:32:06 INFO TaskSetManager: Finished TID 0 in 227 ms on 
localhost (progress: 1/1)14/06/25 12:32:06 INFO DAGScheduler: Completed 
ResultTask(0, 0)14/06/25 12:32:06 INFO TaskSchedulerImpl: Removed TaskSet 0.0, 
whose tasks have all completed, from pool14/06/25 12:32:06 INFO DAGScheduler: 
Stage 0 (apply at Iterator.scala:371) finished in 0.242 s14/06/25 12:32:06 INFO 
SparkContext: Job finished: apply at Iterator.scala:371, took 0.34204941 
s14/06/25 12:32:06 INFO FileInputFormat: Total input paths to process : 
114/06/25 12:32:06 INFO SparkContext: Starting job: saveAsTextFile at 
JaccardScore.scala:5214/06/25 12:32:06 INFO DAGScheduler: Got job 1 
(saveAsTextFile at JaccardScore.scala:52) with 2 output partitions 
(allowLocal=false)14/06/25 12:32:06 INFO DAGScheduler: Final stage: Stage 
1(saveAsTextFile at JaccardScore.scala:52)14/06/25 12:32:06 INFO DAGScheduler: 
Parents of final stage: List()14/06/25 12:32:06 INFO DAGScheduler: Missing 
parents: List()14/06/25 12:32:06 INFO DAGScheduler: Submitting Stage 1 
(MappedRDD[5] at saveAsTextFile at JaccardScore.scala:52), which has no missing 
parents14/06/25 12:32:06 INFO DAGScheduler: Failed to run saveAsTextFile at 
JaccardScore.scala:52org.apache.spark.SparkException: Job aborted due to stage 
failure: Task not serializable: java.io.NotSerializableException: 
com.wcohen.ss.BasicStringWrapper        at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1033)
        at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1017)
 at 
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1015)
 at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)  
     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)   at 
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1015)  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:770)
  at 
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
 at 
org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
   at 
org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)     at 
akka.actor.ActorCell.invoke(ActorCell.scala:456)     at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)      at 
akka.dispatch.Mailbox.run(Mailbox.scala:219) at 
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
       at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
     at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)     at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
                                          

Reply via email to