Repository: spark Updated Branches: refs/heads/master 1c6cf1a56 -> 78015a8b7
[SPARK-12450][MLLIB] Un-persist broadcasted variables in KMeans SPARK-12450 . Un-persist broadcasted variables in KMeans. Author: RJ Nowling <rnowl...@gmail.com> Closes #10415 from rnowling/spark-12450. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78015a8b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78015a8b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78015a8b Branch: refs/heads/master Commit: 78015a8b7cc316343e302eeed6fe30af9f2961e8 Parents: 1c6cf1a Author: RJ Nowling <rnowl...@gmail.com> Authored: Tue Jan 5 15:05:04 2016 -0800 Committer: Joseph K. Bradley <jos...@databricks.com> Committed: Tue Jan 5 15:05:04 2016 -0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/mllib/clustering/KMeans.scala | 8 ++++++++ 1 file changed, 8 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/78015a8b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 2895db7..e47c4db 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -301,6 +301,8 @@ class KMeans private ( contribs.iterator }.reduceByKey(mergeContribs).collectAsMap() + bcActiveCenters.unpersist(blocking = false) + // Update the cluster centers and costs for each active run for ((run, i) <- activeRuns.zipWithIndex) { var changed = false @@ -419,7 +421,10 @@ class KMeans private ( s0 } ) + + bcNewCenters.unpersist(blocking = false) preCosts.unpersist(blocking = false) + val chosen = data.zip(costs).mapPartitionsWithIndex { (index, pointsWithCosts) => val rand = new XORShiftRandom(seed ^ (step << 16) ^ index) pointsWithCosts.flatMap { case (p, c) => @@ -448,6 +453,9 @@ class KMeans private ( ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0) } }.reduceByKey(_ + _).collectAsMap() + + bcCenters.unpersist(blocking = false) + val finalCenters = (0 until runs).par.map { r => val myCenters = centers(r).toArray val myWeights = (0 until myCenters.length).map(i => weightMap.getOrElse((r, i), 0.0)).toArray --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org