Github user freeman-lab commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2906#discussion_r22632512
  
    --- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala
 ---
    @@ -0,0 +1,627 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.clustering
    +
    +import breeze.linalg.{DenseVector => BDV, SparseVector => BSV, Vector => 
BV, norm => breezeNorm}
    +import org.apache.spark.Logging
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.{Vector, Vectors}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.random.XORShiftRandom
    +
    +/**
    + * This trait is used for the configuration of the hierarchical clustering
    + */
    +sealed
    +trait HierarchicalClusteringConf extends Serializable {
    +  this: HierarchicalClustering =>
    +
    +  def setNumClusters(numClusters: Int): this.type = {
    +    this.numClusters = numClusters
    +    this
    +  }
    +
    +  def getNumClusters(): Int = this.numClusters
    +
    +  def setNumRetries(numRetries: Int): this.type = {
    +    this.numRetries = numRetries
    +    this
    +  }
    +
    +  def getNumRetries(): Int = this.numRetries
    +
    +  def setSubIterations(subIterations: Int): this.type = {
    +    this.subIterations = subIterations
    +    this
    +  }
    +
    +  def getSubIterations(): Int = this.subIterations
    +
    +  def setEpsilon(epsilon: Double): this.type = {
    +    this.epsilon = epsilon
    +    this
    +  }
    +
    +  def getEpsilon(): Double = this.epsilon
    +
    +  def setRandomSeed(seed: Int): this.type = {
    +    this.randomSeed = seed
    +    this
    +  }
    +
    +  def getRandomSeed(): Int = this.randomSeed
    +
    +  def setRandomRange(range: Double): this.type = {
    +    this.randomRange = range
    +    this
    +  }
    +}
    +
    +
    +/**
    + * This is a divisive hierarchical clustering algorithm based on bi-sect 
k-means algorithm.
    + *
    + * The main idea of this algorithm is derived from:
    + * "A comparison of document clustering techniques",
    + * M. Steinbach, G. Karypis and V. Kumar. Workshop on Text Mining, KDD, 
2000.
    + * http://cs.fit.edu/~pkc/classes/ml-internet/papers/steinbach00tr.pdf
    + *
    + * @param numClusters the number of clusters you want
    + * @param subIterations the number of iterations at digging
    + * @param epsilon the threshold to stop the sub-iterations
    + * @param randomSeed uses in sampling data for initializing centers in 
each sub iterations
    + * @param randomRange the range coefficient to generate random points in 
each clustering step
    + */
    +class HierarchicalClustering(
    +  private[mllib] var numClusters: Int,
    +  private[mllib] var subIterations: Int,
    +  private[mllib] var numRetries: Int,
    +  private[mllib] var epsilon: Double,
    +  private[mllib] var randomSeed: Int,
    +  private[mllib] var randomRange: Double)
    +    extends Serializable with Logging with HierarchicalClusteringConf {
    +
    +  /**
    +   * Constructs with the default configuration
    +   */
    +  def this() = this(20, 20, 10, 10E-4, 1, 0.1)
    +
    +  /** Shows the parameters */
    +  override def toString(): String = {
    +    Array(
    +      s"numClusters:${numClusters}",
    +      s"subIterations:${subIterations}",
    +      s"numRetries:${numRetries}",
    +      s"epsilon:${epsilon}",
    +      s"randomSeed:${randomSeed}",
    +      s"randomRange:${randomRange}"
    +    ).mkString(", ")
    +  }
    +
    +  /**
    +   * Trains a hierarchical clustering model with the given configuration
    +   *
    +   * @param data training points
    +   * @return a model for hierarchical clustering
    +   */
    +  def run(data: RDD[Vector]): HierarchicalClusteringModel = {
    +    validateData(data)
    +    logInfo(s"Run with ${this}")
    +
    +    val startTime = System.currentTimeMillis() // to measure the execution 
time
    +    val clusterTree = ClusterTree.fromRDD(data) // make the root node
    +    val model = new HierarchicalClusteringModel(clusterTree)
    +    val statsUpdater = new ClusterTreeStatsUpdater()
    +
    +    var node: Option[ClusterTree] = Some(model.clusterTree)
    +    statsUpdater(node.get)
    +
    +    // If the followed conditions are satisfied, and then stop the 
training.
    +    //   1. There is no splittable cluster
    +    //   2. The number of the splitted clusters is greater than that of 
given clusters
    +    var totalVariance = Double.MaxValue
    +    var newTotalVariance = model.clusterTree.getVariance().get
    +    var step = 1
    +    while (node != None
    +        && model.clusterTree.getTreeSize() < this.numClusters) {
    +
    +      // split some times in order not to be wrong clustering result
    +      var isMerged = false
    +      for (i <- 1 to this.numRetries) {
    +        if (node.get.getVariance().get > this.epsilon && isMerged == 
false) {
    +          var subNodes = split(node.get).map(subNode => 
statsUpdater(subNode))
    +          if (subNodes.size == 2) {
    +            // insert the nodes to the tree
    +            node.get.insert(subNodes.toList)
    +            // calculate the local dendrogram height
    +            val dist = breezeNorm(subNodes(0).center.toBreeze - 
subNodes(1).center.toBreeze, 2)
    +            node.get.height = Some(dist)
    +            // unpersist unnecessary cache because its children nodes are 
cached
    +            node.get.data.unpersist()
    +            logInfo(s"the number of cluster is 
${model.clusterTree.getTreeSize()} at step ${step}")
    +            isMerged = true
    +          }
    +        }
    +      }
    +      node.get.isVisited = true
    +
    +      // update the total variance and select the next splittable node
    +      node = nextNode(model.clusterTree)
    +      step += 1
    +    }
    +
    +    model.isTrained = true
    +    val trainTime = (System.currentTimeMillis() - startTime).toInt
    +    logInfo(s"Elapsed Time for Training: ${trainTime.toDouble / 1000} 
[sec]")
    +    model
    +  }
    +
    +  /**
    +   * validate the given data to train
    +   */
    +  private def validateData(data: RDD[Vector]) {
    +    require(this.numClusters <= data.count(), "# clusters must be less 
than # data rows")
    +  }
    +
    +  /**
    +   * Selects the next node to split
    +   */
    +  private[clustering] def nextNode(clusterTree: ClusterTree): 
Option[ClusterTree] = {
    +    // select the max variance of clusters which are leaves of a tree
    +    clusterTree.toSeq().filter(tree => tree.isSplittable() && 
!tree.isVisited) match {
    +      case list if list.isEmpty => None
    +      case list => Some(list.maxBy(_.getVariance()))
    +    }
    +  }
    +
    +  /**
    +   * Takes the initial centers for bi-sect k-means
    +   */
    +  private[clustering] def takeInitCenters(centers: Vector): 
Array[BV[Double]] = {
    +    val random = new XORShiftRandom()
    +    Array(
    +      centers.toBreeze.map(elm => elm - random.nextDouble() * elm * 
this.randomRange),
    +      centers.toBreeze.map(elm => elm + random.nextDouble() * elm * 
this.randomRange)
    +    )
    +  }
    +
    +  /**
    +   * Splits the given cluster (tree) with bi-sect k-means
    +   *
    +   * @param clusterTree the splitted cluster
    +   * @return an array of ClusterTree. its size is generally 2, but its 
size can be 1
    +   */
    +  private def split(clusterTree: ClusterTree): Array[ClusterTree] = {
    +    val startTime = System.currentTimeMillis()
    +    val data = clusterTree.data
    +    val sc = data.sparkContext
    +    var centers = takeInitCenters(clusterTree.center)
    +
    +    // TODO Supports distance metrics other Euclidean distance metric
    +    val metric = (bv1: BV[Double], bv2: BV[Double]) => breezeNorm(bv1 - 
bv2, 2.0)
    +    sc.broadcast(metric)
    +
    +    // If the following conditions are satisfied, the iteration is stopped
    +    //   1. the relative error is less than that of configuration
    +    //   2. the number of executed iteration is greater than that of 
configuration
    +    //   3. the number of centers is equal to one. if one means that the 
cluster is not splittable
    +    var numIter = 0
    +    var error = Double.MaxValue
    +    while (error > this.epsilon
    +        && numIter < this.subIterations
    +        && centers.size > 1) {
    +      val startTimeOfIter = System.currentTimeMillis()
    +
    +      sc.broadcast(centers)
    --- End diff --
    
    This use of `sc.broadcast` has no effect because the output isn't assigned 
or used. Instead, you want something like `val bcCenters = 
sc.broadcast(Centers)` and then access within the `map` as `bcCenters.value`. 
See 
[KMeans](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/clustering/KMeans.scala#L181-185)
 for an example.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to