The problem is that this object can't be Serializable: it holds an RDD field and a SparkContext. But Spark reports an error saying that it needs to be serialized.
The order of my debug output is really strange. ~~~~~ Training Start! Round 0 Hehe? Hehe? started? failed? Round 1 Hehe? ~~~~~ here is my code 69 import org.apache.spark.rdd 70 import org.apache.spark.SparkContext 71 72 class ADLDAModel(val sc:SparkContext, var mwz:rdd.RDD[(Long,String,Int)], val ntopics:Int) { 73 //TODO: flexible save location and input location 74 //TODO: Add Perplexity 75 76 val nterms = mwz.map(_._2).distinct.count.toInt 77 val modelinfo = new ModelInfo(ntopics, nterms) 78 79 def nzw = {println("Hehe?");mwz.map(x=>(x._3, x._2)).countByValue.toSeq} 80 def nz = nzw.map(x=>(x._1._1, x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq 81 def tinfo = new TopicInfo(this.nzw, this.nz) 82 83 def train(round:Int, innerRound:Int = 20, saveStep:Int = 5) { 84 var step = 0 85 val minfo = this.sc broadcast this.modelinfo 86 println("Training Start!") //DEBUG 87 for(i <- 0 until round) { 88 println(s"Round ${i}") 89 val tmptinfo = this.tinfo 90 val runtinfo = this.sc broadcast tmptinfo 91 println("started?") //DEBUG 92 this.mwz = this.mwz.mapPartitions(it=>GibbsMapper.mapper(minfo.value, runtinfo.value, it.toSeq, innerRound).toIterator) 93 println("failed?") //DEBUG 94 step = (step + 1) % saveStep 95 if (step == 0 && i+1 != round) this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}") 96 } 97 this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final") 98 } 99 } Error Code Below: 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Got job 3 (countByValue at lda.scala:78) with 22 output partitions (allowLocal=false) 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Final stage: Stage 4 (countByValue at lda.scala:78) 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Parents of final stage: List() 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Missing parents: List() 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[15] at countByValue at lda.scala:78), which has no missing parents 14/04/29 09:48:42 INFO 
scheduler.DAGScheduler: Failed to run countByValue at lda.scala:78 14/04/29 09:48:42 INFO yarn.ApplicationMaster: finishApplicationMaster with FAILED 14/04/29 09:48:42 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered. Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:601) at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154) Caused by: org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: net.earthson.nlp.lda.ADLDAModel at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028) at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794) at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737) at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569) at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207) at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) at akka.actor.ActorCell.invoke(ActorCell.scala:456) at 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) at akka.dispatch.Mailbox.run(Mailbox.scala:219) at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 14/04/29 09:48:42 INFO yarn.ApplicationMaster: AppMaster received a signal. -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009.html Sent from the Apache Spark User List mailing list archive at Nabble.com.