This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 001d309  [SPARK-25765][ML] Add training cost to BisectingKMeans summary
001d309 is described below

commit 001d3095385626e329b3853364a4feeb811aac5a
Author: Marco Gaido <marcogaid...@gmail.com>
AuthorDate: Tue Jan 1 09:18:58 2019 -0600

    [SPARK-25765][ML] Add training cost to BisectingKMeans summary

    ## What changes were proposed in this pull request?

    The PR adds the `trainingCost` value to the `BisectingKMeansSummary`, in order
    to expose the information retrievable by running `computeCost` on the training
    dataset. This fills the gap with the `KMeans` implementation.

    ## How was this patch tested?

    Improved UTs.

    Closes #22764 from mgaido91/SPARK-25765.

    Authored-by: Marco Gaido <marcogaid...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../spark/ml/clustering/BisectingKMeans.scala      | 13 ++++-
 .../spark/mllib/clustering/BisectingKMeans.scala   |  3 +-
 .../mllib/clustering/BisectingKMeansModel.scala    | 59 +++++++++++++++++++---
 .../spark/ml/clustering/BisectingKMeansSuite.scala |  2 +
 .../mllib/clustering/BisectingKMeansSuite.scala    |  1 +
 project/MimaExcludes.scala                         |  3 ++
 python/pyspark/ml/clustering.py                    | 12 ++++-
 7 files changed, 82 insertions(+), 11 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index 49e9f51..d846f17 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -264,7 +264,12 @@ class BisectingKMeans @Since("2.0.0") (
     val parentModel = bkm.run(rdd, Some(instr))
     val model = copyValues(new BisectingKMeansModel(uid, parentModel).setParent(this))
     val summary = new BisectingKMeansSummary(
-      model.transform(dataset), $(predictionCol), $(featuresCol), $(k), $(maxIter))
+      model.transform(dataset),
+      $(predictionCol),
+      $(featuresCol),
+      $(k),
+      $(maxIter),
+      parentModel.trainingCost)
     instr.logNamedValue("clusterSizes", summary.clusterSizes)
     instr.logNumFeatures(model.clusterCenters.head.size)
     model.setSummary(Some(summary))
@@ -294,6 +299,8 @@ object BisectingKMeans extends DefaultParamsReadable[BisectingKMeans] {
  * @param featuresCol Name for column of features in `predictions`.
  * @param k Number of clusters.
  * @param numIter Number of iterations.
+ * @param trainingCost Sum of the cost to the nearest centroid for all points in the training
+ *                     dataset. This is equivalent to sklearn's inertia.
  */
 @Since("2.1.0")
 @Experimental
@@ -302,4 +309,6 @@ class BisectingKMeansSummary private[clustering] (
     predictionCol: String,
     featuresCol: String,
     k: Int,
-    numIter: Int) extends ClusteringSummary(predictions, predictionCol, featuresCol, k, numIter)
+    numIter: Int,
+    @Since("3.0.0") val trainingCost: Double)
+  extends ClusteringSummary(predictions, predictionCol, featuresCol, k, numIter)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index 80ab8eb..696dff0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -242,7 +242,8 @@ class BisectingKMeans private (
     norms.unpersist(false)
     val clusters = activeClusters ++ inactiveClusters
     val root = buildTree(clusters, dMeasure)
-    new BisectingKMeansModel(root, this.distanceMeasure)
+    val totalCost = root.leafNodes.map(_.cost).sum
+    new BisectingKMeansModel(root, this.distanceMeasure, totalCost)
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index 4c5794f..b54b891 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
@@ -41,11 +41,12 @@ import org.apache.spark.sql.{Row, SparkSession}
 @Since("1.6.0")
 class BisectingKMeansModel private[clustering] (
     private[clustering] val root: ClusteringTreeNode,
-    @Since("2.4.0") val distanceMeasure: String
+    @Since("2.4.0") val distanceMeasure: String,
+    @Since("3.0.0") val trainingCost: Double
   ) extends Serializable with Saveable with Logging {
 
   @Since("1.6.0")
-  def this(root: ClusteringTreeNode) = this(root, DistanceMeasure.EUCLIDEAN)
+  def this(root: ClusteringTreeNode) = this(root, DistanceMeasure.EUCLIDEAN, 0.0)
 
   private val distanceMeasureInstance: DistanceMeasure =
     DistanceMeasure.decodeFromString(distanceMeasure)
@@ -109,10 +110,10 @@ class BisectingKMeansModel private[clustering] (
 
   @Since("2.0.0")
   override def save(sc: SparkContext, path: String): Unit = {
-    BisectingKMeansModel.SaveLoadV2_0.save(sc, this, path)
+    BisectingKMeansModel.SaveLoadV3_0.save(sc, this, path)
   }
 
-  override protected def formatVersion: String = "2.0"
+  override protected def formatVersion: String = "3.0"
 }
 
 @Since("2.0.0")
@@ -128,11 +129,15 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       case (SaveLoadV2_0.thisClassName, SaveLoadV2_0.thisFormatVersion) =>
         val model = SaveLoadV2_0.load(sc, path)
         model
+      case (SaveLoadV3_0.thisClassName, SaveLoadV3_0.thisFormatVersion) =>
+        val model = SaveLoadV3_0.load(sc, path)
+        model
       case _ => throw new Exception(
         s"BisectingKMeansModel.load did not recognize model with (className, format version):" +
         s"($loadedClassName, $formatVersion).  Supported:\n" +
         s"  (${SaveLoadV1_0.thisClassName}, ${SaveLoadV1_0.thisFormatVersion})\n" +
-        s"  (${SaveLoadV2_0.thisClassName}, ${SaveLoadV2_0.thisFormatVersion})")
+        s"  (${SaveLoadV2_0.thisClassName}, ${SaveLoadV2_0.thisFormatVersion})\n" +
+        s"  (${SaveLoadV3_0.thisClassName}, ${SaveLoadV3_0.thisFormatVersion})")
     }
   }
 
@@ -195,7 +200,8 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
       val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, d)).toMap
       val rootNode = buildTree(rootId, nodes)
-      new BisectingKMeansModel(rootNode, DistanceMeasure.EUCLIDEAN)
+      val totalCost = rootNode.leafNodes.map(_.cost).sum
+      new BisectingKMeansModel(rootNode, DistanceMeasure.EUCLIDEAN, totalCost)
     }
   }
 
@@ -231,7 +237,46 @@ object BisectingKMeansModel extends Loader[BisectingKMeansModel] {
       val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
       val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, d)).toMap
       val rootNode = buildTree(rootId, nodes)
-      new BisectingKMeansModel(rootNode, distanceMeasure)
+      val totalCost = rootNode.leafNodes.map(_.cost).sum
+      new BisectingKMeansModel(rootNode, distanceMeasure, totalCost)
+    }
+  }
+
+  private[clustering] object SaveLoadV3_0 {
+    private[clustering] val thisFormatVersion = "3.0"
+
+    private[clustering]
+    val thisClassName = "org.apache.spark.mllib.clustering.BisectingKMeansModel"
+
+    def save(sc: SparkContext, model: BisectingKMeansModel, path: String): Unit = {
+      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+      val metadata = compact(render(
+        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
+          ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> model.distanceMeasure)
+          ~ ("trainingCost" -> model.trainingCost)))
+      sc.parallelize(Seq(metadata), 1).saveAsTextFile(Loader.metadataPath(path))
+
+      val data = getNodes(model.root).map(node => Data(node.index, node.size,
+        node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, node.height,
+        node.children.map(_.index)))
+      spark.createDataFrame(data).write.parquet(Loader.dataPath(path))
+    }
+
+    def load(sc: SparkContext, path: String): BisectingKMeansModel = {
+      implicit val formats: DefaultFormats = DefaultFormats
+      val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
+      assert(className == thisClassName)
+      assert(formatVersion == thisFormatVersion)
+      val rootId = (metadata \ "rootId").extract[Int]
+      val distanceMeasure = (metadata \ "distanceMeasure").extract[String]
+      val trainingCost = (metadata \ "trainingCost").extract[Double]
+      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+      val rows = spark.read.parquet(Loader.dataPath(path))
+      Loader.checkSchema[Data](rows.schema)
+      val data = rows.select("index", "size", "center", "norm", "cost", "height", "children")
+      val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, d)).toMap
+      val rootNode = buildTree(rootId, nodes)
+      new BisectingKMeansModel(rootNode, distanceMeasure, trainingCost)
     }
   }
 }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index 1b7780e..461f8b8 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -134,6 +134,8 @@ class BisectingKMeansSuite extends MLTest with DefaultReadWriteTest {
     assert(clusterSizes.sum === numRows)
     assert(clusterSizes.forall(_ >= 0))
     assert(summary.numIter == 20)
+    assert(summary.trainingCost < 0.1)
+    assert(model.computeCost(dataset) == summary.trainingCost)
 
     model.setSummary(None)
     assert(!model.hasSummary)
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala
index 4a4d8b5..10d5f32 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala
@@ -194,6 +194,7 @@ class BisectingKMeansSuite extends SparkFunSuite with MLlibTestSparkContext {
       assert(model.k === sameModel.k)
       assert(model.distanceMeasure === sameModel.distanceMeasure)
       model.clusterCenters.zip(sameModel.clusterCenters).foreach(c => c._1 === c._2)
+      assert(model.trainingCost == sameModel.trainingCost)
     } finally {
       Utils.deleteRecursively(tempDir)
     }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 89fc53c..cf8d9f3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    // [SPARK-25765][ML] Add training cost to BisectingKMeans summary
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.BisectingKMeansModel.this"),
+
     // [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError"),
 
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index d8a6dfb..5a776ae 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -587,6 +587,8 @@ class BisectingKMeans(JavaEstimator, HasDistanceMeasure, HasFeaturesCol, HasPred
     2
     >>> summary.clusterSizes
     [2, 2]
+    >>> summary.trainingCost
+    2.000...
     >>> transformed = model.transform(df).select("features", "prediction")
     >>> rows = transformed.collect()
     >>> rows[0].prediction == rows[1].prediction
@@ -700,7 +702,15 @@ class BisectingKMeansSummary(ClusteringSummary):
 
     .. versionadded:: 2.1.0
     """
-    pass
+
+    @property
+    @since("3.0.0")
+    def trainingCost(self):
+        """
+        Sum of squared distances to the nearest centroid for all points in the training dataset.
+        This is equivalent to sklearn's inertia.
+        """
+        return self._call_java("trainingCost")
 
 
 @inherit_doc
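For context, a minimal usage sketch of the new field through the Scala ML API. This is
not part of the patch; `dataset` is assumed to be a DataFrame with a vector "features"
column, as in the test suite above:

    import org.apache.spark.ml.clustering.BisectingKMeans

    // `dataset: DataFrame` with a vector "features" column is assumed, not defined here.
    val bkm = new BisectingKMeans().setK(2).setSeed(1L)
    val model = bkm.fit(dataset)

    // Sum of distances from each training point to its nearest centroid
    // (sklearn's "inertia"), previously only retrievable via computeCost.
    println(model.summary.trainingCost)

    // Matches computeCost evaluated on the training data, as the new test asserts.
    assert(model.computeCost(dataset) == model.summary.trainingCost)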