Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a122a3e70 -> fe0a0686c


[SPARK-15346][MLLIB] Reduce duplicate computation in picking initial points

mateiz srowen

I state that the contribution is my original work and that I license the work
to the project under the project's open source license.

There were some formatting problems with my last PR. With HyukjinKwon's help I
read the contribution guidance, re-checked my code and the PR, ran the tests,
and re-submitted the PR here.

Although the related JIRA issue is marked as resolved, I believe this change is
still relevant to it.

## Proposed Change

After picking each new initial center, it is unnecessary to recompute the
distances between all the points and all of the previously chosen centers.
Instead, this change keeps, for every point, the distance to its closest center
so far; when a new center is picked, each point's cached distance is compared
with its distance to the new center and updated to the smaller of the two.
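
For reference, here is a minimal, self-contained sketch of the seeding loop with the cached cost array. The helper names (`initCenters`, `squaredDistance`) are hypothetical, plain squared Euclidean distance stands in for `KMeans.fastSquaredDistance`, and the first center is sampled uniformly rather than by weight as in the real `LocalKMeans`:

```scala
import scala.util.Random

object KMeansPlusPlusSketch {
  // Stand-in for KMeans.fastSquaredDistance: plain squared Euclidean distance.
  def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  /** Pick k initial centers, caching each point's distance to its closest center so far. */
  def initCenters(
      points: Array[Array[Double]],
      weights: Array[Double],
      k: Int,
      rand: Random): Array[Array[Double]] = {
    val centers = new Array[Array[Double]](k)
    centers(0) = points(rand.nextInt(points.length))
    // costArray(p) = squared distance from point p to its closest chosen center.
    val costArray = points.map(squaredDistance(_, centers(0)))

    for (i <- 1 until k) {
      // Sample the next center with probability proportional to weighted cached
      // cost; no distances to the old centers are recomputed here.
      val sum = costArray.zip(weights).map { case (c, w) => c * w }.sum
      val r = rand.nextDouble() * sum
      var cumulative = 0.0
      var j = 0
      while (j < points.length && cumulative < r) {
        cumulative += weights(j) * costArray(j)
        j += 1
      }
      centers(i) = if (j == 0) points(0) else points(j - 1)
      // Only the new center can lower a point's cost, so one distance per point
      // suffices to refresh the cache.
      for (p <- points.indices) {
        costArray(p) = math.min(squaredDistance(points(p), centers(i)), costArray(p))
      }
    }
    centers
  }
}
```

This turns the per-round work from O(n * i) distance computations (against all i current centers) into O(n) (against the single new center), which is where the speedup in the curves below comes from.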

## Test results

An easy way to test this is described in SPARK-6706
(https://issues.apache.org/jira/browse/SPARK-6706).

I tested the KMeans++ method on a small dataset with 16k points, and the whole
KMeans|| pipeline on a large one with 240k points. The data has 4096 features,
and I tuned K from 100 to 500.
The tests ran on my 4-machine cluster. I also tested 3M points on a larger
cluster with 25 machines and got similar results, for which I do not plot the
detailed curves. The results of the first two experiments are shown below.

### Local KMeans++ test:

Dataset: 4m_ini_center
Data size: 16234
Dimension: 4096

Lloyd's iterations = 10.
In the plots below, the y-axis is time in seconds and the x-axis is the value of K.

![image](https://cloud.githubusercontent.com/assets/10915169/15175831/d0c92b82-179a-11e6-8b68-4e165fc2fdff.png)

![local_total](https://cloud.githubusercontent.com/assets/10915169/15175957/6b21c3b0-179b-11e6-9741-66dfe4e23eb7.jpg)

### On a larger dataset

One improvement shows up in the graphs but is not committed in this patch: in
this experiment I also have an optimization for computation on normalized data
(the distance is converted to cosine distance). When the data is normalized
into (0, 1), an optimization in the original version of
util.MLUtils.fastSquaredDistance has no effect: the precision bound
2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON) will never be
less than the precision threshold in this case. Therefore I designed an
early-termination method for comparing two distances (used in findClosest).
Since that improvement is not included in this patch, please compare results
using only the curves without "normalize".
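
To see why the bound saturates on normalized data, here is a small sketch evaluating the same expression. It assumes EPSILON is the double machine epsilon and that the precision threshold is 1e-6, which is my reading of MLUtils rather than anything in this patch:

```scala
object PrecisionBoundSketch {
  // Double machine epsilon; assumed to match MLUtils.EPSILON.
  val EPSILON: Double = {
    var eps = 1.0
    while ((1.0 + eps / 2.0) > 1.0) eps /= 2.0
    eps
  }

  /** The bound fastSquaredDistance checks before taking its fast path. */
  def precisionBound(norm1: Double, norm2: Double): Double = {
    val sumSquaredNorm = norm1 * norm1 + norm2 * norm2
    val normDiff = norm1 - norm2
    2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON)
  }

  def main(args: Array[String]): Unit = {
    val precision = 1e-6 // assumed default threshold in fastSquaredDistance
    // Unit-norm vectors: norm1 = norm2 = 1, so normDiff = 0 and the bound
    // collapses to 4 * EPSILON / EPSILON = 4.0, which is never below 1e-6,
    // so the fast path never fires on normalized data.
    println(precisionBound(1.0, 1.0) >= precision) // true
  }
}
```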

Dataset: 4k24
Data size: 243960
Dimension: 4096

| Normalize | Enlarge | Initialize | Lloyd's iterations |
|-----------|---------|------------|--------------------|
| NO        | 1       | 3          | 5                  |
| YES       | 10000   | 3          | 5                  |

Note: the normalized data is enlarged (scaled up by the factor above) to preserve precision.

Total cost time (x-axis: value of K, y-axis: time in seconds):
![4k24_total](https://cloud.githubusercontent.com/assets/10915169/15176635/9a54c0bc-179e-11e6-81c5-238e0c54bce2.jpg)

SE for unnormalized data under the two versions, to verify correctness:
![4k24_unnorm_se](https://cloud.githubusercontent.com/assets/10915169/15176661/b85dabc8-179e-11e6-9269-fe7d2101dd48.jpg)

Here is the SE for normalized data, just for reference; it is also correct.
![4k24_norm_se](https://cloud.githubusercontent.com/assets/10915169/15176742/1fbde940-179f-11e6-8290-d24b0dd4a4f7.jpg)

Author: DLucky <mouendl...@gmail.com>

Closes #13133 from mouendless/patch-2.

(cherry picked from commit 420b700695fe8bcdda406c34ad48230b9dfc07f1)
Signed-off-by: Sean Owen <so...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe0a0686
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe0a0686
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe0a0686

Branch: refs/heads/branch-2.0
Commit: fe0a0686c50743272a841b909cbfe4534350fc18
Parents: a122a3e
Author: DLucky <mouendl...@gmail.com>
Authored: Wed May 18 12:05:21 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed May 18 12:05:29 2016 +0100

----------------------------------------------------------------------
 .../apache/spark/mllib/clustering/LocalKMeans.scala | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fe0a0686/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
index adf20dc..5358767 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala
@@ -46,17 +46,15 @@ private[mllib] object LocalKMeans extends Logging {
 
     // Initialize centers by sampling using the k-means++ procedure.
     centers(0) = pickWeighted(rand, points, weights).toDense
+    val costArray = points.map(KMeans.fastSquaredDistance(_, centers(0)))
+
     for (i <- 1 until k) {
-      // Pick the next center with a probability proportional to cost under current centers
-      val curCenters = centers.view.take(i)
-      val sum = points.view.zip(weights).map { case (p, w) =>
-        w * KMeans.pointCost(curCenters, p)
-      }.sum
+      val sum = costArray.zip(weights).map(p => p._1 * p._2).sum
       val r = rand.nextDouble() * sum
       var cumulativeScore = 0.0
       var j = 0
       while (j < points.length && cumulativeScore < r) {
-        cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j))
+        cumulativeScore += weights(j) * costArray(j)
         j += 1
       }
       if (j == 0) {
@@ -66,6 +64,12 @@ private[mllib] object LocalKMeans extends Logging {
       } else {
         centers(i) = points(j - 1).toDense
       }
+
+      // update costArray
+      for (p <- points.indices) {
+        costArray(p) = math.min(KMeans.fastSquaredDistance(points(p), centers(i)), costArray(p))
+      }
+
     }
 
     // Run up to maxIterations iterations of Lloyd's algorithm

