Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/15413#discussion_r94920036 --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala --- @@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") ( override def transformSchema(schema: StructType): StructType = { validateAndTransformSchema(schema) } + + /** + * Initialize weights and corresponding gaussian distributions at random. + * + * We start with uniform weights, a random mean from the data, and diagonal covariance matrices + * using component variances derived from the samples. + * + * @param instances The training instances. + * @param numClusters The number of clusters. + * @param numFeatures The number of features of training instance. + * @return The initialized weights and corresponding gaussian distributions. Note the + * covariance matrix of multivariate gaussian distribution is symmetric and + * we only save the upper triangular part as a dense vector. + */ + private def initRandom( + instances: RDD[Vector], + numClusters: Int, + numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = { + val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed)) + val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters) + val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i => + val slice = samples.view(i * numSamples, (i + 1) * numSamples) + val mean = { + val v = new DenseVector(new Array[Double](numFeatures)) + var i = 0 + while (i < numSamples) { + BLAS.axpy(1.0, slice(i), v) + i += 1 + } + BLAS.scal(1.0 / numSamples, v) + v + } + /* + Construct matrix where diagonal entries are element-wise + variance of input vectors (computes biased variance). 
+ Since the covariance matrix of multivariate gaussian distribution is symmetric, + only the upper triangular part of the matrix will be saved as a dense vector + in order to reduce the shuffled data size. + */ + val cov = { + val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze + slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0) + val diagVec = Vectors.fromBreeze(ss) + BLAS.scal(1.0 / numSamples, diagVec) + val covVec = new DenseVector(Array.fill[Double]( + numFeatures * (numFeatures + 1) / 2)(0.0)) + diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) => + covVec.values(i + i * (i + 1) / 2) = v + } + covVec + } + (mean, cov) + } + (weights, gaussians) + } } @Since("2.0.0") object GaussianMixture extends DefaultParamsReadable[GaussianMixture] { @Since("2.0.0") override def load(path: String): GaussianMixture = super.load(path) + + /** + * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when + * numFeatures > 25 except for when numClusters is very small. + * + * @param numClusters Number of clusters + * @param numFeatures Number of features + */ + private[clustering] def shouldDistributeGaussians( + numClusters: Int, + numFeatures: Int): Boolean = { + ((numClusters - 1.0) / numClusters) * numFeatures > 25.0 + } + + /** + * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix + * into an n * n array representing the full symmetric matrix. + * + * @param n The order of the n by n matrix. + * @param triangularValues The upper triangular part of the matrix packed in an array + * (column major). + * @return An array which represents the symmetric matrix in column major. 
+ */ + private[clustering] def unpackUpperTriangularMatrix( + n: Int, + triangularValues: Array[Double]): Array[Double] = { + val symmetricValues = new Array[Double](n * n) + var r = 0 + var i = 0 + while(i < n) { + var j = 0 + while (j <= i) { + symmetricValues(i * n + j) = triangularValues(r) + symmetricValues(j * n + i) = triangularValues(r) + r += 1 + j += 1 + } + i += 1 + } + symmetricValues + } + + /** + * Update the weight, mean and covariance of gaussian distribution. + * + * @param mean The mean of the gaussian distribution. + * @param cov The covariance matrix of the gaussian distribution. Note we only + * save the upper triangular part as a dense vector. + * @param weight The weight of the gaussian distribution. + * @param sumWeights The sum of weights of all clusters. + * @return The updated weight, mean and covariance. + */ + private[clustering] def updateWeightsAndGaussians( + mean: DenseVector, + cov: DenseVector, + weight: Double, + sumWeights: Double): (Double, (DenseVector, DenseVector)) = { + BLAS.scal(1.0 / weight, mean) + BLAS.spr(-weight, mean, cov) + BLAS.scal(1.0 / weight, cov) + val newWeight = weight / sumWeights + val newGaussian = (mean, cov) + (newWeight, newGaussian) + } +} + +/** + * ExpectationAggregator computes the partial expectation results. + * + * @param numFeatures The number of features. + * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture. + * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distribution + * in the mixture. Note only upper triangular part of the covariance + * matrix of each distribution is stored as dense vector in order to + * reduce shuffled data size. 
+ */ +private class ExpectationAggregator( + numFeatures: Int, + bcWeights: Broadcast[Array[Double]], + bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable { + + private val k: Int = bcWeights.value.length + private var totalCnt: Long = 0L + private var newLogLikelihood: Double = 0.0 + private val newWeights: Array[Double] = new Array[Double](k) + private val newMeans: Array[DenseVector] = Array.fill(k)( + new DenseVector(Array.fill[Double](numFeatures)(0.0))) + private val newCovs: Array[DenseVector] = Array.fill(k)( + new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0))) + + @transient private lazy val oldGaussians = { + bcGaussians.value.map { case (mean, covVec) => + val cov = new DenseMatrix(numFeatures, numFeatures, + GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values)) + new MultivariateGaussian(mean, cov) + } + } + + def count: Long = totalCnt + + def logLikelihood: Double = newLogLikelihood + + def weights: Array[Double] = newWeights + + def means: Array[DenseVector] = newMeans + + def covs: Array[DenseVector] = newCovs + + /** + * Add a new training instance to this ExpectationAggregator, update the weights, + * means and covariances for each distributions, and update the log likelihood. + * + * @param instance The instance of data point to be added. + * @return This ExpectationAggregator object. 
+ */ + def add(instance: Vector): this.type = { + val localWeights = bcWeights.value + val localOldGaussians = oldGaussians + + val prob = new Array[Double](k) + var probSum = 0.0 + var i = 0 + while(i < k) { + val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance) + prob(i) = p + probSum += p + i += 1 + } + + newLogLikelihood += math.log(probSum) + val localNewWeights = newWeights + val localNewMeans = newMeans + val localNewCovs = newCovs + i = 0 + while(i < k) { + prob(i) /= probSum + localNewWeights(i) += prob(i) + BLAS.axpy(prob(i), instance, localNewMeans(i)) + BLAS.spr(prob(i), instance, localNewCovs(i)) + i += 1 + } + + totalCnt += 1 + this + } + + /** + * Merge another ExpectationAggregator, update the weights, means and covariances + * for each distributions, and update the log likelihood. + * (Note that it's in place merging; as a result, `this` object will be modified.) + * + * @param other The other ExpectationAggregator to be merged. + * @return This ExpectationAggregator object. + */ + def merge(other: ExpectationAggregator): this.type = { + if (other.count != 0) { + totalCnt += other.totalCnt + + val localThisNewWeights = this.newWeights --- End diff -- This is because we are actually calling a `getter` method every time we reference `this.newWeights`. To improve the performance of the loop at L655, we should bind the values to local variables once rather than calling the getter on every iteration. This probably isn't a big deal in this case, since `k` is usually not very large, but I don't think it's a bad idea.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org