Repository: spark Updated Branches: refs/heads/branch-1.4 d6fd80570 -> 15beccb71
[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| initialization * do not cache first cost RDD * change following cost RDD cache level to MEMORY_AND_DISK * remove Vector wrapper to save an object per instance Further improvements will be addressed in SPARK-10329 cc: yu-iskw HuJiayin Author: Xiangrui Meng <m...@databricks.com> Closes #8526 from mengxr/SPARK-10354. (cherry picked from commit f0f563a3c43fc9683e6920890cce44611c0c5f4b) Signed-off-by: Xiangrui Meng <m...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/15beccb7 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/15beccb7 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/15beccb7 Branch: refs/heads/branch-1.4 Commit: 15beccb716a3d4bb533ecf3e81e26e757fb0f844 Parents: d6fd805 Author: Xiangrui Meng <m...@databricks.com> Authored: Sun Aug 30 23:20:03 2015 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Sun Aug 30 23:20:30 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/mllib/clustering/KMeans.scala | 21 +++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/15beccb7/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala index 0f8d6a3..555b98d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala @@ -318,7 +318,7 @@ class KMeans private ( : Array[Array[VectorWithNorm]] = { // Initialize empty centers and point costs. 
val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm]) - var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache() + var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity)) // Initialize each run's first center to a random point. val seed = new XORShiftRandom(this.seed).nextInt() @@ -343,21 +343,28 @@ class KMeans private ( val bcNewCenters = data.context.broadcast(newCenters) val preCosts = costs costs = data.zip(preCosts).map { case (point, cost) => - Vectors.dense( Array.tabulate(runs) { r => math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r)) - }) - }.cache() + } + }.persist(StorageLevel.MEMORY_AND_DISK) val sumCosts = costs - .aggregate(Vectors.zeros(runs))( + .aggregate(new Array[Double](runs))( seqOp = (s, v) => { // s += v - axpy(1.0, v, s) + var r = 0 + while (r < runs) { + s(r) += v(r) + r += 1 + } s }, combOp = (s0, s1) => { // s0 += s1 - axpy(1.0, s1, s0) + var r = 0 + while (r < runs) { + s0(r) += s1(r) + r += 1 + } s0 } ) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org