Github user yanboliang commented on a diff in the pull request:

    https://github.com/apache/spark/pull/15413#discussion_r94920036
  
    --- Diff: mllib/src/main/scala/org/apache/spark/ml/clustering/GaussianMixture.scala ---
    @@ -356,13 +427,243 @@ class GaussianMixture @Since("2.0.0") (
       override def transformSchema(schema: StructType): StructType = {
         validateAndTransformSchema(schema)
       }
    +
    +  /**
    +   * Initialize weights and corresponding gaussian distributions at random.
    +   *
    +   * We start with uniform weights, a random mean from the data, and diagonal covariance matrices
    +   * using component variances derived from the samples.
    +   *
    +   * @param instances The training instances.
    +   * @param numClusters The number of clusters.
    +   * @param numFeatures The number of features of each training instance.
    +   * @return The initialized weights and corresponding gaussian distributions. Note that
    +   *         the covariance matrix of a multivariate gaussian distribution is symmetric
    +   *         and we only save the upper triangular part as a dense vector.
    +   */
    +  private def initRandom(
    +      instances: RDD[Vector],
    +      numClusters: Int,
    +      numFeatures: Int): (Array[Double], Array[(DenseVector, DenseVector)]) = {
    +    val samples = instances.takeSample(withReplacement = true, numClusters * numSamples, $(seed))
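    +    // Draw numClusters * numSamples points with replacement; cluster i is then
    +    // initialized from its own slice of numSamples consecutive points.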
    +    val weights: Array[Double] = Array.fill(numClusters)(1.0 / numClusters)
    +    val gaussians: Array[(DenseVector, DenseVector)] = Array.tabulate(numClusters) { i =>
    +      val slice = samples.view(i * numSamples, (i + 1) * numSamples)
    +      val mean = {
    +        val v = new DenseVector(new Array[Double](numFeatures))
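    +        // note: this i indexes samples within the slice, shadowing the cluster index i above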
    +        var i = 0
    +        while (i < numSamples) {
    +          BLAS.axpy(1.0, slice(i), v)
    +          i += 1
    +        }
    +        BLAS.scal(1.0 / numSamples, v)
    +        v
    +      }
    +      /*
    +         Construct matrix where diagonal entries are element-wise
    +         variance of input vectors (computes biased variance).
    +         Since the covariance matrix of multivariate gaussian distribution is symmetric,
    +         only the upper triangular part of the matrix will be saved as a dense vector
    +         in order to reduce the shuffled data size.
    +       */
    +      val cov = {
    +        val ss = new DenseVector(new Array[Double](numFeatures)).asBreeze
    +        slice.foreach(xi => ss += (xi.asBreeze - mean.asBreeze) :^ 2.0)
    +        val diagVec = Vectors.fromBreeze(ss)
    +        BLAS.scal(1.0 / numSamples, diagVec)
    +        val covVec = new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0))
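    +        // The diagonal entry (i, i) lands at packed (column major) index i + i * (i + 1) / 2.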
    +        diagVec.toArray.zipWithIndex.foreach { case (v: Double, i: Int) =>
    +          covVec.values(i + i * (i + 1) / 2) = v
    +        }
    +        covVec
    +      }
    +      (mean, cov)
    +    }
    +    (weights, gaussians)
    +  }
     }
     
     @Since("2.0.0")
     object GaussianMixture extends DefaultParamsReadable[GaussianMixture] {
     
       @Since("2.0.0")
       override def load(path: String): GaussianMixture = super.load(path)
    +
    +  /**
    +   * Heuristic to distribute the computation of the [[MultivariateGaussian]]s, approximately when
    +   * numFeatures > 25 except for when numClusters is very small.
    +   *
    +   * @param numClusters  Number of clusters
    +   * @param numFeatures  Number of features
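    +   *
    +   * For example (illustrative numbers): with numClusters = 4 and numFeatures = 40,
    +   * (3.0 / 4) * 40 = 30.0 > 25.0, so the computation is distributed; with
    +   * numClusters = 1 the left-hand side is 0.0, so a single cluster never is.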
    +   */
    +  private[clustering] def shouldDistributeGaussians(
    +      numClusters: Int,
    +      numFeatures: Int): Boolean = {
    +    ((numClusters - 1.0) / numClusters) * numFeatures > 25.0
    +  }
    +
    +  /**
    +   * Convert an n * (n + 1) / 2 dimension array representing the upper triangular part of a matrix
    +   * into an n * n array representing the full symmetric matrix.
    +   *
    +   * @param n The order of the n by n matrix.
    +   * @param triangularValues The upper triangular part of the matrix packed in an array
    +   *                         (column major).
    +   * @return An array which represents the symmetric matrix in column major.
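    +   *
    +   * For example, with n = 3 and triangularValues packed as Array(a, b, c, d, e, f)
    +   * (column major), the result is Array(a, b, d, b, c, e, d, e, f), i.e. the full
    +   * symmetric matrix [[a, b, d], [b, c, e], [d, e, f]] in column major order.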
    +   */
    +  private[clustering] def unpackUpperTriangularMatrix(
    +      n: Int,
    +      triangularValues: Array[Double]): Array[Double] = {
    +    val symmetricValues = new Array[Double](n * n)
    +    var r = 0
    +    var i = 0
    +    while (i < n) {
    +      var j = 0
    +      while (j <= i) {
    +        symmetricValues(i * n + j) = triangularValues(r)
    +        symmetricValues(j * n + i) = triangularValues(r)
    +        r += 1
    +        j += 1
    +      }
    +      i += 1
    +    }
    +    symmetricValues
    +  }
    +
    +  /**
    +   * Update the weight, mean and covariance of a gaussian distribution.
    +   *
    +   * @param mean The mean of the gaussian distribution.
    +   * @param cov The covariance matrix of the gaussian distribution. Note we only
    +   *            save the upper triangular part as a dense vector.
    +   * @param weight The weight of the gaussian distribution.
    +   * @param sumWeights The sum of weights of all clusters.
    +   * @return The updated weight, mean and covariance.
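    +   *
    +   * Concretely, on return mean = (sum_i gamma_i * x_i) / weight and
    +   * cov = (sum_i gamma_i * x_i * x_i^T) / weight - mean * mean^T, where gamma_i is
    +   * the responsibility of this cluster for training instance x_i.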
    +   */
    +  private[clustering] def updateWeightsAndGaussians(
    +      mean: DenseVector,
    +      cov: DenseVector,
    +      weight: Double,
    +      sumWeights: Double): (Double, (DenseVector, DenseVector)) = {
    +    BLAS.scal(1.0 / weight, mean)
    +    BLAS.spr(-weight, mean, cov)
    +    BLAS.scal(1.0 / weight, cov)
    +    val newWeight = weight / sumWeights
    +    val newGaussian = (mean, cov)
    +    (newWeight, newGaussian)
    +  }
    +}
    +
    +/**
    + * ExpectationAggregator computes the partial expectation results.
    + *
    + * @param numFeatures The number of features.
    +   * @param bcWeights The broadcast weights for each Gaussian distribution in the mixture.
    +   * @param bcGaussians The broadcast array of Multivariate Gaussian (Normal) Distributions
    +   *                    in the mixture. Note that only the upper triangular part of each
    +   *                    covariance matrix is stored as a dense vector in order to reduce
    +   *                    the shuffled data size.
    + */
    +private class ExpectationAggregator(
    +    numFeatures: Int,
    +    bcWeights: Broadcast[Array[Double]],
    +    bcGaussians: Broadcast[Array[(DenseVector, DenseVector)]]) extends Serializable {
    +
    +  private val k: Int = bcWeights.value.length
    +  private var totalCnt: Long = 0L
    +  private var newLogLikelihood: Double = 0.0
    +  private val newWeights: Array[Double] = new Array[Double](k)
    +  private val newMeans: Array[DenseVector] = Array.fill(k)(
    +    new DenseVector(Array.fill[Double](numFeatures)(0.0)))
    +  private val newCovs: Array[DenseVector] = Array.fill(k)(
    +    new DenseVector(Array.fill[Double](numFeatures * (numFeatures + 1) / 2)(0.0)))
    +
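    +  // Rebuilt lazily from the broadcast packed form on each executor; @transient
    +  // keeps the reconstructed MultivariateGaussians out of task serialization.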
    +  @transient private lazy val oldGaussians = {
    +    bcGaussians.value.map { case (mean, covVec) =>
    +      val cov = new DenseMatrix(numFeatures, numFeatures,
    +        GaussianMixture.unpackUpperTriangularMatrix(numFeatures, covVec.values))
    +      new MultivariateGaussian(mean, cov)
    +    }
    +  }
    +
    +  def count: Long = totalCnt
    +
    +  def logLikelihood: Double = newLogLikelihood
    +
    +  def weights: Array[Double] = newWeights
    +
    +  def means: Array[DenseVector] = newMeans
    +
    +  def covs: Array[DenseVector] = newCovs
    +
    +  /**
    +   * Add a new training instance to this ExpectationAggregator, update the weights,
    +   * means and covariances for each distribution, and update the log likelihood.
    +   *
    +   * @param instance The instance of data point to be added.
    +   * @return This ExpectationAggregator object.
    +   */
    +  def add(instance: Vector): this.type = {
    +    val localWeights = bcWeights.value
    +    val localOldGaussians = oldGaussians
    +
    +    val prob = new Array[Double](k)
    +    var probSum = 0.0
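    +    // E step for one instance: prob(i) = EPSILON + weight_i * pdf_i(instance);
    +    // EPSILON keeps probSum strictly positive even if every density underflows.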
    +    var i = 0
    +    while (i < k) {
    +      val p = EPSILON + localWeights(i) * localOldGaussians(i).pdf(instance)
    +      prob(i) = p
    +      probSum += p
    +      i += 1
    +    }
    +
    +    newLogLikelihood += math.log(probSum)
    +    val localNewWeights = newWeights
    +    val localNewMeans = newMeans
    +    val localNewCovs = newCovs
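    +    // M-step accumulation: normalized responsibilities for the weights, weighted
    +    // sums for the means, and packed weighted outer products for the covariances.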
    +    i = 0
    +    while (i < k) {
    +      prob(i) /= probSum
    +      localNewWeights(i) += prob(i)
    +      BLAS.axpy(prob(i), instance, localNewMeans(i))
    +      BLAS.spr(prob(i), instance, localNewCovs(i))
    +      i += 1
    +    }
    +
    +    totalCnt += 1
    +    this
    +  }
    +
    +  /**
    +   * Merge another ExpectationAggregator, update the weights, means and covariances
    +   * for each distribution, and update the log likelihood.
    +   * (Note that it's in-place merging; as a result, `this` object will be modified.)
    +   *
    +   * @param other The other ExpectationAggregator to be merged.
    +   * @return This ExpectationAggregator object.
    +   */
    +  def merge(other: ExpectationAggregator): this.type = {
    +    if (other.count != 0) {
    +      totalCnt += other.totalCnt
    +
    +      val localThisNewWeights = this.newWeights
    --- End diff --
    
    This is because we are actually going through a ```getter``` method each time we
    call ```this.newWeights```. To improve the performance of the loop at L655, we
    should hold local references to the values rather than call the ```getter``` on
    every iteration. This probably isn't a big deal in this case since ```k``` is
    usually not very big, but it's still a worthwhile pattern.
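
    For illustration, here is a minimal sketch of the pattern (a sketch only, not
    the exact code in this PR):

    ```scala
    // Hoist the getter results into local vals once, outside the hot loop ...
    val localThisNewWeights = this.newWeights
    val localOtherNewWeights = other.newWeights
    var i = 0
    while (i < k) {
      // ... so each iteration indexes the arrays directly instead of going
      // through a getter call.
      localThisNewWeights(i) += localOtherNewWeights(i)
      i += 1
    }
    ```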

