Your code is unformatted. Can you paste the whole file in a gist so I can
take a look for you?
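In the meantime, one thing worth noting: cache() only *marks* an RDD for reuse; it is still lazy until some action runs on it. A minimal sketch of forcing materialization (using a local SparkContext just for illustration — this isn't your code):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ForceEval {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("force-eval").setMaster("local[*]"))

    // cache() is lazy: it only marks the RDD for storage.
    val data = sc.parallelize(1 to 100).map(_ * 2).cache()

    // An action such as count() forces the computation and fills the cache;
    // later actions on `data` then reuse the cached partitions instead of
    // recomputing the whole lineage.
    data.count()

    sc.stop()
  }
}
```

That won't by itself fix a "SparkContext is not serializable" error if a closure is capturing the context, but it does make each round's RDD evaluate eagerly rather than dragging the full lineage forward.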
On Apr 28, 2014 10:42 PM, "Earthson" <earthson...@gmail.com> wrote:

> I've moved the SparkContext and RDD to be parameters of train, and now it
> tells me that the SparkContext needs to be serializable!
>
> I think the problem is that the RDD is trying to stay lazy, and some
> Broadcast objects need to be generated dynamically, so the closure ends up
> with the SparkContext inside it and the task fails. Is this true?
>
> I have no idea how to solve such a problem. I don't know how to make an
> RDD non-lazy; rdd.cache seems to be lazy too.
>
> So is my only choice to save every step into HDFS and load it back every
> time?
>
>
> object LDAInfo {
>     type LDAData = rdd.RDD[(Long, String, Int)]
>
>     def topicInfo(mwz: LDAInfo.LDAData) = {
>         val nzw = mwz.map(x => (x._3, x._2)).countByValue.toSeq
>         val nz = nzw.map(x => (x._1._1, x._2))
>             .groupBy(_._1).mapValues(_.map(_._2).sum.toLong).toSeq
>         new TopicInfo(nzw, nz)
>     }
> }
>
> class ADLDAModel(val ntopics: Int, mwz: LDAInfo.LDAData) {
>     // TODO: flexible save location and input location
>     // TODO: add perplexity
>
>     val nterms = mwz.map(_._2).distinct.count.toInt
>     val modelinfo = new ModelInfo(ntopics, nterms)
>
>     def train(sc: SparkContext, mwzo: LDAInfo.LDAData, round: Int,
>               innerRound: Int = 20, saveStep: Int = 5) {
>         var step = 0
>         var mwz = mwzo
>         val minfo = sc.broadcast(this.modelinfo)
>         println("Training Start!") // DEBUG
>         for (i <- 0 until round) {
>             println(s"Round ${i}")
>             val runtinfo = sc.broadcast(LDAInfo.topicInfo(mwz))
>             println("started?") // DEBUG
>             mwz = mwz.mapPartitions(it =>
>                 GibbsMapper.mapper(minfo.value, runtinfo.value,
>                     it.toSeq, innerRound).toIterator
>             ).cache
>             println("failed?") // DEBUG
>             step = (step + 1) % saveStep
>             if (step == 0 && i + 1 != round)
>                 mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.${i}")
>         }
>         mwz.saveAsTextFile(s"hdfs://ns1/nlp/lda/solution.round.final")
>     }
> }
>
>
>
> --
> View this message in context:
> http://apache-spark-user-list.1001560.n3.nabble.com/Why-Spark-require-this-object-to-be-serializerable-tp5009p5020.html
> Sent from the Apache Spark User List mailing list archive at Nabble.com.
>