You are using some objects outside the scope of the train method, so Spark has to serialize the ADLDAModel: referencing a field of the class from inside an RDD closure pulls the whole enclosing instance into that closure. You can fix it by giving those objects a local copy or reference inside the train method; see the sketch after the quoted message below. Also, mapPartitions is lazy, so the NotSerializableException only surfaces at the next action (the countByValue triggered by tinfo in the following round), which is why the order of your debug output looks strange.

On Apr 28, 2014 8:48 PM, "Earthson" <earthson...@gmail.com> wrote:
> The problem is that this object can't be Serializable: it holds an RDD
> field and a SparkContext. But Spark shows an error that it needs
> serialization.
>
> The order of my debug output is really strange.
>
> ~~~~~
> Training Start!
> Round 0
> Hehe?
> Hehe?
> started?
> failed?
> Round 1
> Hehe?
> ~~~~~
>
> Here is my code:
>
>  69 import org.apache.spark.rdd
>  70 import org.apache.spark.SparkContext
>  71
>  72 class ADLDAModel(val sc:SparkContext, var mwz:rdd.RDD[(Long,String,Int)], val ntopics:Int) {
>  73     //TODO: flexible save location and input location
>  74     //TODO: Add Perplexity
>  75
>  76     val nterms = mwz.map(_._2).distinct.count.toInt
>  77     val modelinfo = new ModelInfo(ntopics, nterms)
>  78
>  79     def nzw = {println("Hehe?"); mwz.map(x=>(x._3, x._2)).countByValue.toSeq}
>  80     def nz = nzw.map(x=>(x._1._1, x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq
>  81     def tinfo = new TopicInfo(this.nzw, this.nz)
>  82
>  83     def train(round:Int, innerRound:Int = 20, saveStep:Int = 5) {
>  84         var step = 0
>  85         val minfo = this.sc broadcast this.modelinfo
>  86         println("Training Start!") //DEBUG
>  87         for(i <- 0 until round) {
>  88             println(s"Round ${i}")
>  89             val tmptinfo = this.tinfo
>  90             val runtinfo = this.sc broadcast tmptinfo
>  91             println("started?") //DEBUG
>  92             this.mwz = this.mwz.mapPartitions(it=>GibbsMapper.mapper(minfo.value, runtinfo.value, it.toSeq, innerRound).toIterator)
>  93             println("failed?") //DEBUG
>  94             step = (step + 1) % saveStep
>  95             if (step == 0 && i+1 != round) this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}")
>  96         }
>  97         this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final")
>  98     }
>  99 }
>
> Error log below:
>
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Got job 3 (countByValue at lda.scala:78) with 22 output partitions (allowLocal=false)
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Final stage: Stage 4 (countByValue at lda.scala:78)
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Parents of final stage: List()
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Missing parents: List()
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Submitting Stage 4 (MapPartitionsRDD[15] at countByValue at lda.scala:78), which has no missing parents
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Failed to run countByValue at lda.scala:78
> 14/04/29 09:48:42 INFO yarn.ApplicationMaster: finishApplicationMaster with FAILED
> 14/04/29 09:48:42 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
> Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
> Caused by: org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: net.earthson.nlp.lda.ADLDAModel
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>         at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>         at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>         at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
>         at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/04/29 09:48:42 INFO yarn.ApplicationMaster: AppMaster received a signal.
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
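To make the suggestion above concrete, here is a minimal, self-contained sketch of the pattern (the Model class, its factor field, and the scaleBad/scaleGood methods are hypothetical names for illustration, not Earthson's actual code): reading a field of the enclosing class inside an RDD closure compiles to a reference through `this`, so Spark must serialize the whole instance; copying the field into a local val first keeps `this` out of the closure.

~~~~~
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Hypothetical class for illustration only.
class Model(val sc: SparkContext, var data: RDD[Int]) {
  val factor = 10

  // BAD: `factor` is a field, so the closure references `this` and
  // Spark tries to serialize the whole Model, including the
  // non-serializable SparkContext and the RDD field.
  def scaleBad(): RDD[Int] =
    data.map(x => x * factor)

  // GOOD: copy the field into a local val first. The closure now
  // captures only an Int, and nothing else has to be serialized.
  def scaleGood(): RDD[Int] = {
    val localFactor = factor
    data.map(x => x * localFactor)
  }
}
~~~~~

Marking fields like the SparkContext as @transient is another common workaround, but for values used only inside one method, a local copy as above is usually the simpler fix.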