Repository: spark
Updated Branches:
  refs/heads/master 8694c3ad7 -> f0f563a3c


[SPARK-10354] [MLLIB] fix some apparent memory issues in k-means|| 
initialization

* do not cache the first cost RDD
* persist the following cost RDDs at MEMORY_AND_DISK instead of the default MEMORY_ONLY
* remove the Vector wrapper to save an object per instance (see the sketch below)
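
As a rough sketch of the intent (illustrative helper names only, not the
actual KMeans internals), the cost bookkeeping goes from a cached RDD of
Vectors to plain Array[Double] rows, with the reused cost RDDs persisted at
MEMORY_AND_DISK:

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// First cost RDD: one plain Array[Double] per point (no Vectors.dense
// wrapper), and no cache() because it is read only once.
def initCosts(data: RDD[Vector], runs: Int): RDD[Array[Double]] =
  data.map(_ => Array.fill(runs)(Double.PositiveInfinity))

// Cost RDDs built in later iterations are reused, so they are persisted, but
// at MEMORY_AND_DISK: partitions that do not fit in memory are written to
// disk instead of being recomputed.
def persistCosts(costs: RDD[Array[Double]]): RDD[Array[Double]] =
  costs.persist(StorageLevel.MEMORY_AND_DISK)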

Further improvements will be addressed in SPARK-10329

cc: yu-iskw HuJiayin

Author: Xiangrui Meng <m...@databricks.com>

Closes #8526 from mengxr/SPARK-10354.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f0f563a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f0f563a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f0f563a3

Branch: refs/heads/master
Commit: f0f563a3c43fc9683e6920890cce44611c0c5f4b
Parents: 8694c3a
Author: Xiangrui Meng <m...@databricks.com>
Authored: Sun Aug 30 23:20:03 2015 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Sun Aug 30 23:20:03 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/mllib/clustering/KMeans.scala  | 21 +++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f0f563a3/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
index 46920ff..7168aac 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala
@@ -369,7 +369,7 @@ class KMeans private (
   : Array[Array[VectorWithNorm]] = {
     // Initialize empty centers and point costs.
     val centers = Array.tabulate(runs)(r => ArrayBuffer.empty[VectorWithNorm])
-    var costs = data.map(_ => Vectors.dense(Array.fill(runs)(Double.PositiveInfinity))).cache()
+    var costs = data.map(_ => Array.fill(runs)(Double.PositiveInfinity))
 
     // Initialize each run's first center to a random point.
     val seed = new XORShiftRandom(this.seed).nextInt()
@@ -394,21 +394,28 @@ class KMeans private (
       val bcNewCenters = data.context.broadcast(newCenters)
       val preCosts = costs
       costs = data.zip(preCosts).map { case (point, cost) =>
-        Vectors.dense(
           Array.tabulate(runs) { r =>
             math.min(KMeans.pointCost(bcNewCenters.value(r), point), cost(r))
-          })
-      }.cache()
+          }
+        }.persist(StorageLevel.MEMORY_AND_DISK)
       val sumCosts = costs
-        .aggregate(Vectors.zeros(runs))(
+        .aggregate(new Array[Double](runs))(
           seqOp = (s, v) => {
             // s += v
-            axpy(1.0, v, s)
+            var r = 0
+            while (r < runs) {
+              s(r) += v(r)
+              r += 1
+            }
             s
           },
           combOp = (s0, s1) => {
             // s0 += s1
-            axpy(1.0, s1, s0)
+            var r = 0
+            while (r < runs) {
+              s0(r) += s1(r)
+              r += 1
+            }
             s0
           }
         )
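
For reference, the per-run cost sums computed by the new aggregate above are
equivalent to this standalone sketch (names are illustrative; costs stands
for the persisted RDD of Array[Double] cost rows):

import org.apache.spark.rdd.RDD

// Element-wise sum of the per-point cost arrays, using while loops over raw
// Array[Double] instead of axpy on per-row Vectors.
def sumCosts(costs: RDD[Array[Double]], runs: Int): Array[Double] =
  costs.aggregate(new Array[Double](runs))(
    seqOp = (s, v) => {
      // s += v
      var r = 0
      while (r < runs) {
        s(r) += v(r)
        r += 1
      }
      s
    },
    combOp = (s0, s1) => {
      // s0 += s1
      var r = 0
      while (r < runs) {
        s0(r) += s1(r)
        r += 1
      }
      s0
    }
  )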

