    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala ---
    @@ -17,429 +17,57 @@
     package org.apache.spark.mllib.clustering
    -import scala.collection.mutable.ArrayBuffer
    -import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
    -import org.apache.spark.annotation.Experimental
    -import org.apache.spark.Logging
    -import org.apache.spark.SparkContext._
    -import org.apache.spark.mllib.linalg.{Vector, Vectors}
    -import org.apache.spark.mllib.util.MLUtils
    +import org.apache.spark.mllib.base.{FP, PointOps}
    +import org.apache.spark.mllib.clustering.metrics.FastEuclideanOps
     import org.apache.spark.rdd.RDD
    -import org.apache.spark.util.random.XORShiftRandom
    - * K-means clustering with support for multiple parallel runs and a 
k-means++ like initialization
    - * mode (the k-means|| algorithm by Bahmani et al). When multiple 
concurrent runs are requested,
    - * they are executed together with joint passes over the data for 
    - *
    - * This is an iterative algorithm that will make multiple passes over the 
data, so any RDDs given
    - * to it should be cached by the user.
    - */
    -class KMeans private (
    -    private var k: Int,
    -    private var maxIterations: Int,
    -    private var runs: Int,
    -    private var initializationMode: String,
    -    private var initializationSteps: Int,
    -    private var epsilon: Double) extends Serializable with Logging {
    -  /**
    -   * Constructs a KMeans instance with default parameters: {k: 2, 
maxIterations: 20, runs: 1,
    -   * initializationMode: "k-means||", initializationSteps: 5, epsilon: 
    -   */
    -  def this() = this(2, 20, 1, KMeans.K_MEANS_PARALLEL, 5, 1e-4)
    -  /** Set the number of clusters to create (k). Default: 2. */
    -  def setK(k: Int): this.type = {
    -    this.k = k
    -    this
    -  }
    -  /** Set maximum number of iterations to run. Default: 20. */
    -  def setMaxIterations(maxIterations: Int): this.type = {
    -    this.maxIterations = maxIterations
    -    this
    -  }
    -  /**
    -   * Set the initialization algorithm. This can be either "random" to 
choose random points as
    -   * initial cluster centers, or "k-means||" to use a parallel variant of 
    -   * (Bahmani et al., Scalable K-Means++, VLDB 2012). Default: k-means||.
    -   */
    -  def setInitializationMode(initializationMode: String): this.type = {
    -    if (initializationMode != KMeans.RANDOM && initializationMode != 
    -      throw new IllegalArgumentException("Invalid initialization mode: " + 
    -    }
    -    this.initializationMode = initializationMode
    -    this
    -  }
    -  /**
    -   * :: Experimental ::
    -   * Set the number of runs of the algorithm to execute in parallel. We 
initialize the algorithm
    -   * this many times with random starting conditions (configured by the 
initialization mode), then
    -   * return the best clustering found over any run. Default: 1.
    -   */
    -  @Experimental
    -  def setRuns(runs: Int): this.type = {
    -    if (runs <= 0) {
    -      throw new IllegalArgumentException("Number of runs must be positive")
    -    }
    -    this.runs = runs
    -    this
    -  }
    -  /**
    -   * Set the number of steps for the k-means|| initialization mode. This 
is an advanced
    -   * setting -- the default of 5 is almost always enough. Default: 5.
    -   */
    -  def setInitializationSteps(initializationSteps: Int): this.type = {
    -    if (initializationSteps <= 0) {
    -      throw new IllegalArgumentException("Number of initialization steps 
must be positive")
    -    }
    -    this.initializationSteps = initializationSteps
    -    this
    -  }
    -  /**
    -   * Set the distance threshold within which we've consider centers to 
have converged.
    -   * If all centers move less than this Euclidean distance, we stop 
iterating one run.
    -   */
    -  def setEpsilon(epsilon: Double): this.type = {
    -    this.epsilon = epsilon
    -    this
    -  }
    -  /** Whether a warning should be logged if the input RDD is uncached. */
    -  private var warnOnUncachedInput = true
    -  /** Disable warnings about uncached input. */
    -  private[spark] def disableUncachedWarning(): this.type = {
    -    warnOnUncachedInput = false
    -    this
    -  }  
    -  /**
    -   * Train a K-means model on the given set of points; `data` should be 
cached for high
    -   * performance, because this is an iterative algorithm.
    -   */
    -  def run(data: RDD[Vector]): KMeansModel = {
    -    if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) {
    -      logWarning("The input data is not directly cached, which may hurt 
performance if its"
    -        + " parent RDDs are also uncached.")
    -    }
    -    // Compute squared norms and cache them.
    -    val norms = => breezeNorm(v.toBreeze, 2.0))
    -    norms.persist()
    -    val breezeData = { case (v, norm) 
    -      new BreezeVectorWithNorm(v, norm)
    -    }
    -    val model = runBreeze(breezeData)
    -    norms.unpersist()
    -    // Warn at the end of the run as well, for increased visibility.
    -    if (warnOnUncachedInput && data.getStorageLevel == StorageLevel.NONE) {
    -      logWarning("The input data was not directly cached, which may hurt 
performance if its"
    -        + " parent RDDs are also uncached.")
    -    }
    -    model
    -  }
    -  /**
    -   * Implementation of K-Means using breeze.
    -   */
    -  private def runBreeze(data: RDD[BreezeVectorWithNorm]): KMeansModel = {
    -    val sc = data.sparkContext
    -    val initStartTime = System.nanoTime()
    -    val centers = if (initializationMode == KMeans.RANDOM) {
    -      initRandom(data)
    -    } else {
    -      initKMeansParallel(data)
    -    }
    -    val initTimeInSeconds = (System.nanoTime() - initStartTime) / 1e9
    -    logInfo(s"Initialization with $initializationMode took " + 
"%.3f".format(initTimeInSeconds) +
    -      " seconds.")
    -    val active = Array.fill(runs)(true)
    -    val costs = Array.fill(runs)(0.0)
    -    var activeRuns = new ArrayBuffer[Int] ++ (0 until runs)
    -    var iteration = 0
    -    val iterationStartTime = System.nanoTime()
    -    // Execute iterations of Lloyd's algorithm until all runs have 
    -    while (iteration < maxIterations && !activeRuns.isEmpty) {
    -      type WeightedPoint = (BV[Double], Long)
    -      def mergeContribs(p1: WeightedPoint, p2: WeightedPoint): 
WeightedPoint = {
    -        (p1._1 += p2._1, p1._2 + p2._2)
    -      }
    -      val activeCenters = => centers(r)).toArray
    -      val costAccums = => sc.accumulator(0.0))
    -      val bcActiveCenters = sc.broadcast(activeCenters)
    -      // Find the sum and count of points mapping to each center
    -      val totalContribs = data.mapPartitions { points =>
    -        val thisActiveCenters = bcActiveCenters.value
    -        val runs = thisActiveCenters.length
    -        val k = thisActiveCenters(0).length
    -        val dims = thisActiveCenters(0)(0).vector.length
    -        val sums = Array.fill(runs, 
    -        val counts = Array.fill(runs, k)(0L)
    -        points.foreach { point =>
    -          (0 until runs).foreach { i =>
    -            val (bestCenter, cost) = 
KMeans.findClosest(thisActiveCenters(i), point)
    -            costAccums(i) += cost
    -            sums(i)(bestCenter) += point.vector
    -            counts(i)(bestCenter) += 1
    -          }
    -        }
    -        val contribs = for (i <- 0 until runs; j <- 0 until k) yield {
    -          ((i, j), (sums(i)(j), counts(i)(j)))
    -        }
    -        contribs.iterator
    -      }.reduceByKey(mergeContribs).collectAsMap()
    -      // Update the cluster centers and costs for each active run
    -      for ((run, i) <- activeRuns.zipWithIndex) {
    -        var changed = false
    -        var j = 0
    -        while (j < k) {
    -          val (sum, count) = totalContribs((i, j))
    -          if (count != 0) {
    -            sum /= count.toDouble
    -            val newCenter = new BreezeVectorWithNorm(sum)
    -            if (KMeans.fastSquaredDistance(newCenter, centers(run)(j)) > 
epsilon * epsilon) {
    -              changed = true
    -            }
    -            centers(run)(j) = newCenter
    -          }
    -          j += 1
    -        }
    -        if (!changed) {
    -          active(run) = false
    -          logInfo("Run " + run + " finished in " + (iteration + 1) + " 
    -        }
    -        costs(run) = costAccums(i).value
    -      }
    -      activeRuns = activeRuns.filter(active(_))
    -      iteration += 1
    -    }
    -    val iterationTimeInSeconds = (System.nanoTime() - iterationStartTime) 
/ 1e9
    -    logInfo(s"Iterations took " + "%.3f".format(iterationTimeInSeconds) + 
" seconds.")
    -    if (iteration == maxIterations) {
    -      logInfo(s"KMeans reached the max number of iterations: 
    -    } else {
    -      logInfo(s"KMeans converged in $iteration iterations.")
    -    }
    -    val (minCost, bestRun) = costs.zipWithIndex.min
    -    logInfo(s"The cost for the best run is $minCost.")
    -    new KMeansModel(centers(bestRun).map(c => 
    -  }
    -  /**
    -   * Initialize `runs` sets of cluster centers at random.
    -   */
    -  private def initRandom(data: RDD[BreezeVectorWithNorm])
    -  : Array[Array[BreezeVectorWithNorm]] = {
    -    // Sample all the cluster centers in one pass to avoid repeated scans
    -    val sample = data.takeSample(true, runs * k, new 
    -    Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
    -      new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm)
    -    }.toArray)
    -  }
    -  /**
    -   * Initialize `runs` sets of cluster centers using the k-means|| 
algorithm by Bahmani et al.
    -   * (Bahmani et al., Scalable K-Means++, VLDB 2012). This is a variant of 
k-means++ that tries
    -   * to find with dissimilar cluster centers by starting with a random 
center and then doing
    -   * passes where more centers are chosen with probability proportional to 
their squared distance
    -   * to the current cluster set. It results in a provable approximation to 
an optimal clustering.
    -   *
    -   * The original paper can be found at
    -   */
    -  private def initKMeansParallel(data: RDD[BreezeVectorWithNorm])
    -  : Array[Array[BreezeVectorWithNorm]] = {
    -    // Initialize each run's center to a random point
    -    val seed = new XORShiftRandom().nextInt()
    -    val sample = data.takeSample(true, runs, seed).toSeq
    -    val centers = Array.tabulate(runs)(r => ArrayBuffer(sample(r).toDense))
    -    // On each step, sample 2 * k points on average for each run with 
probability proportional
    -    // to their squared distance from that run's current centers
    -    var step = 0
    -    while (step < initializationSteps) {
    -      val bcCenters = data.context.broadcast(centers)
    -      val sumCosts = data.flatMap { point =>
    -        (0 until runs).map { r =>
    -          (r, KMeans.pointCost(bcCenters.value(r), point))
    -        }
    -      }.reduceByKey(_ + _).collectAsMap()
    -      val chosen = data.mapPartitionsWithIndex { (index, points) =>
    -        val rand = new XORShiftRandom(seed ^ (step << 16) ^ index)
    -        points.flatMap { p =>
    -          (0 until runs).filter { r =>
    -            rand.nextDouble() < 2.0 * KMeans.pointCost(bcCenters.value(r), 
p) * k / sumCosts(r)
    -          }.map((_, p))
    -        }
    -      }.collect()
    -      chosen.foreach { case (r, p) =>
    -        centers(r) += p.toDense
    -      }
    -      step += 1
    -    }
    +import org.apache.spark.Logging
    -    // Finally, we might have a set of more than k candidate centers for 
each run; weigh each
    -    // candidate by the number of points in the dataset mapping to it and 
run a local k-means++
    -    // on the weighted centers to pick just k of them
    -    val bcCenters = data.context.broadcast(centers)
    -    val weightMap = data.flatMap { p =>
    -      (0 until runs).map { r =>
    -        ((r, KMeans.findClosest(bcCenters.value(r), p)._1), 1.0)
    -      }
    -    }.reduceByKey(_ + _).collectAsMap()
    -    val finalCenters = (0 until runs).map { r =>
    -      val myCenters = centers(r).toArray
    -      val myWeights = (0 until myCenters.length).map(i => 
weightMap.getOrElse((r, i), 0.0)).toArray
    -      LocalKMeans.kMeansPlusPlus(r, myCenters, myWeights, k, 30)
    -    }
    -    finalCenters.toArray
    -  }
    +import scala.reflect.ClassTag
    +import org.apache.spark.mllib.linalg.Vector
    - * Top-level methods for calling K-means clustering.
    - */
    -object KMeans {
    +object KMeans extends Logging  {
       // Initialization mode names
       val RANDOM = "random"
       val K_MEANS_PARALLEL = "k-means||"
    -  /**
    -   * Trains a k-means model using the given set of parameters.
    -   *
    -   * @param data training points stored as `RDD[Array[Double]]`
    -   * @param k number of clusters
    -   * @param maxIterations max number of iterations
    -   * @param runs number of parallel runs, defaults to 1. The best model is 
    -   * @param initializationMode initialization model, either "random" or 
"k-means||" (default).
    -   */
    -  def train(
    -      data: RDD[Vector],
    -      k: Int,
    -      maxIterations: Int,
    -      runs: Int,
    -      initializationMode: String): KMeansModel = {
    -    new KMeans().setK(k)
    -      .setMaxIterations(maxIterations)
    -      .setRuns(runs)
    -      .setInitializationMode(initializationMode)
    -      .run(data)
    -  }
    +  def train(data: RDD[Vector], k: Int, maxIterations: Int, runs: Int, 
mode: String): KMeansModel =
    +    new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, 
runs, mode)._2)
        * Trains a k-means model using specified parameters and the default 
values for unspecified.
    -  def train(
    -      data: RDD[Vector],
    -      k: Int,
    -      maxIterations: Int): KMeansModel = {
    -    train(data, k, maxIterations, 1, K_MEANS_PARALLEL)
    -  }
    +  def train(data: RDD[Vector], k: Int, maxIterations: Int): KMeansModel =
    +    new KMeansModel(doTrain(new FastEuclideanOps)(data, k, 
    -  /**
    -   * Trains a k-means model using specified parameters and the default 
values for unspecified.
    -   */
    -  def train(
    -      data: RDD[Vector],
    -      k: Int,
    -      maxIterations: Int,
    -      runs: Int): KMeansModel = {
    -    train(data, k, maxIterations, runs, K_MEANS_PARALLEL)
    -  }
    -   * Returns the index of the closest center to the given point, as well 
as the squared distance.
    +   * Trains a k-means model using specified parameters and the default 
values for unspecified.
    -  private[mllib] def findClosest(
    -      centers: TraversableOnce[BreezeVectorWithNorm],
    -      point: BreezeVectorWithNorm): (Int, Double) = {
    -    var bestDistance = Double.PositiveInfinity
    -    var bestIndex = 0
    -    var i = 0
    -    centers.foreach { center =>
    -      // Since `\|a - b\| \geq |\|a\| - \|b\||`, we can use this lower 
bound to avoid unnecessary
    -      // distance computation.
    -      var lowerBoundOfSqDist = center.norm - point.norm
    -      lowerBoundOfSqDist = lowerBoundOfSqDist * lowerBoundOfSqDist
    -      if (lowerBoundOfSqDist < bestDistance) {
    -        val distance: Double = fastSquaredDistance(center, point)
    -        if (distance < bestDistance) {
    -          bestDistance = distance
    -          bestIndex = i
    -        }
    -      }
    -      i += 1
    +  def train( data: RDD[Vector], k: Int, maxIterations: Int, runs: Int): 
KMeansModel =
    +    new KMeansModel(doTrain(new FastEuclideanOps)(data, k, maxIterations, 
    +  def doTrain[P <: FP, C <: FP](pointOps: PointOps[P, C])(
    +    raw: RDD[Vector],
    --- End diff --
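    Please use 4-space indentation for the parameters of `doTrain`: the continuation line `raw: RDD[Vector],` is indented by only 2 spaces relative to `def`.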

