You are using some objects outside the scope of the train method, so Spark has
to serialize the ADLDAModel instance. You can avoid this by making local copies
of (or local references to) those objects inside the train method.
On Apr 28, 2014 8:48 PM, "Earthson" <earthson...@gmail.com> wrote:

> The problem is that this object can't be Serializable: it holds an RDD field
> and a SparkContext. But Spark shows an error saying that it needs to be
> serialized.
>
> The order of my debug output is really strange.
>
> ~~~~~
> Training Start!
> Round 0
> Hehe?
> Hehe?
> started?
> failed?
> Round 1
> Hehe?
> ~~~~~
>
> Here is my code:
>
>  69 import org.apache.spark.rdd
>  70 import org.apache.spark.SparkContext
>  71
>  72 class ADLDAModel(val sc:SparkContext, var
> mwz:rdd.RDD[(Long,String,Int)], val ntopics:Int) {
>  73     //TODO: flexible save location and input location
>  74     //TODO: Add Perplexity
>  75
>  76     val nterms = mwz.map(_._2).distinct.count.toInt
>  77     val modelinfo = new ModelInfo(ntopics, nterms)
>  78
>  79     def nzw = {println("Hehe?");mwz.map(x=>(x._3,
> x._2)).countByValue.toSeq}
>  80     def nz = nzw.map(x=>(x._1._1,
> x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq
>  81     def tinfo = new TopicInfo(this.nzw, this.nz)
>  82
>  83     def train(round:Int, innerRound:Int = 20, saveStep:Int = 5) {
>  84         var step = 0
>  85         val minfo = this.sc broadcast this.modelinfo
>  86         println("Training Start!") //DEBUG
>  87         for(i <- 0 until round) {
>  88             println(s"Round ${i}")
>  89             val tmptinfo = this.tinfo
>  90             val runtinfo = this.sc broadcast tmptinfo
>  91             println("started?") //DEBUG
>  92             this.mwz =
> this.mwz.mapPartitions(it=>GibbsMapper.mapper(minfo.value, runtinfo.value,
> it.toSeq, innerRound).toIterator)
>  93             println("failed?") //DEBUG
>  94             step = (step + 1) % saveStep
>  95             if (step == 0 && i+1 != round)
> this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}")
>  96         }
>  97
> this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final")
>  98     }
>  99 }
>
>
> Error Code Below:
>
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Got job 3 (countByValue at
> lda.scala:78) with 22 output partitions (allowLocal=false)
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Final stage: Stage 4
> (countByValue at lda.scala:78)
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Parents of final stage:
> List()
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Missing parents: List()
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Submitting Stage 4
> (MapPartitionsRDD[15] at countByValue at lda.scala:78), which has no
> missing
> parents
> 14/04/29 09:48:42 INFO scheduler.DAGScheduler: Failed to run countByValue
> at
> lda.scala:78
> 14/04/29 09:48:42 INFO yarn.ApplicationMaster: finishApplicationMaster with
> FAILED
> 14/04/29 09:48:42 INFO impl.AMRMClientImpl: Waiting for application to be
> successfully unregistered.
> Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at
>
> org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
> Caused by: org.apache.spark.SparkException: Job aborted: Task not
> serializable: java.io.NotSerializableException:
> net.earthson.nlp.lda.ADLDAModel
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>         at
>
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at
> scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>         at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>         at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
>         at
> org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
>         at
>
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
>         at
>
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>         at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>         at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>         at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>         at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>         at
>
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>         at
> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>         at
>
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>         at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>         at
>
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 14/04/29 09:48:42 INFO yarn.ApplicationMaster: AppMaster received a signal.
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>

Reply via email to