Github user freeman-lab commented on a diff in the pull request:
    --- Diff: 
    @@ -0,0 +1,627 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +package org.apache.spark.mllib.clustering
    +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => 
BV, norm => breezeNorm}
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.random.XORShiftRandom
    +/**
    + * This trait is used for the configuration of the hierarchical clustering.
    + */
    +trait HierarchicalClusteringConf extends Serializable {
    +  this: HierarchicalClustering =>
    +  def setNumClusters(numClusters: Int): this.type = {
    +    this.numClusters = numClusters
    +    this
    +  }
    +  def getNumClusters(): Int = this.numClusters
    +  def setNumRetries(numRetries: Int): this.type = {
    +    this.numRetries = numRetries
    +    this
    +  }
    +  def getNumRetries(): Int = this.numRetries
    +  def setSubIterations(subIterations: Int): this.type = {
    +    this.subIterations = subIterations
    +    this
    +  }
    +  def getSubIterations(): Int = this.subIterations
    +  def setEpsilon(epsilon: Double): this.type = {
    +    this.epsilon = epsilon
    +    this
    +  }
    +  def getEpsilon(): Double = this.epsilon
    +  def setRandomSeed(seed: Int): this.type = {
    +    this.randomSeed = seed
    +    this
    +  }
    +  def getRandomSeed(): Int = this.randomSeed
    +  def setRandomRange(range: Double): this.type = {
    +    this.randomRange = range
    +    this
    +  }
    +/**
    + * This is a divisive hierarchical clustering algorithm based on the bisecting
    + * k-means algorithm.
    + *
    + * The main idea of this algorithm is derived from:
    + * "A comparison of document clustering techniques",
    + * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 2000.
    + *
    + *
    + * @param numClusters the number of clusters you want
    + * @param subIterations the maximum number of iterations of each bisecting step
    + * @param numRetries the maximum number of attempts to split one cluster
    + * @param epsilon the threshold to stop the sub-iterations
    + * @param randomSeed used in sampling data for initializing centers in each sub-iteration
    + * @param randomRange the range coefficient to generate random points in each clustering step
    + */
    +class HierarchicalClustering(
    +  private[mllib] var numClusters: Int,
    +  private[mllib] var subIterations: Int,
    +  private[mllib] var numRetries: Int,
    +  private[mllib] var epsilon: Double,
    +  private[mllib] var randomSeed: Int,
    +  private[mllib] var randomRange: Double)
    +    extends Serializable with Logging with HierarchicalClusteringConf {
    +  /**
    +   * Constructs with the default configuration
    +   */
    +  def this() = this(20, 20, 10, 10E-4, 1, 0.1)
    +  /** Renders the current parameter values as comma-separated `name:value` pairs. */
    +  override def toString(): String = {
    +    val settings = List(
    +      s"numClusters:$numClusters",
    +      s"subIterations:$subIterations",
    +      s"numRetries:$numRetries",
    +      s"epsilon:$epsilon",
    +      s"randomSeed:$randomSeed",
    +      s"randomRange:$randomRange")
    +    settings.mkString(", ")
    +  }
    +  /**
    +   * Trains a hierarchical clustering model with the given configuration
    +   *
    +   * @param data training points
    +   * @return a model for hierarchical clustering
    +   */
    +  def run(data: RDD[Vector]): HierarchicalClusteringModel = {
    +    validateData(data)
    +    logInfo(s"Run with ${this}")
    +    val startTime = System.currentTimeMillis() // to measure the execution 
    +    val clusterTree = ClusterTree.fromRDD(data) // make the root node
    +    val model = new HierarchicalClusteringModel(clusterTree)
    +    val statsUpdater = new ClusterTreeStatsUpdater()
    +    var node: Option[ClusterTree] = Some(model.clusterTree)
    +    statsUpdater(node.get)
    +    // Stop the training when either of the following conditions is satisfied:
    +    //   1. There is no splittable cluster
    +    //   2. The number of split clusters reaches the given number of clusters
    +    var totalVariance = Double.MaxValue
    +    var newTotalVariance = model.clusterTree.getVariance().get
    +    var step = 1
    +    while (node != None
    +        && model.clusterTree.getTreeSize() < this.numClusters) {
    +      // split some times in order not to be wrong clustering result
    +      var isMerged = false
    +      for (i <- 1 to this.numRetries) {
    +        if (node.get.getVariance().get > this.epsilon && isMerged == 
false) {
    +          var subNodes = split(node.get).map(subNode => 
    +          if (subNodes.size == 2) {
    +            // insert the nodes to the tree
    +            node.get.insert(subNodes.toList)
    +            // calculate the local dendrogram height
    +            val dist = breezeNorm(subNodes(0).center.toBreeze - 
subNodes(1).center.toBreeze, 2)
    +            node.get.height = Some(dist)
    +            // unpersist unnecessary cache because its children nodes are 
    +            logInfo(s"the number of cluster is 
${model.clusterTree.getTreeSize()} at step ${step}")
    +            isMerged = true
    +          }
    +        }
    +      }
    +      node.get.isVisited = true
    +      // update the total variance and select the next splittable node
    +      node = nextNode(model.clusterTree)
    +      step += 1
    +    }
    +    model.isTrained = true
    +    val trainTime = (System.currentTimeMillis() - startTime).toInt
    +    logInfo(s"Elapsed Time for Training: ${trainTime.toDouble / 1000} 
    +    model
    +  }
    +  /**
    +   * Validates the given data before training.
    +   *
    +   * Counts the rows of `data` and fails with an `IllegalArgumentException` (via `require`)
    +   * when more clusters are requested than there are rows.
    +   *
    +   * @param data training points
    +   */
    +  private def validateData(data: RDD[Vector]): Unit = {
    +    // The check allows equality, so the message must say "less than or equal to".
    +    require(this.numClusters <= data.count(),
    +      "# clusters must be less than or equal to # data rows")
    +  }
    +  /**
    +   * Picks the next cluster to bisect: among the nodes of `clusterTree` that are
    +   * splittable and not yet visited, the one with the largest variance.
    +   *
    +   * @param clusterTree the tree whose nodes are searched
    +   * @return the selected node, or `None` when no node qualifies
    +   */
    +  private[clustering] def nextNode(clusterTree: ClusterTree): Option[ClusterTree] = {
    +    val candidates = clusterTree.toSeq().filter(t => t.isSplittable() && !t.isVisited)
    +    if (candidates.isEmpty) None else Some(candidates.maxBy(_.getVariance()))
    +  }
    +  /**
    +   * Takes the initial centers for bi-sect k-means
    +   */
    +  private[clustering] def takeInitCenters(centers: Vector): 
Array[BV[Double]] = {
    +    val random = new XORShiftRandom()
    +    Array(
    + => elm - random.nextDouble() * elm * 
    + => elm + random.nextDouble() * elm * 
    +    )
    +  }
    +  /**
    +   * Splits the given cluster (tree) with bi-sect k-means
    +   *
    +   * @param clusterTree the splitted cluster
    +   * @return an array of ClusterTree. its size is generally 2, but its 
size can be 1
    +   */
    +  private def split(clusterTree: ClusterTree): Array[ClusterTree] = {
    +    val startTime = System.currentTimeMillis()
    +    val data =
    +    val sc = data.sparkContext
    +    var centers = takeInitCenters(
    +    // TODO Supports distance metrics other Euclidean distance metric
    +    val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - 
bv2, 2.0)
    +    sc.broadcast(metric)
    +    // If the following conditions are satisfied, the iteration is stopped
    +    //   1. the relative error is less than that of configuration
    +    //   2. the number of executed iteration is greater than that of 
    +    //   3. the number of centers is equal to one. if one means that the 
cluster is not splittable
    +    var numIter = 0
    +    var error = Double.MaxValue
    +    while (error > this.epsilon
    +        && numIter < this.subIterations
    +        && centers.size > 1) {
    +      val startTimeOfIter = System.currentTimeMillis()
    +      sc.broadcast(centers)
    +      val newCenters = data.mapPartitions { iter =>
    +        // calculate the accumulation of the all point in a partition and 
count the rows
    +        val map = scala.collection.mutable.Map.empty[Int, (BV[Double], 
    +        iter.foreach { point =>
    +          val idx = ClusterTree.findClosestCenter(metric)(centers)(point)
    +          val (sumBV, n) = map.get(idx)
    +              .getOrElse((new BSV[Double](Array(), Array(), point.size), 
    +          map(idx) = (sumBV + point, n + 1)
    +        }
    +        map.toIterator
    +      }.reduceByKeyLocally {
    +        // sum the accumulation and the count in the all partition
    +        case ((p1, n1), (p2, n2)) => (p1 + p2, n1 + n2)
    +      }.map { case ((idx: Int, (center: BV[Double], counts: Int))) =>
    +        center :/ counts.toDouble
    +      }
    +      val normSum = => breezeNorm(v, 2.0)).sum
    +      val newNormSum = => breezeNorm(v, 2.0)).sum
    +      error = math.abs((normSum - newNormSum) / normSum)
    +      centers = newCenters.toArray
    +      numIter += 1
    +      logInfo(s"${numIter} iterations is finished" +
    +          s" for ${System.currentTimeMillis() - startTimeOfIter}" +
    +          s" at ${getClass}.split")
    +    }
    +    val vectors = => Vectors.fromBreeze(center))
    +    val nodes = centers.size match {
    +      case 1 => Array(new ClusterTree(vectors(0), data))
    +      case 2 => {
    +        val closest = => 
(ClusterTree.findClosestCenter(metric)(centers)(p), p))
    + { case (center, i) =>
    +          val subData = closest.filter(_._1 == i).map(_._2)
    +          subData.cache
    +          new ClusterTree(vectors(i), subData)
    +        }
    +      }
    +      case _ => throw new RuntimeException(s"something wrong with # 
    +    }
    +    logInfo(s"${this.getClass.getSimpleName}.split end" +
    +        s" with total iterations" +
    +        s" for ${System.currentTimeMillis() - startTime}")
    +    nodes
    +  }
    +/**
    + * Top-level methods for calling the hierarchical clustering algorithm
    + */
    +object HierarchicalClustering {
    +  /**
    +   * Trains a hierarchical clustering model with the given data, using the default
    +   * configuration for every parameter except the number of clusters.
    +   *
    +   * @param data training data
    +   * @param numClusters the maximum number of clusters you want
    +   * @return a hierarchical clustering model
    +   */
    +  def train(data: RDD[Vector], numClusters: Int): HierarchicalClusteringModel = {
    +    val app = new HierarchicalClustering().setNumClusters(numClusters)
    +    // Run the algorithm so the method yields the model its signature promises.
    +    app.run(data)
    +  }
    +  /**
    +   * Trains a hierarchical clustering model with the given data
    +   *
    +   * @param data training data
    +   * @param numClusters the maximum number of clusters you want
    +   * @param subIterations the maximum number of iterations of each bisecting step
    +   * @param numRetries the maximum number of attempts to split one cluster
    +   * @param epsilon the relative error below which a bisecting step is considered converged
    +   * @param randomSeed the random seed to generate the initial vectors for each bisecting
    +   * @param randomRange the range of error to generate the initial vectors for each bisecting
    +   * @return a hierarchical clustering model
    +   */
    +  def train(
    +    data: RDD[Vector],
    +    numClusters: Int,
    +    subIterations: Int,
    +    numRetries: Int,
    +    epsilon: Double,
    +    randomSeed: Int,
    +    randomRange: Double): HierarchicalClusteringModel = {
    +    val algo = new HierarchicalClustering()
    +        .setNumClusters(numClusters)
    +        .setSubIterations(subIterations)
    +        .setNumRetries(numRetries)
    +        .setEpsilon(epsilon)
    +        .setRandomSeed(randomSeed)
    +        .setRandomRange(randomRange)
    +    // Run the configured algorithm so the method yields the model its signature promises.
    +    algo.run(data)
    +  }
    +/**
    + * A cluster as a tree node which can have its own sub nodes
    + *
    + * @param center the center of the cluster
    + * @param data the data in the cluster
    + * @param height distance between sub nodes
    + * @param variance the statistics for splitting of the cluster
    + * @param dataSize the size of its data
    + * @param children the sub node(s) of the cluster
    + * @param parent the parent node of the cluster
    + * @param isVisited a flag marking that the node has already been visited while searching
    + *                  for the next node to split
    + */
    +class ClusterTree private (
    +  val center: Vector,
    +  private[mllib] val data: RDD[BV[Double]],
    +  private[mllib] var height: Option[Double],
    +  private[mllib] var variance: Option[Double],
    +  private[mllib] var dataSize: Option[Long],
    +  private[mllib] var children: List[ClusterTree],
    +  private[mllib] var parent: Option[ClusterTree],
    +  private[mllib] var isVisited: Boolean) extends Serializable with 
Cloneable with Logging {
    +  def this(center: Vector, data: RDD[BV[Double]]) =
    +    this(center, data, None, None, None, List.empty[ClusterTree], None, 
    +  override def clone(): ClusterTree = {
    +    val cloned = new ClusterTree(
    +      this.height,
    +      this.variance,
    +      this.dataSize,
    +      List.empty[ClusterTree],
    +      None,
    +      this.isVisited
    +    )
    +    val clonedChildren = => child.clone()).toList
    +    cloned.insert(clonedChildren)
    +    cloned
    +  }
    +  override def toString(): String = {
    +    val elements = Array(
    +      s"hashCode:${this.hashCode()}",
    +      s"depth:${this.getDepth()}",
    +      s"dataSize:${this.dataSize.get}",
    +      s"variance:${this.variance.get}",
    +      s"parent:${this.parent.hashCode()}",
    +      s"children:${}",
    +      s"isLeaf:${this.isLeaf()}",
    +      s"isVisited:${this.isVisited}"
    +    )
    +    elements.mkString(", ")
    +  }
    +  /**
    +   * Cuts a cluster tree
    +   *
    +   * Every direct child whose dendrogram height is below `height` loses its own
    +   * children; the same cut is then applied recursively to each direct child.
    +   *
    +   * @param height the threshold of height to cut a cluster tree
    +   * @return this node, after the cut has been applied
    +   */
    +  private[mllib] def cut(height: Double): ClusterTree = {
    +    // First pass: detach the children of every direct child lying below the threshold.
    +    for (child <- this.children if child.getHeight() < height && child.children.nonEmpty) {
    +      child.children.foreach(grandchild => child.delete(grandchild))
    +    }
    +    // Second pass: apply the same cut to each direct child's subtree.
    +    for (child <- this.children) child.cut(height)
    +    this
    +  }
    +  /**
    +   * Appends the given nodes to this node's children and makes this node their parent.
    +   *
    +   * @param children sub nodes to insert
    +   */
    +  def insert(children: List[ClusterTree]): Unit = {
    +    for (child <- children) child.parent = Some(this)
    +    this.children = this.children ::: children
    +  }
    +  /**
    +   * Inserts a sub node as its child
    +   *
    +   * @param child inserted sub node
    +   */
    +  def insert(child: ClusterTree): Unit = insert(List(child))
    +  /** Removes every child of this node. */
    +  def delete(): Unit = {
    +    this.children = Nil
    +  }
    +  /**
    +   * Deletes a single child.
    +   *
    +   * Logs a warning and leaves the children untouched when `target` is not a child
    +   * of this node.
    +   *
    +   * @param target the child node to remove
    +   */
    +  def delete(target: ClusterTree): Unit = {
    +    if (this.children.contains(target)) {
    +      // Keep every child except the one being removed.
    +      this.children = this.children.filter(child => child != target)
    +    } else {
    +      logWarning("You attempted to delete a node which is not contained")
    +    }
    +  }
    +  /**
    +   * Converts the tree into Seq class
    +   * the sub nodes are recursively expanded
    +   *
    +   * @return Seq class which the cluster tree is expanded
    +   */
    +  def toSeq(): Seq[ClusterTree] = {
    +    val seq = this.children.size match {
    +      case 0 => Seq(this)
    +      case _ => Seq(this) ++ => 
    +    }
    +    seq.sortWith { case (a, b) =>
    +      a.getDepth() < b.getDepth() &&
    +          breezeNorm(, 2) < breezeNorm(, 
    +    }
    +  }
    +  /**
    +   * Gets the all clusters which are leaves in the cluster tree
    +   * @return the Seq of the clusters
    +   */
    +  def getClusters(): Seq[ClusterTree] = toSeq().filter(_.isLeaf())
    +  /**
    +   * Gets the depth of the cluster in the tree
    +   *
    +   * @return 0 for a node without a parent, otherwise the parent's depth plus one
    +   */
    +  def getDepth(): Int = this.parent.map(p => 1 + p.getDepth()).getOrElse(0)
    +  /**
    +   * Gets the dendrogram height of the cluster at the cluster tree
    +   *
    +   * @return the dendrogram height
    +   */
    +  def getHeight(): Double = {
    +    this.children.size match {
    +      case 0 => 0.0
    +      case _ => this.height.get +
    +    }
    +  }
    +  /**
    +   * Assigns the closest cluster with a vector
    +   * @param metric distance metric
    --- End diff --
    Insert line break

If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact the Apache infrastructure team or file a JIRA ticket
with INFRA.

To unsubscribe, e-mail:
For additional commands, e-mail:

Reply via email to