GitHub user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/14937#discussion_r77489220
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
    @@ -232,110 +303,127 @@ class KMeans private (
        * Implementation of K-Means algorithm.
        */
       private def runAlgorithm(
    -      data: RDD[VectorWithNorm],
    +      data: RDD[(Matrix, Array[Double])],
    +      centers: Array[VectorWithNorm],
           instr: Option[Instrumentation[NewKMeans]]): KMeansModel = {
     
         val sc = data.sparkContext
    -
    -    val initStartTime = System.nanoTime()
    -
    -    // Only one run is allowed when initialModel is given
    -    val numRuns = if (initialModel.nonEmpty) {
    -      if (runs > 1) logWarning("Ignoring runs; one run is allowed when initialModel is given.")
    -      1
    -    } else {
    -      runs
    -    }
    -
    -    val centers = initialModel match {
    -      case Some(kMeansCenters) =>
    -        Array(kMeansCenters.clusterCenters.map(s => new VectorWithNorm(s)))
    -      case None =>
    -        if (initializationMode == KMeans.RANDOM) {
    -          initRandom(data)
    -        } else {
    -          initKMeansParallel(data)
    -        }
    -    }
    -    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    -    logInfo(s"Initialization with $initializationMode took " + "%.3f".format(initTimeInSeconds) +
    -      " seconds.")
    -
    -    val active = Array.fill(numRuns)(true)
    -    val costs = Array.fill(numRuns)(0.0)
    -
    -    var activeRuns = new ArrayBuffer[Int] ++ (0 until numRuns)
    +    val dim = centers(0).vector.size
    +    var done = false
    +    var costs = 0.0
         var iteration = 0
     
    -    val iterationStartTime = System.nanoTime()
    +    instr.foreach(_.logNumFeatures(dim))
     
    -    instr.foreach(_.logNumFeatures(centers(0)(0).vector.size))
    +    val iterationStartTime = System.nanoTime()
     
    -    // Execute iterations of Lloyd's algorithm until all runs have converged
    -    while (iteration < maxIterations && !activeRuns.isEmpty) {
    +    // Execute Lloyd's algorithm until converged or reached the max number of iterations.
    +    while (iteration < maxIterations && !done) {
           type WeightedPoint = (Vector, Long)
          def mergeContribs(x: WeightedPoint, y: WeightedPoint): WeightedPoint = {
             axpy(1.0, x._1, y._1)
             (y._1, x._2 + y._2)
           }
     
    -      val activeCenters = activeRuns.map(r => centers(r)).toArray
    -      val costAccums = activeRuns.map(_ => sc.doubleAccumulator)
    -
    -      val bcActiveCenters = sc.broadcast(activeCenters)
    +      val costAccums = sc.doubleAccumulator
     
    -      // Find the sum and count of points mapping to each center
    -      val totalContribs = data.mapPartitions { points =>
    -        val thisActiveCenters = bcActiveCenters.value
    -        val runs = thisActiveCenters.length
    -        val k = thisActiveCenters(0).length
    -        val dims = thisActiveCenters(0)(0).vector.size
    +      // Construct center array and broadcast it.
    +      val centerArray = new Array[Double](k * dim)
    +      val centerNormArray = new Array[Double](k)
    +      var i = 0
    +      centers.foreach { center =>
    +        System.arraycopy(center.vector.toArray, 0, centerArray, i * dim, dim)
    +        centerNormArray(i) = center.norm
    +        i += 1
    +      }
    +      val bcCenterArray = sc.broadcast(centerArray)
    +      val bcCenterNormArray = sc.broadcast(centerNormArray)
    +
    +      // Find the sum and count of points mapping to each center.
    +      val totalContribs = data.flatMap { case (pointMatrix, pointNormArray) =>
    +        val thisCenterArray = bcCenterArray.value
    +        val thisCenterNormArray = bcCenterNormArray.value
    +
    +        val k = thisCenterNormArray.length
    --- End diff --
    
    This is another tiny nit, but this value may actually be less than k if fewer than k centroids were found. I had renamed it to avoid confusion, but it's minor.
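    
    For illustration only, a minimal, self-contained Scala sketch (plain Scala, no Spark dependency; the object name, `numCenters`, and the toy data are hypothetical) of why the effective center count can be smaller than the requested k: when the data has fewer than k distinct points, initialization can only produce that many candidate centroids.
    
        // Hypothetical sketch: the effective number of centers can fall below
        // the requested k when the data has fewer than k distinct points.
        object EffectiveK {
          def main(args: Array[String]): Unit = {
            val requestedK = 4
            // Toy dataset with only two distinct points.
            val points = Seq(Array(0.0, 0.0), Array(0.0, 0.0), Array(1.0, 1.0))
            // Deduplicated candidate centers, as an initializer might produce them.
            val centers = points.map(_.toSeq).distinct.take(requestedK)
            // A name like `numCenters` avoids shadowing the configured k.
            val numCenters = centers.length
            println(s"requested k = $requestedK, effective centers = $numCenters")
          }
        }
    
    Deriving the count from the broadcast norm array's length, as the diff does, then naturally reflects this effective count rather than the configured k, which is why a distinct name reads more clearly.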

