Repository: spark Updated Branches: refs/heads/master 3b2b785ec -> 4c9695598
[SPARK-16697][ML][MLLIB] improve LDA submitMiniBatch method to avoid redundant RDD computation ## What changes were proposed in this pull request? In `LDAOptimizer.submitMiniBatch`, persist `stats: RDD[(BDM[Double], List[BDV[Double]])]`, because it is consumed by two actions (`treeAggregate` and `collect`) and would otherwise be recomputed for the second one. Also move the point at which the `expElogbetaBc` broadcast variable is released, so that it is no longer unpersisted too early (previously it was unpersisted between those two actions), and replace the previous `expElogbetaBc.unpersist()` with `expElogbetaBc.destroy(false)`. ## How was this patch tested? Existing tests. Author: WeichenXu <weichenxu...@outlook.com> Closes #14335 from WeichenXu123/improve_LDA. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c969559 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c969559 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c969559 Branch: refs/heads/master Commit: 4c9695598ee00f68aff4eb32d4629edf6facb29f Parents: 3b2b785 Author: WeichenXu <weichenxu...@outlook.com> Authored: Tue Jul 26 10:41:41 2016 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Jul 26 10:41:41 2016 +0100 ---------------------------------------------------------------------- .../scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/4c969559/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala index e2c6aca..ae324f8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala @@ -28,6 +28,7 @@ import 
org.apache.spark.graphx._ import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors} import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel /** * :: DeveloperApi :: @@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer { gammaPart = gammad :: gammaPart } Iterator((stat, gammaPart)) - } + }.persist(StorageLevel.MEMORY_AND_DISK) val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))( _ += _, _ += _) - expElogbetaBc.unpersist() val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat( stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*) + stats.unpersist() + expElogbetaBc.destroy(false) val batchResult = statsSum :* expElogbeta.t // Note that this is an optimization to avoid batch.count --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org