Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2906#discussion_r19288871
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala ---
    @@ -0,0 +1,549 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm}
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.random.XORShiftRandom
    +
    +/**
    + * The configuration for a hierarchical clustering algorithm
    + *
    + * @param numClusters the number of clusters you want
    + * @param subIterations the maximum number of iterations in each bisection step
    + * @param numRetries the number of times a failed split is retried
    + * @param epsilon the relative-error threshold to stop the sub-iterations
    + * @param randomSeed the seed used to initialize the centers in each sub-iteration
    + * @param randomRange the range coefficient to generate random points in each clustering step
    + */
    +class HierarchicalClusteringConf(
    +  private var numClusters: Int,
    +  private var subIterations: Int,
    +  private var numRetries: Int,
    +  private var epsilon: Double,
    +  private var randomSeed: Int,
    +  private[mllib] var randomRange: Double) extends Serializable {
    +
    +  def this() = this(20, 5, 20, 10E-6, 1, 0.1)
    +
    +  def setNumClusters(numClusters: Int): this.type = {
    +    this.numClusters = numClusters
    +    this
    +  }
    +
    +  def getNumClusters(): Int = this.numClusters
    +
    +  def setSubIterations(iterations: Int): this.type = {
    +    this.subIterations = iterations
    +    this
    +  }
    +
    +  def setNumRetries(numRetries: Int): this.type = {
    +    this.numRetries = numRetries
    +    this
    +  }
    +
    +  def getNumRetries(): Int = this.numRetries
    +
    +  def getSubIterations(): Int = this.subIterations
    +
    +  def setEpsilon(epsilon: Double): this.type = {
    +    this.epsilon = epsilon
    +    this
    +  }
    +
    +  def getEpsilon(): Double = this.epsilon
    +
    +  def setRandomSeed(seed: Int): this.type = {
    +    this.randomSeed = seed
    +    this
    +  }
    +
    +  def getRandomSeed(): Int = this.randomSeed
    +
    +  def setRandomRange(range: Double): this.type = {
    +    this.randomRange = range
    +    this
    +  }
    +}
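    +
    +// A minimal usage sketch of the builder-style configuration above
    +// (illustrative only; the parameter values are arbitrary):
    +//
    +//   val conf = new HierarchicalClusteringConf()
    +//     .setNumClusters(10)
    +//     .setSubIterations(20)
    +//     .setNumRetries(5)
    +//     .setEpsilon(1.0e-4)
    +//     .setRandomSeed(1)
    +//     .setRandomRange(0.1)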
    +
    +
    +/**
    + * This is a divisive hierarchical clustering algorithm based on the bisecting k-means algorithm.
    + *
    + * @param conf the configuration class for the hierarchical clustering
    + */
    +class HierarchicalClustering(val conf: HierarchicalClusteringConf)
    +    extends Serializable with Logging {
    +
    +  /**
    +   * Constructs with the default configuration
    +   */
    +  def this() = this(new HierarchicalClusteringConf())
    +
    +  /**
    +   * Trains a hierarchical clustering model with the given configuration
    +   *
    +   * @param data training points
    +   * @return a model for hierarchical clustering
    +   */
    +  def run(data: RDD[Vector]): HierarchicalClusteringModel = {
    +    validateData(data)
    +    logInfo(s"Run with ${conf.toString}")
    +
    +    val startTime = System.currentTimeMillis() // to measure the execution time
    +    val clusterTree = ClusterTree.fromRDD(data) // make the root node
    +    val model = new HierarchicalClusteringModel(clusterTree)
    +    val statsUpdater = new ClusterTreeStatsUpdater()
    +
    +    var node: Option[ClusterTree] = Some(model.clusterTree)
    +    statsUpdater(node.get)
    +
    +    // The training stops when any of the following conditions is met:
    +    //   1. There is no splittable cluster
    +    //   2. The number of split clusters reaches the given number of clusters
    +    //   3. Splitting a cluster increases the total variance of all clusters
    +    var totalVariance = Double.MaxValue
    +    var newTotalVariance = model.clusterTree.getVariance().get
    +    var step = 1
    +    while (node.isDefined
    +        && model.clusterTree.getTreeSize() < this.conf.getNumClusters
    +        && totalVariance >= newTotalVariance) {
    +
    +      // retry the splitting several times in order to avoid a poor clustering result
    +      var isMerged = false
    +      var isSingleCluster = false
    +      for (retry <- 1 to this.conf.getNumRetries()) {
    +        if (!isMerged && !isSingleCluster) {
    +          val subNodes = split(node.get).map(subNode => statsUpdater(subNode))
    +          // stop retrying when the node cannot be split into two clusters
    +          if (subNodes.size == 1) isSingleCluster = true
    +          // add the sub nodes into the tree only if the sum of the variances
    +          // of the sub nodes is less than that of the pre-split node
    +          if (node.get.getVariance().get > subNodes.map(_.getVariance().get).sum) {
    +            // insert the nodes to the tree
    +            node.get.insert(subNodes.toList)
    +            // calculate the local dendrogram height
    +            val dist = breezeNorm(subNodes(0).center.toBreeze - subNodes(1).center.toBreeze, 2)
    +            node.get.height = Some(dist)
    +            isMerged = true
    +            logInfo(s"the number of clusters is ${model.clusterTree.getTreeSize()} at step ${step}")
    +          }
    +        }
    +      }
    +      node.get.isVisited = true
    +
    +      // update the total variance and select the next splittable node
    +      totalVariance = newTotalVariance
    +      newTotalVariance = model.clusterTree.toSeq()
    +          .filter(_.isLeaf()).map(_.getVariance().get).sum
    +      node = nextNode(model.clusterTree)
    +      step += 1
    +    }
    +
    +    model.isTrained = true
    +    model.trainTime = (System.currentTimeMillis() - startTime).toInt
    +    model
    +  }
    +
    +  /**
    +   * Validates the given training data
    +   */
    +  private def validateData(data: RDD[Vector]) {
    +    if (conf.getNumClusters() > data.count()) {
    +      throw new IllegalArgumentException(
    +        "the number of clusters must not be greater than the number of input data records")
    +    }
    +  }
    +
    +  /**
    +   * Selects the next node to split
    +   */
    +  private[clustering] def nextNode(clusterTree: ClusterTree): Option[ClusterTree] = {
    +    // select the splittable, unvisited leaf cluster with the largest variance
    +    clusterTree.toSeq().filter(tree => tree.isSplittable() && !tree.isVisited) match {
    +      case list if list.isEmpty => None
    +      case list => Some(list.maxBy(_.getVariance()))
    +    }
    +  }
    +
    +  /**
    +   * Takes the initial centers for bisecting k-means
    +   */
    +  private[clustering] def takeInitCenters(center: Vector): Array[BV[Double]] = {
    +    // use the configured seed so that the center initialization is reproducible
    +    val random = new XORShiftRandom(this.conf.getRandomSeed())
    +    Array(
    +      center.toBreeze.map(elm => elm - random.nextDouble() * elm * this.conf.randomRange),
    +      center.toBreeze.map(elm => elm + random.nextDouble() * elm * this.conf.randomRange)
    +    )
    +  }
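    +
    +  // Illustrative example (made-up numbers): with center = [2.0, 4.0] and
    +  // randomRange = 0.1, each element is perturbed downward and upward by at
    +  // most 10% of its value, so the two initial centers might come out as
    +  // [1.9, 3.7] and [2.1, 4.3].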
    +
    +  /**
    +   * Splits the given cluster (tree) with bisecting k-means
    +   *
    +   * @param clusterTree the cluster to split
    +   * @return an array of ClusterTree whose size is generally 2, though it can be 1
    +   */
    +  private def split(clusterTree: ClusterTree): Array[ClusterTree] = {
    +    val startTime = System.currentTimeMillis()
    +    val data = clusterTree.data
    +    var centers = takeInitCenters(clusterTree.center)
    +
    +    // TODO Support distance metrics other than the Euclidean distance metric
    +    val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - bv2, 2.0)
    +    var finder = ClusterTree.findClosestCenter(metric)(centers) _
    +
    +    // The iteration stops when any of the following conditions is met:
    +    //   1. the relative error is less than the configured epsilon
    +    //   2. the number of executed iterations reaches the configured maximum
    +    //   3. there is only one center, which means the cluster is not splittable
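    +    // For example (made-up numbers): if the previous centers' norm sum is 10.0
    +    // and the new one is 9.999, the relative error is |10.0 - 9.999| / 10.0 = 1e-4,
    +    // which stops the loop whenever epsilon >= 1e-4.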
    +    var numIter = 0
    +    var error = Double.MaxValue
    +    while (error > conf.getEpsilon()
    +        && numIter < conf.getSubIterations()
    +        && centers.size > 1) {
    +
    +      val startTimeOfIter = System.currentTimeMillis()
    +      // broadcast the finder, then find the closest center of each point
    +      val bcFinder = data.sparkContext.broadcast(finder)
    +      val newCenters = data.mapPartitions { iter =>
    +        // sum the points in each partition per cluster index and count them
    +        val map = scala.collection.mutable.Map.empty[Int, (BV[Double], Int)]
    +        iter.foreach { point =>
    +          val idx = bcFinder.value(point)
    +          val (sumBV, n) = map.getOrElse(idx, (BV.zeros[Double](point.size), 0))
    +          map(idx) = (sumBV + point, n + 1)
    +        }
    +        map.toIterator
    +      }.reduceByKeyLocally {
    +        // sum the accumulated vectors and the counts across all partitions
    +        case ((p1, n1), (p2, n2)) => (p1 + p2, n1 + n2)
    +      }.map { case (_, (sum: BV[Double], count: Int)) =>
    +        sum :/ count.toDouble
    +      }
    +
    +      val normSum = centers.map(v => breezeNorm(v, 2.0)).sum
    +      val newNormSum = newCenters.map(v => breezeNorm(v, 2.0)).sum
    +      error = Math.abs((normSum - newNormSum) / normSum)
    +      centers = newCenters.toArray
    +      numIter += 1
    +      finder = ClusterTree.findClosestCenter(metric)(centers) _
    +
    +      logInfo(s"${numIter} iterations is finished" +
    +          s" for ${System.currentTimeMillis() - startTimeOfIter}" +
    +          s" at ${getClass}.split")
    +    }
    +
    +    val vectors = centers.map(center => Vectors.fromBreeze(center))
    +    val nodes = centers.size match {
    +      case 1 => Array(new ClusterTree(vectors(0), data))
    +      case 2 => {
    +        val closest = data.map(point => (finder(point), point))
    +        centers.zipWithIndex.map { case (center, i) =>
    +          val subData = closest.filter(_._1 == i).map(_._2)
    +          subData.cache
    +          new ClusterTree(vectors(i), subData)
    +        }
    +      }
    +      case _ => throw new RuntimeException(s"unexpected number of centers: ${centers.size}")
    +    }
    +    logInfo(s"${this.getClass.getSimpleName}.split end" +
    +        s" with total iterations" +
    +        s" for ${System.currentTimeMillis() - startTime}")
    +    nodes
    +  }
    +}
    +
    +/**
    + * top-level methods for calling the hierarchical clustering algorithm
    + */
    +object HierarchicalClustering {
    +
    +  /**
    +   * Trains a hierarchical clustering model with the given data and the number of clusters
    +   *
    +   * NOTE: If there is no splittable cluster, the training stops even though the
    +   * number of clusters is still less than the given one
    +   *
    +   * TODO: Support the other parameters of the hierarchical clustering
    +   *
    +   * @param data the training data
    +   * @param numClusters the maximum number of clusters you want
    +   * @return a hierarchical clustering model
    +   */
    +  def train(data: RDD[Vector], numClusters: Int): HierarchicalClusteringModel = {
    +    val conf = new HierarchicalClusteringConf()
    +        .setNumClusters(numClusters)
    +    val app = new HierarchicalClustering(conf)
    +    app.run(data)
    +  }
    +}
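    +
    +// A rough end-to-end sketch (illustrative only; `sc` is an assumed
    +// SparkContext and the data points are made up):
    +//
    +//   val data = sc.parallelize(Seq(
    +//     Vectors.dense(0.0, 0.0), Vectors.dense(0.1, 0.1),
    +//     Vectors.dense(9.0, 9.0), Vectors.dense(9.1, 9.1)))
    +//   val model = HierarchicalClustering.train(data, 2)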
    +
    +
    +/**
    + * A cluster represented as a tree node, which can have sub-nodes
    + *
    + * @param data the data in the cluster
    + * @param center the center of the cluster
    + * @param variance the variance of the cluster, used as the criterion for splitting
    + * @param dataSize the number of data records in the cluster
    + * @param children the sub-node(s) of the cluster
    + * @param parent the parent node of the cluster
    + */
    +class ClusterTree(
    --- End diff --
    
    Can this class be private to the package? I haven't looked that carefully.

