Github user jkbradley commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4419#discussion_r29296395
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -208,3 +225,224 @@ class EMLDAOptimizer extends LDAOptimizer{
         new DistributedLDAModel(this, iterationTimes)
       }
     }
    +
    +
    +/**
    + * :: Experimental ::
    + *
    + * An online optimizer for LDA. The Optimizer implements the Online LDA 
algorithm, which
    + * processes a subset of the corpus by each call to next, and update the 
term-topic
    + * distribution adaptively.
    + *
    + * References:
    + *   Hoffman, Blei and Bach, "Online Learning for Latent Dirichlet 
Allocation." NIPS, 2010.
    + */
    +@Experimental
    +class OnlineLDAOptimizer extends LDAOptimizer {
    +
    +  // LDA common parameters
    +  private var k: Int = 0
    +  private var D: Int = 0
    +  private var vocabSize: Int = 0
    +  private var alpha: Double = 0
    +  private var eta: Double = 0
    +  private var randomSeed: Long = 0
    +
    +  // Online LDA specific parameters
    +  private var tau_0: Double = -1
    +  private var kappa: Double = -1
    +  private var batchSize: Int = -1
    +
    +  // internal data structure
    +  private var docs: RDD[(Long, Vector)] = null
    +  private var lambda: BDM[Double] = null
    +  private var Elogbeta: BDM[Double]= null
    +  private var expElogbeta: BDM[Double] = null
    +
    +  // count of invocation to next, used to help deciding the weight for 
each iteration
    +  private var iteration = 0
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   */
    +  def getTau_0: Double = {
    +    if (this.tau_0 == -1) {
    +      1024
    +    } else {
    +      this.tau_0
    +    }
    +  }
    +
    +  /**
    +   * A (positive) learning parameter that downweights early iterations
    +   * Automatic setting of parameter:
    +   *  - default = 1024, which follows the recommendation from OnlineLDA 
paper.
    +   */
    +  def setTau_0(tau_0: Double): this.type = {
    +    require(tau_0 > 0 || tau_0 == -1.0,  s"LDA tau_0 must be positive, but 
was set to $tau_0")
    +    this.tau_0 = tau_0
    +    this
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate
    +   */
    +  def getKappa: Double = {
    +    if (this.kappa == -1) {
    +      0.5
    +    } else {
    +      this.kappa
    +    }
    +  }
    +
    +  /**
    +   * Learning rate: exponential decay rate---should be between
    +   * (0.5, 1.0] to guarantee asymptotic convergence.
    +   *  - default = 0.5, which follows the recommendation from OnlineLDA 
paper.
    +   */
    +  def setKappa(kappa: Double): this.type = {
    +    require(kappa >= 0 || kappa == -1.0,
    +      s"Online LDA kappa must be nonnegative (or -1 for auto), but was set 
to $kappa")
    +    this.kappa = kappa
    +    this
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each 
iteration
    +   */
    +  def getBatchSize: Int = {
    +    if (this.batchSize == -1) {
    +      D / 100
    +    } else {
    +      this.batchSize
    +    }
    +  }
    +
    +  /**
    +   * Mini-batch size, which controls how many documents are used in each 
iteration
    +   * default = 1% from total documents.
    +   */
    +  def setBatchSize(batchSize: Int): this.type = {
    +    this.batchSize = batchSize
    +    this
    +  }
    +
    +  private[clustering] override def initialize(docs: RDD[(Long, Vector)], 
lda: LDA): LDAOptimizer = {
    +
    +    this.k = lda.getK
    +    this.D = docs.count().toInt
    +    this.vocabSize = docs.first()._2.size
    +    this.alpha = if (lda.getDocConcentration == -1) 1.0 / k else 
lda.getDocConcentration
    +    this.eta = if (lda.getTopicConcentration == -1) 1.0 / k else 
lda.getTopicConcentration
    +    this.randomSeed = randomSeed
    +
    +    this.docs = docs
    +
    +    // Initialize the variational distribution q(beta|lambda)
    +    this.lambda = getGammaMatrix(k, vocabSize)
    +    this.Elogbeta = dirichlet_expectation(lambda)
    +    this.expElogbeta = exp(Elogbeta)
    +    this.iteration = 0
    +    this
    +  }
    +
    +  /**
    +   * Submit a a subset (like 1%, decide by the batchSize) of the corpus to 
the Online LDA model,
    +   * and it will update the topic distribution adaptively for the terms 
appearing in the subset.
    +   *
    +   * @return  Inferred LDA model
    +   */
    +  private[clustering] override def next(): OnlineLDAOptimizer = {
    +    iteration += 1
    +    val batchSize = getBatchSize
    +    val batch = docs.sample(true, batchSize.toDouble / D, 
randomSeed).cache()
    +    if(batch.isEmpty()) return this
    +
    +    val k = this.k
    +    val vocabSize = this.vocabSize
    +    val expElogbeta = this.expElogbeta
    +    val alpha = this.alpha
    +
    +    val stats = batch.mapPartitions(docs =>{
    +      val stat = BDM.zeros[Double](k, vocabSize)
    +      docs.foreach(doc =>{
    +        val termCounts = doc._2
    +        val (ids, cts) = termCounts match {
    +          case v: DenseVector => (((0 until v.size).toList), v.values)
    +          case v: SparseVector => (v.indices.toList, v.values)
    +          case v => throw new IllegalArgumentException("Do not support 
vector type " + v.getClass)
    +        }
    +
    +        // Initialize the variational distribution q(theta|gamma) for the 
mini-batch
    +        var gammad = new Gamma(100, 1.0 / 100.0).samplesVector(k).t // 1 * 
K
    +        var Elogthetad = digamma(gammad) - digamma(sum(gammad))     // 1 * 
K
    +        var expElogthetad = exp(Elogthetad)                         // 1 * 
K
    +        val expElogbetad = expElogbeta(::, ids).toDenseMatrix       // K * 
ids
    +
    +        var phinorm = expElogthetad * expElogbetad + 1e-100         // 1 * 
ids
    +        var meanchange = 1D
    +        val ctsVector = new BDV[Double](cts).t                      // 1 * 
ids
    +
    +        // Iterate between gamma and phi until convergence
    +        while (meanchange > 1e-5) {
    +          val lastgamma = gammad
    +          //        1*K                  1 * ids               ids * k
    +          gammad = (expElogthetad :* ((ctsVector / phinorm) * 
(expElogbetad.t))) + alpha
    +          Elogthetad = digamma(gammad) - digamma(sum(gammad))
    +          expElogthetad = exp(Elogthetad)
    +          phinorm = expElogthetad * expElogbetad + 1e-100
    +          meanchange = sum(abs(gammad - lastgamma)) / k
    +        }
    +
    +        val m1 = expElogthetad.t.toDenseMatrix.t
    +        val m2 = (ctsVector / phinorm).t.toDenseMatrix
    +        val outerResult = kron(m1, m2) // K * ids
    +        for (i <- 0 until ids.size) {
    +          stat(::, ids(i)) := (stat(::, ids(i)) + outerResult(::, i))
    +        }
    +        stat
    +      })
    +      Iterator(stat)
    +    })
    +
    +    val batchResult = stats.reduce(_ += _)
    +    update(batchResult, iteration, batchSize)
    +    batch.unpersist()
    +    this
    +  }
    +
    +  private[clustering] override def getLDAModel(iterationTimes: 
Array[Double]): LDAModel = {
    +    new LocalLDAModel(Matrices.fromBreeze(lambda).transpose)
    +  }
    +
    +  private def update(raw: BDM[Double], iter:Int, batchSize: Int): Unit ={
    --- End diff --
    
    Please add a little doc, even though it's an internal method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to