I've made the SparkContext and the RDD parameters of train, and now Spark tells me
that SparkContext needs to be serializable!

I think the problem is that the RDD is lazy, and some broadcast objects need to be
generated dynamically, so the closure ends up containing the SparkContext and the
task fails completely. Is this true?
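For example, here is a minimal sketch of the pattern I suspect is happening (the class
and field names are made up, just to show what I mean): when a closure references a
field of the enclosing object, Spark has to serialize the whole object, including the
SparkContext it holds.

    import org.apache.spark.SparkContext
    import org.apache.spark.rdd.RDD

    case class Info(ntopics: Int)   // hypothetical stand-in for my ModelInfo

    class Trainer(val sc: SparkContext, val info: Info) extends Serializable {
      def run(data: RDD[String]): RDD[Int] =
        // `info` is really `this.info`, so the closure drags `this` along,
        // and `this` holds the SparkContext -> NotSerializableException: SparkContext
        data.map(s => s.length + info.ntopics)

      def runFixed(data: RDD[String]): RDD[Int] = {
        val localInfo = info   // local copy keeps `this` (and sc) out of the closure
        data.map(s => s.length + localInfo.ntopics)
      }
    }

I'm not sure this is exactly what happens in my code below, since I already assign the
broadcasts to local vals, but it's the only mechanism I can think of.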

I have no idea how to solve this. I don't know how to make an RDD non-lazy;
rdd.cache seems to be lazy too.
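The only trick I can think of is to follow cache with an action, something like this
(just my understanding of how Spark evaluates things, not something I've verified
fixes the error):

    // cache only marks the RDD for storage; nothing runs until an action is called
    val cached = mwz.cache()
    cached.count()   // an action: forces the lineage to be computed and materialized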

Or is my only choice to save every step to HDFS and load it back each time?
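Or maybe rdd.checkpoint would do roughly the same thing without me hand-rolling the
save and reload? A rough sketch of what I mean (untested; the directory path is just
an example):

    // set once, before training starts
    sc.setCheckpointDir("hdfs://ns1/nlp/lda/checkpoints")   // example path

    // inside the loop, instead of saveAsTextFile + re-reading:
    mwz.cache()
    mwz.checkpoint()   // marks the RDD to be written out and its lineage truncated
    mwz.count()        // checkpointing only actually happens after an action runs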


object LDAInfo {
    type LDAData = rdd.RDD[(Long, String, Int)]

    def topicInfo(mwz: LDAInfo.LDAData) = {
        // counts per (topic, term) pair, then totals per topic
        val nzw = mwz.map(x => (x._3, x._2)).countByValue.toSeq
        val nz = nzw.map(x => (x._1._1, x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq
        new TopicInfo(nzw, nz)
    }
}

class ADLDAModel(val ntopics: Int, mwz: LDAInfo.LDAData) {
    //TODO: flexible save location and input location
    //TODO: add perplexity

    val nterms = mwz.map(_._2).distinct.count.toInt
    val modelinfo = new ModelInfo(ntopics, nterms)

    def train(sc: SparkContext, mwzo: LDAInfo.LDAData, round: Int, innerRound: Int = 20, saveStep: Int = 5) {
        var step = 0
        var mwz = mwzo
        val minfo = sc broadcast this.modelinfo
        println("Training Start!") //DEBUG
        for (i <- 0 until round) {
            println(s"Round ${i}")
            val runtinfo = sc broadcast LDAInfo.topicInfo(mwz)
            println("started?") //DEBUG
            mwz = mwz.mapPartitions(it =>
                GibbsMapper.mapper(minfo.value, runtinfo.value, it.toSeq, innerRound).toIterator).cache
            println("failed?") //DEBUG
            step = (step + 1) % saveStep
            if (step == 0 && i + 1 != round)
                mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}")
        }
        mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final")
    }
}
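If forcing evaluation is the right direction, I guess the change would go right after
the mapPartitions line, something like this (untested):

    mwz = mwz.mapPartitions(it =>
        GibbsMapper.mapper(minfo.value, runtinfo.value, it.toSeq, innerRound).toIterator).cache
    mwz.count()   // force this round's Gibbs sampling to run now instead of piling up lazily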


