Repository: spark Updated Branches: refs/heads/branch-2.0 a122a3e70 -> fe0a0686c
[SPARK-15346][MLLIB] Reduce duplicate computation in picking initial points mateiz srowen I state that the contribution is my original work and that I license the work to the project under the project's open source license There's some format problems with my last PR, with HyukjinKwon 's help I read the guidance, re-check my code and PR, then run the tests, finally re-submit the PR request here. The related JIRA issue though marked as resolved, this change may relate to it I think. ## Proposed Change After picking each new initial centers, it's unnecessary to compute the distances between all the points and the old ones. Instead this change keeps the distance between all the points and their closest centers, and compare to the distance of them with the new center then update them. ## Test result One can find an easy test way in (https://issues.apache.org/jira/browse/SPARK-6706) I test the KMeans++ method for a small dataset with 16k points, and the whole KMeans|| with a large one with 240k points. The data has 4096 features and I tunes K from 100 to 500. The test environment was on my 4 machine cluster, I also tested a 3M points data on a larger cluster with 25 machines and got similar results, which I would not draw the detail curve. The result of the first two exps are shown below ### Local KMeans++ test: Dataset:4m_ini_center Data_size:16234 Dimension:4096 Lloyd's Iteration = 10 The y-axis is time in sec, the x-axis is tuning the K. ![image](https://cloud.githubusercontent.com/assets/10915169/15175831/d0c92b82-179a-11e6-8b68-4e165fc2fdff.png) ![local_total](https://cloud.githubusercontent.com/assets/10915169/15175957/6b21c3b0-179b-11e6-9741-66dfe4e23eb7.jpg) ### On a larger dataset An improve show in the graph but not commit in this file: In this experiment I also have an improvement for calculation in normalization data (the distance is convert to the cosine distance). As if the data is normalized into (0,1), one improvement in the original vesion for util.MLUtils.fastSauaredDistance would have no effect (the precisionBound 2.0 * EPSILON * sumSquaredNorm / (normDiff * normDiff + EPSILON) will never less then precision in this case). Therefore I design an early terminal method when comparing two distance (used for findClosest). But I don't include this improve in this file, you may only refer to the curves without "normalize" for comparing the results. Dataset:4k24 Data_size:243960 Dimension:4096 Normlize Enlarge Initialize Lloyd's_Iteration NO 1 3 5 YES 10000 3 5 Notice: the normlized data is enlarged to ensure precision The cost time: x-for value of K, y-for time in sec ![4k24_total](https://cloud.githubusercontent.com/assets/10915169/15176635/9a54c0bc-179e-11e6-81c5-238e0c54bce2.jpg) SE for unnormalized data between two version, to ensure the correctness ![4k24_unnorm_se](https://cloud.githubusercontent.com/assets/10915169/15176661/b85dabc8-179e-11e6-9269-fe7d2101dd48.jpg) Here is the SE between normalized data just for reference, it's also correct. ![4k24_norm_se](https://cloud.githubusercontent.com/assets/10915169/15176742/1fbde940-179f-11e6-8290-d24b0dd4a4f7.jpg) Author: DLucky <mouendl...@gmail.com> Closes #13133 from mouendless/patch-2. (cherry picked from commit 420b700695fe8bcdda406c34ad48230b9dfc07f1) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fe0a0686 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fe0a0686 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fe0a0686 Branch: refs/heads/branch-2.0 Commit: fe0a0686c50743272a841b909cbfe4534350fc18 Parents: a122a3e Author: DLucky <mouendl...@gmail.com> Authored: Wed May 18 12:05:21 2016 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Wed May 18 12:05:29 2016 +0100 ---------------------------------------------------------------------- .../apache/spark/mllib/clustering/LocalKMeans.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/fe0a0686/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala index adf20dc..5358767 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/LocalKMeans.scala @@ -46,17 +46,15 @@ private[mllib] object LocalKMeans extends Logging { // Initialize centers by sampling using the k-means++ procedure. centers(0) = pickWeighted(rand, points, weights).toDense + val costArray = points.map(KMeans.fastSquaredDistance(_, centers(0))) + for (i <- 1 until k) { - // Pick the next center with a probability proportional to cost under current centers - val curCenters = centers.view.take(i) - val sum = points.view.zip(weights).map { case (p, w) => - w * KMeans.pointCost(curCenters, p) - }.sum + val sum = costArray.zip(weights).map(p => p._1 * p._2).sum val r = rand.nextDouble() * sum var cumulativeScore = 0.0 var j = 0 while (j < points.length && cumulativeScore < r) { - cumulativeScore += weights(j) * KMeans.pointCost(curCenters, points(j)) + cumulativeScore += weights(j) * costArray(j) j += 1 } if (j == 0) { @@ -66,6 +64,12 @@ private[mllib] object LocalKMeans extends Logging { } else { centers(i) = points(j - 1).toDense } + + // update costArray + for (p <- points.indices) { + costArray(p) = math.min(KMeans.fastSquaredDistance(points(p), centers(i)), costArray(p)) + } + } // Run up to maxIterations iterations of Lloyd's algorithm --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org