Github user witgo commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1983#discussion_r17490277
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDA.scala ---
    @@ -0,0 +1,397 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import java.util.Random
    +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => BV}
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.mllib.linalg.{DenseVector => SDV, SparseVector => SSV, Vector => SV}
    +import org.apache.spark.mllib.linalg.Vectors
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.storage.StorageLevel
    +
    +case class Document(docId: Int, content: Array[Int]) {
    +  private[spark] var topics: Array[Int] = null
    +  private[spark] var topicsDist: BV[Double] = null
    +}
    +
    +class TopicModel private[spark](
    +  private[spark] val topicCounts_ : BDV[Double],
    +  private[spark] val topicTermCounts_ : Array[BSV[Double]],
    +  private[spark] val alpha: Double,
    +  private[spark] val beta: Double
    +) extends Serializable {
    +
    +  def this(topicCounts_ : SDV, topicTermCounts_ : Array[SSV], alpha: Double, beta: Double) {
    +    this(new BDV[Double](topicCounts_.toArray), topicTermCounts_.map(t =>
    +      new BSV(t.indices, t.values, t.size)), alpha, beta)
    +  }
    +
    +  def topicCounts = Vectors.dense(topicCounts_.toArray)
    +
    +  def topicTermCounts = topicTermCounts_.map(t => Vectors.sparse(t.size, t.activeIterator.toSeq))
    +
    +  def update(term: Int, topic: Int, inc: Int) = {
    +    topicCounts_(topic) += inc
    +    topicTermCounts_(topic)(term) += inc
    +    this
    +  }
    +
    +  def merge(other: TopicModel) = {
    +    topicCounts_ += other.topicCounts_
    +    var i = 0
    +    while (i < topicTermCounts_.length) {
    +      topicTermCounts_(i) += other.topicTermCounts_(i)
    +      i += 1
    +    }
    +    this
    +  }
    +
    +  def infer(
    +    doc: Document, totalIter: Int = 10,
    +    burnInIter: Int = 5, rand: Random = new Random()
    +  ): SV = {
    +    require(totalIter > burnInIter, "totalIter must be greater than burnInIter")
    +    require(totalIter > 0, "totalIter must be greater than 0")
    +    require(burnInIter > 0, "burnInIter must be greater than 0")
    +    val numTopics = topicCounts_.size
    +    val topicThisTerm = BDV.zeros[Double](numTopics)
    +    val topicThisDoc = BSV.zeros[Double](numTopics)
    +
    +    for (i <- 1 to totalIter) {
    +      generateTopicDistForDocument(doc, topicThisTerm, rand)
    +      if (i > burnInIter) {
    +        topicThisDoc :+= doc.topicsDist
    +      }
    +    }
    +    topicThisDoc :/= (totalIter - burnInIter).toDouble
    +
    +    new SDV(topicThisDoc.toArray)
    +  }
    +
    +  /**
    +   * Computes the topic distribution for a term after removing the current topic assignment
    +   * from the document. This is the essential step of collapsed Gibbs sampling for LDA; see
    +   * the paper <i>Parameter estimation for text analysis</i>.
    +   */
    +  private[spark] def dropOneDistSampler(
    +    docTopicCount: BV[Double],
    +    topicThisTerm: BDV[Double],
    +    rand: Random, term: Int,
    +    currentTopic: Int
    +  ): Int = {
    +    val ttm = generateTopicDistForTerm(docTopicCount, topicThisTerm, term,
    +      currentTopic, isTrainModel = true)
    +    CGS.multinomialDistSampler(rand, ttm)
    +  }
    +
    +  private[spark] def generateTopicDistForTerm(
    +    docTopicCount: BV[Double],
    +    topicThisTerm: BDV[Double],
    +    term: Int,
    +    currentTopic: Int = -1,
    +    isTrainModel: Boolean = false
    +  ): BDV[Double] = {
    +    val numTopics = topicCounts_.size
    +    val numTerms = topicTermCounts_.head.size
    +    var i = 0
    +    while (i < numTopics) {
    +      val adjustment = if (isTrainModel && i == currentTopic) -1 else 0
    +      topicThisTerm(i) = (topicTermCounts_(i)(term) + adjustment + beta) /
    +        (topicCounts_(i) + adjustment + (numTerms * beta)) *
    +        (docTopicCount(i) + adjustment + alpha)
    +      i += 1
    +    }
    +    topicThisTerm
    +  }
    +
    +  private[spark] def generateTopicDistForDocument(
    +    doc: Document, topicThisTerm: BDV[Double],
    +    rand: Random
    +  ): Unit = {
    +    val content = doc.content
    +    val numTopics = topicCounts_.size
    +    var currentTopicDist = doc.topicsDist
    +    var currentTopics = doc.topics
    +    if (currentTopicDist != null) {
    +      for (i <- 0 until content.length) {
    +        val term = content(i)
    +        val topic = currentTopics(i)
    +        val dist = generateTopicDistForTerm(currentTopicDist, topicThisTerm, term)
    +        val lastTopic = CGS.multinomialDistSampler(rand, dist)
    +        if (lastTopic != topic) {
    +          currentTopics(i) = lastTopic
    +          currentTopicDist(topic) += -1
    +          currentTopicDist(lastTopic) += 1
    +        }
    +      }
    +    } else {
    +      currentTopicDist = BSV.zeros[Double](numTopics)
    +      currentTopics = content.map { term =>
    +        val lastTopic = CGS.uniformDistSampler(rand, numTopics)
    +        currentTopicDist(lastTopic) += 1
    +        lastTopic
    +      }
    +      doc.topicsDist = currentTopicDist
    +      doc.topics = currentTopics
    +    }
    +  }
    +
    +  private[spark] def phi(topic: Int, term: Int): Double = {
    +    val numTerms = topicTermCounts_.head.size
    +    (topicTermCounts_(topic)(term) + beta) / (topicCounts_(topic) + numTerms * beta)
    +  }
    +}
    +
    +object TopicModel {
    +  def apply(numTopics: Int, numTerms: Int, alpha: Double = 0.1, beta: Double = 0.01) = {
    +    new TopicModel(
    +      BDV.zeros[Double](numTopics),
    +      Array(0 until numTopics: _*).map(_ => BSV.zeros[Double](numTerms)),
    +      alpha, beta)
    +  }
    +}
    +
    +class LDA private(
    +  var numTopics: Int,
    +  var numTerms: Int,
    +  totalIter: Int,
    +  burnInIter: Int,
    +  var alpha: Double,
    +  var beta: Double
    +) extends Serializable with Logging {
    +  def run(input: RDD[Document]): (TopicModel, RDD[Document]) = {
    +    val initModel = TopicModel(numTopics, numTerms, alpha, beta)
    +    CGS.runGibbsSampling(input, initModel, totalIter, burnInIter)
    +  }
    +}
    +
    +object LDA extends Logging {
    +  def train(
    +    data: RDD[Document],
    +    numTerms: Int,
    +    numTopics: Int,
    +    totalIter: Int,
    +    burnInIter: Int,
    +    alpha: Double,
    +    beta: Double
    +  ): (TopicModel, RDD[Document]) = {
    +    val lda = new LDA(numTopics, numTerms, totalIter, burnInIter, alpha, beta)
    +    lda.run(data)
    +  }
    +
    +  /**
    +   * Perplexity is a common evaluation metric for LDA. It is usually computed on held-out data,
    +   * but applying it to the training documents is also valid. When evaluating unseen data, run
    +   * an iteration of Gibbs sampling before calling this. A smaller perplexity indicates a
    +   * better model.
    +   */
    +  def perplexity(data: RDD[Document], computedModel: TopicModel): Double = {
    +    val broadcastModel = data.context.broadcast(computedModel)
    +    val (termProb, totalNum) = data.mapPartitions { docs =>
    +      val model = broadcastModel.value
    +      val numTopics = model.topicCounts_.size
    +      val numTerms = model.topicTermCounts_.head.size
    +      val rand = new Random
    +      val alpha = model.alpha
    +      val currentTheta = BDV.zeros[Double](numTerms)
    +      val topicThisTerm = BDV.zeros[Double](numTopics)
    +      docs.flatMap { doc =>
    +        val content = doc.content
    +        if (doc.topicsDist != null) {
    +          model.generateTopicDistForDocument(doc, topicThisTerm, rand)
    +        }
    +        content.foreach(term => currentTheta(term) = 0)
    +        content.foreach { term =>
    +          (0 until numTopics).foreach { topic =>
    +            currentTheta(term) += model.phi(topic, term) * ((doc.topicsDist(topic) + alpha) /
    +              (content.size + alpha * numTopics))
    +          }
    +        }
    +        content.map(x => (math.log(currentTheta(x)), 1))
    +      }
    +    }.reduce { (lhs, rhs) =>
    +      (lhs._1 + rhs._1, lhs._2 + rhs._2)
    +    }
    +    math.exp(-1 * termProb / totalNum)
    +  }
    +
    +}
    +
    +/**
    + * Collapsed Gibbs sampling for Latent Dirichlet Allocation.
    + */
    +private[spark] object CGS extends Logging {
    +
    +  /**
    +   * Main entry point for running Gibbs sampling. It consists of two phases: initialization of
    +   * the topic assignments, followed by the actual sampling iterations.
    +   */
    +  def runGibbsSampling(
    +    data: RDD[Document], initModel: TopicModel,
    +    totalIter: Int, burnInIter: Int
    +  ): (TopicModel, RDD[Document]) = {
    +    require(totalIter > burnInIter, "totalIter must be greater than burnInIter")
    +    require(totalIter > 0, "totalIter must be greater than 0")
    +    require(burnInIter > 0, "burnInIter must be greater than 0")
    +
    +    val (numTopics, numTerms, alpha, beta) = (initModel.topicCounts_.size,
    +      initModel.topicTermCounts_.head.size,
    +      initModel.alpha, initModel.beta)
    +    val probModel = TopicModel(numTopics, numTerms, alpha, beta)
    +
    +    logInfo("Start initialization")
    +    var (topicModel, corpus) = sampleTermAssignment(data, initModel)
    +
    +    for (iter <- 1 to totalIter) {
    +      logInfo("Start Gibbs sampling (Iteration %d/%d)".format(iter, 
totalIter))
    +      val broadcastModel = data.context.broadcast(topicModel)
    +      val previousCorpus = corpus
    +      corpus = corpus.mapPartitions { docs =>
    --- End diff --
    
    @rxin @mengxr
    The `mapPartitions` closure does not seem to be cleaned up correctly: the serialized `corpus` RDD ends up almost as large as the serialized `topicModel` broadcast, which suggests the model may also be captured in the task closure. A minimal cleanup sketch follows the log below.
    `cat spark.log | grep 'stored as values in memory'` =>
    ```
    14/09/13 00:47:59 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 218.2 KB, free 2.8 GB)
    14/09/13 00:48:04 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.1 KB, free 2.8 GB)
    14/09/13 00:48:08 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.7 KB, free 2.8 GB)
    14/09/13 00:48:20 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 2.4 KB, free 2.8 GB)
    14/09/13 00:48:23 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 2.6 KB, free 2.8 GB)
    14/09/13 00:48:25 INFO MemoryStore: Block broadcast_5 stored as values in memory (estimated size 2.6 KB, free 2.8 GB)
    14/09/13 00:48:25 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.1 KB, free 2.8 GB)
    14/09/13 00:48:30 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 2.9 KB, free 2.8 GB)
    14/09/13 00:48:35 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 3.2 KB, free 2.8 GB)
    14/09/13 00:48:44 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 68.6 KB, free 2.8 GB)
    14/09/13 00:48:45 INFO MemoryStore: Block broadcast_10 stored as values in memory (estimated size 41.7 KB, free 2.8 GB)
    14/09/13 00:49:21 INFO MemoryStore: Block broadcast_11 stored as values in memory (estimated size 197.5 MB, free 2.6 GB)
    14/09/13 00:49:24 INFO MemoryStore: Block broadcast_12 stored as values in memory (estimated size 197.7 MB, free 2.3 GB)
    14/09/13 00:53:25 INFO MemoryStore: Block broadcast_13 stored as values in memory (estimated size 163.9 MB, free 2.1 GB)
    14/09/13 00:53:28 INFO MemoryStore: Block broadcast_14 stored as values in memory (estimated size 164.0 MB, free 1878.0 MB)
    14/09/13 00:57:34 INFO MemoryStore: Block broadcast_15 stored as values in memory (estimated size 149.7 MB, free 1658.5 MB)
    14/09/13 00:57:36 INFO MemoryStore: Block broadcast_16 stored as values in memory (estimated size 150.0 MB, free 1444.0 MB)
    14/09/13 01:01:34 INFO MemoryStore: Block broadcast_17 stored as values in memory (estimated size 141.1 MB, free 1238.3 MB)
    14/09/13 01:01:36 INFO MemoryStore: Block broadcast_18 stored as values in memory (estimated size 141.2 MB, free 1036.2 MB)
    14/09/13 01:05:12 INFO MemoryStore: Block broadcast_19 stored as values in memory (estimated size 134.5 MB, free 840.7 MB)
    14/09/13 01:05:14 INFO MemoryStore: Block broadcast_20 stored as values in memory (estimated size 134.7 MB, free 647.8 MB)
    14/09/13 01:08:39 INFO MemoryStore: Block broadcast_21 stored as values in memory (estimated size 218.3 KB, free 589.5 MB)
    14/09/13 01:08:39 INFO MemoryStore: Block broadcast_22 stored as values in memory (estimated size 218.3 KB, free 589.2 MB)
    14/09/13 01:08:40 INFO MemoryStore: Block broadcast_23 stored as values in memory (estimated size 134.6 MB, free 454.6 MB)
    14/09/13 01:08:53 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 129.3 MB, free 267.1 MB)
    14/09/13 01:08:55 INFO MemoryStore: Block broadcast_25 stored as values in memory (estimated size 129.4 MB, free 82.0 MB)
    ```
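
    Below is a minimal sketch (not the PR's code) of how each iteration could explicitly release the previous model broadcast and the previously cached corpus once the new corpus is materialized. The `Document`/`TopicModel` types and the `corpus`/`topicModel` names come from the diff above; `sampleDocs` is a hypothetical placeholder for the sampling logic inside the `mapPartitions` body.
    ```scala
    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Sketch only: one Gibbs iteration that drops the previous broadcast and cached RDD.
    def gibbsIteration(
        corpus: RDD[Document],
        topicModel: TopicModel,
        sampleDocs: (Iterator[Document], TopicModel) => Iterator[Document]): RDD[Document] = {
      val broadcastModel = corpus.context.broadcast(topicModel)
      val nextCorpus = corpus.mapPartitions { docs =>
        sampleDocs(docs, broadcastModel.value)
      }.persist(StorageLevel.MEMORY_AND_DISK)
      nextCorpus.count()                          // materialize before releasing the old data
      corpus.unpersist(blocking = false)          // drop the previous iteration's cached blocks
      broadcastModel.unpersist(blocking = false)  // remove the stale model copies on the executors
      nextCorpus
    }
    ```
    If the lineage grows too long across iterations, periodically checkpointing the new corpus (with a checkpoint directory set) should also help keep the serialized tasks small.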

