The problem is that this object can't be Serializable: it holds an RDD field
and a SparkContext. But Spark reports an error saying that it needs to be
serialized.

The order of my debug output is really strange.

~~~~~
Training Start!
Round 0
Hehe?
Hehe?
started?
failed?
Round 1
Hehe?
~~~~~

Here is my code:

 69 import org.apache.spark.rdd
 70 import org.apache.spark.SparkContext
 71
 72 class ADLDAModel(val sc:SparkContext, var
mwz:rdd.RDD[(Long,String,Int)], val ntopics:Int) {
 73     //TODO: flexible save location and input location
 74     //TODO: Add Perplexity
 75
 76     val nterms = mwz.map(_._2).distinct.count.toInt
 77     val modelinfo = new ModelInfo(ntopics, nterms)
 78
 79     def nzw = {println("Hehe?");mwz.map(x=>(x._3,
x._2)).countByValue.toSeq}
 80     def nz = nzw.map(x=>(x._1._1,
x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq
 81     def tinfo = new TopicInfo(this.nzw, this.nz)
 82
 83     def train(round:Int, innerRound:Int = 20, saveStep:Int = 5) {
 84         var step = 0
 85         val minfo = this.sc broadcast this.modelinfo
 86         println("Training Start!") //DEBUG
 87         for(i <- 0 until round) {
 88             println(s"Round ${i}")
 89             val tmptinfo = this.tinfo
 90             val runtinfo = this.sc broadcast tmptinfo
 91             println("started?") //DEBUG
 92             this.mwz =
this.mwz.mapPartitions(it=>GibbsMapper.mapper(minfo.value, runtinfo.value,
it.toSeq, innerRound).toIterator)
 93             println("failed?") //DEBUG
 94             step = (step + 1) % saveStep
 95             if (step == 0 && i+1 != round)
this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}")
 96         }
 97        
this.mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final")
 98     }
 99 }


Error output below:

14/04/29 09:48:42 INFO scheduler.DAGScheduler: Got job 3 (countByValue at
lda.scala:78) with 22 output partitions (allowLocal=false)
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Final stage: Stage 4
(countByValue at lda.scala:78)
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Parents of final stage:
List()
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Missing parents: List()
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Submitting Stage 4
(MapPartitionsRDD[15] at countByValue at lda.scala:78), which has no missing
parents
14/04/29 09:48:42 INFO scheduler.DAGScheduler: Failed to run countByValue at
lda.scala:78
14/04/29 09:48:42 INFO yarn.ApplicationMaster: finishApplicationMaster with
FAILED
14/04/29 09:48:42 INFO impl.AMRMClientImpl: Waiting for application to be
successfully unregistered.
Exception in thread "Thread-3" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
        at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:601)
        at
org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:154)
Caused by: org.apache.spark.SparkException: Job aborted: Task not
serializable: java.io.NotSerializableException:
net.earthson.nlp.lda.ADLDAModel
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
        at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
        at
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
        at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
        at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
        at akka.actor.ActorCell.invoke(ActorCell.scala:456)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
        at akka.dispatch.Mailbox.run(Mailbox.scala:219)
        at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
        at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
        at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
        at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
        at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/04/29 09:48:42 INFO yarn.ApplicationMaster: AppMaster received a signal.



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Reply via email to