Repository: spark
Updated Branches:
  refs/heads/branch-1.3 e8b0564e7 -> a58c1afe8
[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization

* do not cache the first cost RDD
* change the cache level of subsequent cost RDDs to MEMORY_AND_DISK
* remove the Vector wrapper to save an object per instance

Further improvements will be addressed in SPARK-10329.

cc: yu-iskw HuJiayin

Author: Xiangrui Meng <m...@databricks.com>

Closes #8526 from mengxr/SPARK-10354.

(cherry picked from commit f0f563a3c43fc9683e6920890cce44611c0c5f4b)
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a58c1afe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a58c1afe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a58c1afe

Branch: refs/heads/branch-1.3
Commit: a58c1afe8fcf599993537c18a53591343ff33461
Parents: e8b0564
Author: Xiangrui Meng <m...@databricks.com>
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Sun Aug 30 23:21:09 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/mllib/clustering/KMeans.scala | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a58c1afe/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 11633e8..21bf3cd 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -281,7 +281,7 @@ class KMeans private (
     : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
 
     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -306,21 +306,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+        }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )
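For readers skimming the diff, the following standalone Scala sketch shows the
same pattern outside of KMeans: per-run costs stored as plain Array[Double]
(no Vector wrapper), the cost RDD persisted at MEMORY_AND_DISK so a large
intermediate can spill to disk rather than be evicted and recomputed, and an
element-wise aggregate using explicit while loops in place of BLAS axpy. This
is an illustrative sketch only, not code from the commit; the object name,
example RDD, and local master setting are made up for the example.

----------------------------------------------------------------------
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Illustrative sketch of the pattern adopted by SPARK-10354; not commit code.
object CostAggregationSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical local setup, for demonstration only.
    val sc = new SparkContext(
      new SparkConf().setAppName("cost-sketch").setMaster("local[*]"))
    val runs = 4

    // Per-point costs as a plain Array[Double] (no Vector wrapper), persisted
    // at MEMORY_AND_DISK so the intermediate spills instead of evicting.
    val costs = sc.parallelize(1 to 1000)
      .map(_ => Array.fill(runs)(1.0))
      .persist(StorageLevel.MEMORY_AND_DISK)

    // Element-wise sum across the RDD with while loops, mirroring the
    // patch's replacement of axpy on dense Vectors.
    val sumCosts = costs.aggregate(new Array[Double](runs))(
      seqOp = (s, v) => {
        var r = 0
        while (r < runs) { s(r) += v(r); r += 1 }
        s
      },
      combOp = (s0, s1) => {
        var r = 0
        while (r < runs) { s0(r) += s1(r); r += 1 }
        s0
      }
    )

    println(sumCosts.mkString("[", ", ", "]")) // [1000.0, 1000.0, 1000.0, 1000.0]
    costs.unpersist()
    sc.stop()
  }
}
----------------------------------------------------------------------

The while loops look unidiomatic next to axpy, but on this hot path they avoid
allocating a Vector per record and keep the aggregate zero value a raw array,
which is what the commit message means by saving an object per instance.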