Repository: spark Updated Branches: refs/heads/master 8c32b2e87 -> f472b8cdc
[SPARK-5016] [MLLIB] Distribute GMM mixture components to executors Distribute expensive portions of computation for Gaussian mixture components (in particular, pre-computation of `MultivariateGaussian.rootSigmaInv`, the inverse covariance matrix and covariance determinant) across executors. Repost of PR#4654. Notes for reviewers: * What should be the policy for when to distribute computation. Always? When numClusters > threshold? User-specified param? TODO: * Performance testing and comparison for large number of clusters Author: Feynman Liang <fli...@databricks.com> Closes #7166 from feynmanliang/GMM_parallel_mixtures and squashes the following commits: 4f351fa [Feynman Liang] Update heuristic and scaladoc 5ea947e [Feynman Liang] Fix parallelization logic 00eb7db [Feynman Liang] Add helper method for GMM's M step, remove distributeGaussians flag e7c8127 [Feynman Liang] Add distributeGaussians flag and tests 1da3c7f [Feynman Liang] Distribute mixtures Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f472b8cd Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f472b8cd Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f472b8cd Branch: refs/heads/master Commit: f472b8cdc00839780dc79be0bbe53a098cde230c Parents: 8c32b2e Author: Feynman Liang <fli...@databricks.com> Authored: Wed Jul 8 16:32:00 2015 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Wed Jul 8 16:32:00 2015 -0700 ---------------------------------------------------------------------- .../mllib/clustering/GaussianMixture.scala | 44 ++++++++++++++++---- 1 file changed, 36 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/f472b8cd/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala index fc509d2..e459367 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/GaussianMixture.scala @@ -140,6 +140,10 @@ class GaussianMixture private ( // Get length of the input vectors val d = breezeData.first().length + // Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when + // d > 25 except for when k is very small + val distributeGaussians = ((k - 1.0) / k) * d > 25 + // Determine initial weights and corresponding Gaussians. // If the user supplied an initial GMM, we use those values, otherwise // we start with uniform weights, a random mean from the data, and @@ -171,14 +175,25 @@ class GaussianMixture private ( // Create new distributions based on the partial assignments // (often referred to as the "M" step in literature) val sumWeights = sums.weights.sum - var i = 0 - while (i < k) { - val mu = sums.means(i) / sums.weights(i) - BLAS.syr(-sums.weights(i), Vectors.fromBreeze(mu), - Matrices.fromBreeze(sums.sigmas(i)).asInstanceOf[DenseMatrix]) - weights(i) = sums.weights(i) / sumWeights - gaussians(i) = new MultivariateGaussian(mu, sums.sigmas(i) / sums.weights(i)) - i = i + 1 + + if (distributeGaussians) { + val numPartitions = math.min(k, 1024) + val tuples = + Seq.tabulate(k)(i => (sums.means(i), sums.sigmas(i), sums.weights(i))) + val (ws, gs) = sc.parallelize(tuples, numPartitions).map { case (mean, sigma, weight) => + updateWeightsAndGaussians(mean, sigma, weight, sumWeights) + }.collect.unzip + Array.copy(ws, 0, weights, 0, ws.length) + Array.copy(gs, 0, gaussians, 0, gs.length) + } else { + var i = 0 + while (i < k) { + val (weight, gaussian) = + updateWeightsAndGaussians(sums.means(i), sums.sigmas(i), sums.weights(i), sumWeights) + weights(i) = weight + gaussians(i) = gaussian + i = i + 1 + } } llhp = llh // current becomes previous @@ -192,6 +207,19 @@ class GaussianMixture private ( /** Java-friendly version of [[run()]] */ def run(data: JavaRDD[Vector]): GaussianMixtureModel = run(data.rdd) + private def updateWeightsAndGaussians( + mean: BDV[Double], + sigma: BreezeMatrix[Double], + weight: Double, + sumWeights: Double): (Double, MultivariateGaussian) = { + val mu = (mean /= weight) + BLAS.syr(-weight, Vectors.fromBreeze(mu), + Matrices.fromBreeze(sigma).asInstanceOf[DenseMatrix]) + val newWeight = weight / sumWeights + val newGaussian = new MultivariateGaussian(mu, sigma / weight) + (newWeight, newGaussian) + } + /** Average of dense breeze vectors */ private def vectorMean(x: IndexedSeq[BV[Double]]): BDV[Double] = { val v = BDV.zeros[Double](x(0).length) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org