I've moved the SparkContext and the RDD to be parameters of train, and now it tells me that the SparkContext needs to be serializable!
I think the the problem is RDD is trying to make itself lazy. and some BroadCast Object need to be generate dynamicly, so the closure have SparkContext inside, so the task complete failed. Is this true? I have no idea about how to solve such a problem. I don't know how to make a RDD non-lazy. rdd.cache seems to be lazy too. So my only choice seems to be save every step into hdfs and load it everytime? 72 object LDAInfo { 73 type LDAData = rdd.RDD[(Long,String,Int)] 74 75 def topicInfo(mwz:LDAInfo.LDAData) = { 76 val nzw = mwz.map(x=>(x._3, x._2)).countByValue.toSeq 77 val nz = nzw.map(x=>(x._1._1, x._2)).groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq 78 new TopicInfo(nzw, nz) 79 } 80 } 81 82 class ADLDAModel(val ntopics:Int, mwz:LDAInfo.LDAData) { 83 //TODO: flexible save location and input location 84 //TODO: Add Perplexity 85 86 val nterms = mwz.map(_._2).distinct.count.toInt 87 val modelinfo = new ModelInfo(ntopics, nterms) 88 89 def train(sc:SparkContext, mwzo:LDAInfo.LDAData, round:Int, innerRound:Int = 20, saveStep:Int = 5) { 90 var step = 0 91 var mwz = mwzo 92 val minfo = sc broadcast this.modelinfo 93 println("Training Start!") //DEBUG 94 for(i <- 0 until round) { 95 println(s"Round ${i}") 96 val runtinfo = sc broadcast LDAInfo.topicInfo(mwz) 97 println("started?") //DEBUG 98 mwz = mwz.mapPartitions(it=>GibbsMapper.mapper(minfo.value, runtinfo.value, it.toSeq, innerRound).toIterator).cache 99 println("failed?") //DEBUG 100 step = (step + 1) % saveStep 101 if (step == 0 && i+1 != round) mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}") 102 } 103 mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final") 104 } 105 } -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5020.html Sent from the Apache Spark User List mailing list archive at Nabble.com.