Repository: spark
Updated Branches:
  refs/heads/master 1c6cf1a56 -> 78015a8b7


[SPARK-12450][MLLIB] Un-persist broadcasted variables in KMeans

SPARK-12450: Un-persist broadcasted variables in KMeans.

Author: RJ Nowling <rnowl...@gmail.com>

Closes #10415 from rnowling/spark-12450.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78015a8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78015a8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78015a8b

Branch: refs/heads/master
Commit: 78015a8b7cc316343e302eeed6fe30af9f2961e8
Parents: 1c6cf1a
Author: RJ Nowling <rnowl...@gmail.com>
Authored: Tue Jan 5 15:05:04 2016 -0800
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Tue Jan 5 15:05:04 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/mllib/clustering/KMeans.scala     | 8 ++++++++
 1 file changed, 8 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/78015a8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 2895db7..e47c4db 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -301,6 +301,8 @@ class KMeans private (
         contribs.iterator
       }.reduceByKey(mergeContribs).collectAsMap()
 
+      bcActiveCenters.unpersist(blocking = false)
+
       // Update the cluster centers and costs for each active run
       for ((run, i) <- activeRuns.zipWithIndex) {
         var changed = false
@@ -419,7 +421,10 @@ class KMeans private (
             s0
           }
         )
+
+      bcNewCenters.unpersist(blocking = false)
       preCosts.unpersist(blocking = false)
+
       val chosen = data.zip(costs).mapPartitionsWithIndex { (index, 
pointsWithCosts) =>
         val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
         pointsWithCosts.flatMap { case (p, c) =>
@@ -448,6 +453,9 @@ class KMeans private (
         ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
       }
     }.reduceByKey(_ + _).collectAsMap()
+
+    bcCenters.unpersist(blocking = false)
+
     val finalCenters = (0 until runs).par.map { r =>
       val myCenters = centers(r).toArray
       val myWeights = (0 until myCenters.length).map(i => 
weightMap.getOrElse((r, i), 0.0)).toArray


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to