Repository: spark
Updated Branches:
  refs/heads/master 3b2b785ec -> 4c9695598


[SPARK-16697][ML][MLLIB] Improve LDA submitMiniBatch method to avoid redundant RDD computation

## What changes were proposed in this pull request?

In `LDAOptimizer.submitMiniBatch`, persist the intermediate RDD `stats: RDD[(BDM[Double], List[BDV[Double]])]`, since it is consumed by two separate jobs (the `treeAggregate` that computes `statsSum` and the `collect` that builds `gammat`); without caching, the mini-batch statistics are recomputed for the second job. Also move the release of the `expElogbetaBc` broadcast variable to after its last use, so that it is no longer unpersisted too early, and replace the previous `expElogbetaBc.unpersist()` with `expElogbetaBc.destroy(false)`.
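
Below is a minimal, self-contained sketch of the pattern with made-up data; the names `weightsBc` and `stats` merely stand in for `expElogbetaBc` and the real per-document statistics, and this is not the actual `LDAOptimizer` code. The point: an RDD consumed by two actions is recomputed for each action unless it is persisted, and a broadcast variable it reads must stay alive until the last of those actions has run.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("persist-sketch").setMaster("local[2]"))

    // Stand-in for expElogbetaBc: a shared read-only value used by all tasks.
    val weightsBc = sc.broadcast(Array.tabulate(100)(_.toDouble))

    // Stand-in for `stats`: each element reads the broadcast. Persist it,
    // because two separate actions consume it below.
    val stats = sc.parallelize(1 to 1000)
      .map(i => (i, weightsBc.value(i % 100)))
      .persist(StorageLevel.MEMORY_AND_DISK)

    val sum = stats.map(_._2).reduce(_ + _)  // first job (cf. treeAggregate)
    val ids = stats.map(_._1).collect()      // second job reuses cached data
                                             // (cf. the collect building gammat)

    // Both jobs are done: release the cache, then destroy the broadcast.
    stats.unpersist()
    weightsBc.destroy()  // public API; the patch itself uses the non-blocking
                         // destroy(false) available inside the Spark codebase

    println(s"sum=$sum, n=${ids.length}")
    sc.stop()
  }
}
```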

## How was this patch tested?

Existing test.

Author: WeichenXu <weichenxu...@outlook.com>

Closes #14335 from WeichenXu123/improve_LDA.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c969559
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c969559
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c969559

Branch: refs/heads/master
Commit: 4c9695598ee00f68aff4eb32d4629edf6facb29f
Parents: 3b2b785
Author: WeichenXu <weichenxu...@outlook.com>
Authored: Tue Jul 26 10:41:41 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Jul 26 10:41:41 2016 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4c969559/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
index e2c6aca..ae324f8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LDAOptimizer.scala
@@ -28,6 +28,7 @@ import org.apache.spark.graphx._
 import org.apache.spark.mllib.impl.PeriodicGraphCheckpointer
 import org.apache.spark.mllib.linalg.{DenseVector, Matrices, SparseVector, Vector, Vectors}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * :: DeveloperApi ::
@@ -472,12 +473,13 @@ final class OnlineLDAOptimizer extends LDAOptimizer {
         gammaPart = gammad :: gammaPart
       }
       Iterator((stat, gammaPart))
-    }
+    }.persist(StorageLevel.MEMORY_AND_DISK)
     val statsSum: BDM[Double] = stats.map(_._1).treeAggregate(BDM.zeros[Double](k, vocabSize))(
       _ += _, _ += _)
-    expElogbetaBc.unpersist()
     val gammat: BDM[Double] = breeze.linalg.DenseMatrix.vertcat(
       stats.map(_._2).flatMap(list => list).collect().map(_.toDenseMatrix): _*)
+    stats.unpersist()
+    expElogbetaBc.destroy(false)
     val batchResult = statsSum :* expElogbeta.t
 
     // Note that this is an optimization to avoid batch.count


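A note on why `unpersist()` became `destroy(false)` here, based on Spark's documented broadcast semantics: `Broadcast.unpersist()` only deletes the cached copies on the executors, so a later task that touches the value silently re-fetches it from the driver; that hidden re-broadcast is exactly the redundant work the old code incurred when the broadcast was still needed to recompute `stats` for the `collect`. `destroy()` removes the driver-side state as well, after which any access fails, so it must run only after the last job that reads the broadcast. A rough spark-shell sketch of the two release modes, using the shell's provided `sc` and a hypothetical broadcast `bc`:

    // In spark-shell, where `sc` is the provided SparkContext:
    val bc = sc.broadcast(Seq(1, 2, 3))
    val total = sc.parallelize(1 to 3).map(i => bc.value(i - 1)).reduce(_ + _)

    // unpersist(): executors drop their copies, but the driver keeps the value,
    // so another job using bc would silently re-broadcast it (extra cost).
    bc.unpersist()

    // destroy(): driver-side state goes too; any further bc.value access throws.
    // That is why the patch runs it only after the final collect over `stats`.
    bc.destroy()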