Github user yanboliang commented on a diff in the pull request: https://github.com/apache/spark/pull/18538#discussion_r138024385 --- Diff: mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringEvaluator.scala --- @@ -0,0 +1,437 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.ml.evaluation + +import org.apache.spark.SparkContext +import org.apache.spark.annotation.{Experimental, Since} +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.ml.linalg.{BLAS, DenseVector, Vector, Vectors, VectorUDT} +import org.apache.spark.ml.param.{Param, ParamMap, ParamValidators} +import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasPredictionCol} +import org.apache.spark.ml.util.{DefaultParamsReadable, DefaultParamsWritable, Identifiable, SchemaUtils} +import org.apache.spark.sql.{DataFrame, Dataset} +import org.apache.spark.sql.functions.{avg, col, udf} +import org.apache.spark.sql.types.IntegerType + +/** + * :: Experimental :: + * Evaluator for clustering results. + * The metric computes the Silhouette measure + * using the squared Euclidean distance. + * + * The Silhouette is a measure for the validation + * of the consistency within clusters. It ranges + * between 1 and -1, where a value close to 1 + * means that the points in a cluster are close + * to the other points in the same cluster and + * far from the points of the other clusters. + */ +@Experimental +@Since("2.3.0") +class ClusteringEvaluator @Since("2.3.0") (@Since("2.3.0") override val uid: String) + extends Evaluator with HasPredictionCol with HasFeaturesCol with DefaultParamsWritable { + + @Since("2.3.0") + def this() = this(Identifiable.randomUID("cluEval")) + + @Since("2.3.0") + override def copy(pMap: ParamMap): ClusteringEvaluator = this.defaultCopy(pMap) + + @Since("2.3.0") + override def isLargerBetter: Boolean = true + + /** @group setParam */ + @Since("2.3.0") + def setPredictionCol(value: String): this.type = set(predictionCol, value) + + /** @group setParam */ + @Since("2.3.0") + def setFeaturesCol(value: String): this.type = set(featuresCol, value) + + /** + * param for metric name in evaluation + * (supports `"squaredSilhouette"` (default)) + * @group param + */ + @Since("2.3.0") + val metricName: Param[String] = { + val allowedParams = ParamValidators.inArray(Array("squaredSilhouette")) + new Param( + this, + "metricName", + "metric name in evaluation (squaredSilhouette)", + allowedParams + ) + } + + /** @group getParam */ + @Since("2.3.0") + def getMetricName: String = $(metricName) + + /** @group setParam */ + @Since("2.3.0") + def setMetricName(value: String): this.type = set(metricName, value) + + setDefault(metricName -> "squaredSilhouette") + + @Since("2.3.0") + override def evaluate(dataset: Dataset[_]): Double = { + SchemaUtils.checkColumnType(dataset.schema, $(featuresCol), new VectorUDT) + SchemaUtils.checkColumnType(dataset.schema, $(predictionCol), IntegerType) + + // Silhouette is reasonable only when the number of clusters is grater then 1 + assert(dataset.select($(predictionCol)).distinct().count() > 1, + "Number of clusters must be greater than one.") + + $(metricName) match { + case "squaredSilhouette" => SquaredEuclideanSilhouette.computeSilhouetteScore( + dataset, + $(predictionCol), + $(featuresCol) + ) + } + } +} + + +@Since("2.3.0") +object ClusteringEvaluator + extends DefaultParamsReadable[ClusteringEvaluator] { + + @Since("2.3.0") + override def load(path: String): ClusteringEvaluator = super.load(path) + +} + + +/** + * SquaredEuclideanSilhouette computes the average of the + * Silhouette over all the data of the dataset, which is + * a measure of how appropriately the data have been clustered. + * + * The Silhouette for each point `i` is defined as: + * + * <blockquote> + * $$ + * s_{i} = \frac{b_{i}-a_{i}}{max\{a_{i},b_{i}\}} + * $$ + * </blockquote> + * + * which can be rewritten as + * + * <blockquote> + * $$ + * s_{i}= \begin{cases} + * 1-\frac{a_{i}}{b_{i}} & \text{if } a_{i} \leq b_{i} \\ + * \frac{b_{i}}{a_{i}}-1 & \text{if } a_{i} \gt b_{i} \end{cases} + * $$ + * </blockquote> + * + * where `$a_{i}$` is the average dissimilarity of `i` with all other data + * within the same cluster, `$b_{i}$` is the lowest average dissimilarity + * of `i` to any other cluster, of which `i` is not a member. + * `$a_{i}$` can be interpreted as as how well `i` is assigned to its cluster + * (the smaller the value, the better the assignment), while `$b_{i}$` is + * a measure of how well `i` has not been assigned to its "neighboring cluster", + * ie. the nearest cluster to `i`. + * + * Unfortunately, the naive implementation of the algorithm requires to compute + * the distance of each couple of points in the dataset. Since the computation of + * the distance measure takes `D` operations - if `D` is the number of dimensions + * of each point, the computational complexity of the algorithm is `O(N^2^*D)`, where + * `N` is the cardinality of the dataset. Of course this is not scalable in `N`, + * which is the critical number in a Big Data context. + * + * The algorithm which is implemented in this object, instead, is an efficient + * and parallel implementation of the Silhouette using the squared Euclidean + * distance measure. + * + * With this assumption, the total distance of the point `X` + * to the points `$C_{i}$` belonging to the cluster `$\Gamma$` is: + * + * <blockquote> + * $$ + * \sum\limits_{i=1}^N d(X, C_{i} ) = + * \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D (x_{j}-c_{ij})^2 \Big) + * = \sum\limits_{i=1}^N \Big( \sum\limits_{j=1}^D x_{j}^2 + + * \sum\limits_{j=1}^D c_{ij}^2 -2\sum\limits_{j=1}^D x_{j}c_{ij} \Big) + * = \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 + + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 + * -2 \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} + * $$ + * </blockquote> + * + * where `$x_{j}$` is the `j`-th dimension of the point `X` and + * `$c_{ij}$` is the `j`-th dimension of the `i`-th point in cluster `$\Gamma$`. + * + * Then, the first term of the equation can be rewritten as: + * + * <blockquote> + * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}^2 = N \xi_{X} \text{ , + * with } \xi_{X} = \sum\limits_{j=1}^D x_{j}^2 + * $$ + * </blockquote> + * + * where `$\xi_{X}$` is fixed for each point and it can be precomputed. + * + * Moreover, the second term is fixed for each cluster too, + * thus we can name it `$\Psi_{\Gamma}$` + * + * <blockquote> + * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D c_{ij}^2 = + * \sum\limits_{i=1}^N \xi_{C_{i}} = \Psi_{\Gamma} + * $$ + * </blockquote> + * + * Last, the third element becomes + * + * <blockquote> + * $$ + * \sum\limits_{i=1}^N \sum\limits_{j=1}^D x_{j}c_{ij} = + * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} + * $$ + * </blockquote> + * + * thus defining the vector + * + * <blockquote> + * $$ + * Y_{\Gamma}:Y_{\Gamma j} = \sum\limits_{i=1}^N c_{ij} , j=0, ..., D + * $$ + * </blockquote> + * + * which is fixed for each cluster `$\Gamma$`, we have + * + * <blockquote> + * $$ + * \sum\limits_{j=1}^D \Big(\sum\limits_{i=1}^N c_{ij} \Big) x_{j} = + * \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} + * $$ + * </blockquote> + * + * In this way, the previous equation becomes + * + * <blockquote> + * $$ + * N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j} + * $$ + * </blockquote> + * + * and the average distance of a point to a cluster can be computed as + * + * <blockquote> + * $$ + * \frac{\sum\limits_{i=1}^N d(X, C_{i} )^2}{N} = + * \frac{N\xi_{X} + \Psi_{\Gamma} - 2 \sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} = + * \xi_{X} + \frac{\Psi_{\Gamma} }{N} - 2 \frac{\sum\limits_{j=1}^D Y_{\Gamma j} x_{j}}{N} + * $$ + * </blockquote> + * + * Thus, it is enough to precompute: the constant `$\xi_{X}$` for each point `X`; the + * constants `$\Psi_{\Gamma}$`, `N` and the vector `$Y_{\Gamma}$` for + * each cluster `$\Gamma$`. + * + * In the implementation, the precomputed values for the clusters + * are distributed among the worker nodes via broadcasted variables, + * because we can assume that the clusters are limited in number and + * anyway they are much fewer than the points. + * + * The main strengths of this algorithm are the low computational complexity + * and the intrinsic parallelism. The precomputed information for each point + * and for each cluster can be computed with a computational complexity + * which is `O(N/W)`, where `N` is the number of points in the dataset and + * `W` is the number of worker nodes. After that, every point can be + * analyzed independently of the others. + * + * For every point we need to compute the average distance to all the clusters. + * Since the formula above requires `O(D)` operations, this phase has a + * computational complexity which is `O(C*D*N/W)` where `C` is the number of + * clusters (which we assume quite low), `D` is the number of dimensions, + * `N` is the number of points in the dataset and `W` is the number + * of worker nodes. + */ +private[evaluation] object SquaredEuclideanSilhouette { + + private[this] var kryoRegistrationPerformed: Boolean = false + + /** + * This method registers the class + * [[org.apache.spark.ml.evaluation.SquaredEuclideanSilhouette.ClusterStats]] + * for kryo serialization. + * + * @param sc `SparkContext` to be used + */ + def registerKryoClasses(sc: SparkContext): Unit = { + if (!kryoRegistrationPerformed) { + sc.getConf.registerKryoClasses( + Array( + classOf[SquaredEuclideanSilhouette.ClusterStats] + ) + ) + kryoRegistrationPerformed = true + } + } + + case class ClusterStats(featureSum: Vector, squaredNormSum: Double, numOfPoints: Long) + + /** + * The method takes the input dataset and computes the aggregated values + * about a cluster which are needed by the algorithm. + * + * @param df The DataFrame which contains the input data + * @param predictionCol The name of the column which contains the cluster id for the point. --- End diff -- ```the cluster id``` -> ```the predicted cluster id```
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org