Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/2906#discussion_r19288713 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/HierarchicalClustering.scala --- @@ -0,0 +1,549 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import breeze.linalg.{DenseVector => BDV, Vector => BV, norm => breezeNorm} +import org.apache.spark.Logging +import org.apache.spark.SparkContext._ +import org.apache.spark.mllib.linalg.{Vector, Vectors} +import org.apache.spark.rdd.RDD +import org.apache.spark.util.random.XORShiftRandom + +/** + * the configuration for a hierarchical clustering algorithm + * + * @param numClusters the number of clusters you want + * @param subIterations the number of iterations at digging + * @param epsilon the threshold to stop the sub-iterations + * @param randomSeed uses in sampling data for initializing centers in each sub iterations + * @param randomRange the range coefficient to generate random points in each clustering step + */ +class HierarchicalClusteringConf( + private var numClusters: Int, + private var subIterations: Int, + private var numRetries: Int, + private var epsilon: Double, + private var randomSeed: Int, + private[mllib] var randomRange: Double) extends Serializable { + + def this() = this(20, 5, 20, 10E-6, 1, 0.1) + + def setNumClusters(numClusters: Int): this.type = { + this.numClusters = numClusters + this + } + + def getNumClusters(): Int = this.numClusters + + def setSubIterations(iterations: Int): this.type = { + this.subIterations = iterations + this + } + + def setNumRetries(numRetries: Int): this.type = { + this.numRetries = numRetries + this + } + + def getNumRetries(): Int = this.numRetries + + def getSubIterations(): Int = this.subIterations + + def setEpsilon(epsilon: Double): this.type = { + this.epsilon = epsilon + this + } + + def getEpsilon(): Double = this.epsilon + + def setRandomSeed(seed: Int): this.type = { + this.randomSeed = seed + this + } + + def getRandomSeed(): Int = this.randomSeed + + def setRandomRange(range: Double): this.type = { + this.randomRange = range + this + } +} + + +/** + * This is a divisive hierarchical clustering algorithm based on bi-sect k-means algorithm. + * + * @param conf the configuration class for the hierarchical clustering + */ +class HierarchicalClustering(val conf: HierarchicalClusteringConf) + extends Serializable with Logging { + + /** + * Constructs with the default configuration + */ + def this() = this(new HierarchicalClusteringConf()) + + /** + * Trains a hierarchical clustering model with the given configuration + * + * @param data training points + * @return a model for hierarchical clustering + */ + def run(data: RDD[Vector]): HierarchicalClusteringModel = { + validateData(data) + logInfo(s"Run with ${conf.toString}") + + val startTime = System.currentTimeMillis() // to measure the execution time + val clusterTree = ClusterTree.fromRDD(data) // make the root node + val model = new HierarchicalClusteringModel(clusterTree) + val statsUpdater = new ClusterTreeStatsUpdater() + + var node: Option[ClusterTree] = Some(model.clusterTree) + statsUpdater(node.get) + + // If the followed conditions are satisfied, and then stop the training. + // 1. There is no splittable cluster + // 2. The number of the splitted clusters is greater than that of given clusters + // 3. The total variance of all clusters increases, when a cluster is splitted + var totalVariance = Double.MaxValue + var newTotalVariance = model.clusterTree.getVariance().get + var step = 1 + while (node != None + && model.clusterTree.getTreeSize() < this.conf.getNumClusters + && totalVariance >= newTotalVariance) { + + // split some times in order not to be wrong clustering result + var isMerged = false + var isSingleCluster = false + for (retry <- 1 to this.conf.getNumRetries()) { + if (isMerged == false && isSingleCluster == false) { + var subNodes = split(node.get).map(subNode => statsUpdater(subNode)) + // it seems that there is no splittable node + if (subNodes.size == 1) isSingleCluster = false + // add the sub nodes in to the tree + // if the sum of variance of sub nodes is greater than that of pre-splitted node + if (node.get.getVariance().get > subNodes.map(_.getVariance().get).sum) { + // insert the nodes to the tree + node.get.insert(subNodes.toList) + // calculate the local dendrogram height + val dist = breezeNorm(subNodes(0).center.toBreeze - subNodes(1).center.toBreeze, 2) + node.get.height = Some(dist) + isMerged = true + logInfo(s"the number of cluster is ${model.clusterTree.getTreeSize()} at step ${step}") + } + } + } + node.get.isVisited = true + + // update the total variance and select the next splittable node + totalVariance = newTotalVariance + newTotalVariance = model.clusterTree.toSeq().filter(_.isLeaf()).map(_.getVariance().get).sum + node = nextNode(model.clusterTree) + step += 1 + } + + model.isTrained = true + model.trainTime = (System.currentTimeMillis() - startTime).toInt + model + } + + /** + * validate the given data to train + */ + private def validateData(data: RDD[Vector]) { + conf match { + case conf if conf.getNumClusters() > data.count() => --- End diff -- Can this use `require`?
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org