GitHub user hhbyyh commented on a diff in the pull request:

    https://github.com/apache/spark/pull/18924#discussion_r143081342
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---
    @@ -462,31 +463,60 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         val expElogbetaBc = batch.sparkContext.broadcast(expElogbeta)
         val alpha = this.alpha.asBreeze
         val gammaShape = this.gammaShape
    -
    -    val stats: RDD[(BDM[Double], List[BDV[Double]])] = batch.mapPartitions { docs =>
    +    val optimizeDocConcentration = this.optimizeDocConcentration
    +    // If and only if optimizeDocConcentration is set to true, we calculate
    +    // logphat in the same pass as the other statistics; otherwise no
    +    // logphat is calculated at all.
    +    val logphatPartOptionBase = () =>
    +      if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
    +
    +    val stats: RDD[(BDM[Double], Option[BDV[Double]], Long)] = batch.mapPartitions { docs =>
           val nonEmptyDocs = docs.filter(_._2.numNonzeros > 0)
     
           val stat = BDM.zeros[Double](k, vocabSize)
    -      var gammaPart = List[BDV[Double]]()
    +      val logphatPartOption = logphatPartOptionBase()
    +      var nonEmptyDocCount: Long = 0L
           nonEmptyDocs.foreach { case (_, termCounts: Vector) =>
    +        nonEmptyDocCount += 1
             val (gammad, sstats, ids) = OnlineLDAOptimizer.variationalTopicInference(
               termCounts, expElogbetaBc.value, alpha, gammaShape, k)
    -        stat(::, ids) := stat(::, ids).toDenseMatrix + sstats
    -        gammaPart = gammad :: gammaPart
    +        stat(::, ids) := stat(::, ids) + sstats
    +        logphatPartOption.foreach(_ += LDAUtils.dirichletExpectation(gammad))
           }
    -      Iterator((stat, gammaPart))
    -    }.persist(StorageLevel.MEMORY_AND_DISK)
    -    val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
    -      _ += _, _ += _)
    -    val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
    -      stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
    -    stats.unpersist()
    -    expElogbetaBc.destroy(false)
    -    val batchResult = statsSum *:* expElogbeta.t
    +      Iterator((stat, logphatPartOption, nonEmptyDocCount))
    +    }
     
    +    val elementWiseSum = (u: (BDM[Double], Option[BDV[Double]], Long),
    +                          v: (BDM[Double], Option[BDV[Double]], Long)) => {
    +      u._1 += v._1
    +      u._2.foreach(_ += v._2.get)
    +      (u._1, u._2, u._3 + v._3)
    +    }
    +
    +    val (statsSum: BDM[Double], logphatOption: Option[BDV[Double]], nonEmptyDocsN: Long) = stats
    +      .treeAggregate((BDM.zeros[Double](k, vocabSize), logphatPartOptionBase(), 0L))(
    +        elementWiseSum, elementWiseSum
    +      )
    +
    +    if (nonEmptyDocsN == 0) {
    --- End diff ---
    
    I don't think that's the case here. But as long as all the cleanup work is done, I would not mind keeping it.
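    For what it's worth, a minimal Spark-free sketch of the guard-plus-cleanup shape being discussed; `Releasable`, `update`, and everything else here are hypothetical stand-ins for the broadcast variable and `submitMiniBatch`, not quoted from the PR:
    
    ```scala
    object EarlyExitCleanupSketch {
      // Toy stand-in for the broadcast variable that must be destroyed.
      final class Releasable {
        var released = false
        def release(): Unit = released = true
      }
    
      // Releasing before the branch guarantees the early exit cannot skip cleanup.
      def update(nonEmptyDocsN: Long, bc: Releasable): String = {
        bc.release()
        if (nonEmptyDocsN == 0) {
          return "unchanged" // nothing to fold in; leave the model parameters as-is
        }
        "updated" // fold the aggregated statistics into the model here
      }
    
      def main(args: Array[String]): Unit = {
        val bc = new Releasable
        println(update(0L, bc) + s", released=${bc.released}") // unchanged, released=true
      }
    }
    ```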


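    Stepping back to the diff itself: the interesting pattern is threading an `Option` accumulator through a single aggregation pass, so logphat is only computed when `optimizeDocConcentration` is on, and the combiner can rely on both sides being `Some` or both `None` (which is what makes `u._2.foreach(_ += v._2.get)` safe). Below is a minimal Spark-free sketch of that pattern, using breeze and a plain `reduce` in place of `treeAggregate`; `makeAcc` and `combine` are illustrative names, not from the PR:
    
    ```scala
    import breeze.linalg.{DenseVector => BDV}
    
    object OptionAccumulatorSketch {
      val k = 3
      val optimizeDocConcentration = true
    
      // Fresh zero accumulator; None when the extra statistic is not wanted.
      // Because every accumulator comes from this one function, the two sides
      // of a combine are always both Some or both None.
      def makeAcc(): Option[BDV[Double]] =
        if (optimizeDocConcentration) Some(BDV.zeros[Double](k)) else None
    
      // Element-wise combine, mirroring the diff's elementWiseSum.
      def combine(u: (Option[BDV[Double]], Long),
                  v: (Option[BDV[Double]], Long)): (Option[BDV[Double]], Long) = {
        u._1.foreach(_ += v._1.get) // safe: u._1 is defined iff v._1 is defined
        (u._1, u._2 + v._2)
      }
    
      def main(args: Array[String]): Unit = {
        // Four "partitions", each contributing one document's worth of stats.
        val partitions = Seq.fill(4) {
          val acc = makeAcc()
          acc.foreach(_ += BDV.ones[Double](k)) // stand-in for dirichletExpectation(gammad)
          (acc, 1L)
        }
        val (logphatOption, nonEmptyDocsN) = partitions.reduce(combine)
        println(s"docs=$nonEmptyDocsN, acc=$logphatOption")
      }
    }
    ```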