This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 001d309  [SPARK-25765][ML] Add training cost to BisectingKMeans summary
001d309 is described below

commit 001d3095385626e329b3853364a4feeb811aac5a
Author: Marco Gaido <marcogaid...@gmail.com>
AuthorDate: Tue Jan 1 09:18:58 2019 -0600

    [SPARK-25765][ML] Add training cost to BisectingKMeans summary
    
    ## What changes were proposed in this pull request?
    
    The PR adds the `trainingCost` value to the `BisectingKMeansSummary`, in 
order to expose the information retrievable by running `computeCost` on the 
training dataset. This fills the gap with `KMeans` implementation.
    
    ## How was this patch tested?
    
    improved UTs
    
    Closes #22764 from mgaido91/SPARK-25765.
    
    Authored-by: Marco Gaido <marcogaid...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../spark/ml/clustering/BisectingKMeans.scala      | 13 ++++-
 .../spark/mllib/clustering/BisectingKMeans.scala   |  3 +-
 .../mllib/clustering/BisectingKMeansModel.scala    | 59 +++++++++++++++++++---
 .../spark/ml/clustering/BisectingKMeansSuite.scala |  2 +
 .../mllib/clustering/BisectingKMeansSuite.scala    |  1 +
 project/MimaExcludes.scala                         |  3 ++
 python/pyspark/ml/clustering.py                    | 12 ++++-
 7 files changed, 82 insertions(+), 11 deletions(-)

diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
index 49e9f51..d846f17 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/clustering/BisectingKMeans.scala
@@ -264,7 +264,12 @@ class BisectingKMeans @Since("2.0.0") (
     val parentModel = bkm.run(rdd, Some(instr))
     val model = copyValues(new BisectingKMeansModel(uid, 
parentModel).setParent(this))
     val summary = new BisectingKMeansSummary(
-      model.transform(dataset), $(predictionCol), $(featuresCol), $(k), 
$(maxIter))
+      model.transform(dataset),
+      $(predictionCol),
+      $(featuresCol),
+      $(k),
+      $(maxIter),
+      parentModel.trainingCost)
     instr.logNamedValue("clusterSizes", summary.clusterSizes)
     instr.logNumFeatures(model.clusterCenters.head.size)
     model.setSummary(Some(summary))
@@ -294,6 +299,8 @@ object BisectingKMeans extends 
DefaultParamsReadable[BisectingKMeans] {
  * @param featuresCol  Name for column of features in `predictions`.
  * @param k  Number of clusters.
  * @param numIter  Number of iterations.
+ * @param trainingCost Sum of the cost to the nearest centroid for all points 
in the training
+ *                     dataset. This is equivalent to sklearn's inertia.
  */
 @Since("2.1.0")
 @Experimental
@@ -302,4 +309,6 @@ class BisectingKMeansSummary private[clustering] (
     predictionCol: String,
     featuresCol: String,
     k: Int,
-    numIter: Int) extends ClusteringSummary(predictions, predictionCol, 
featuresCol, k, numIter)
+    numIter: Int,
+    @Since("3.0.0") val trainingCost: Double)
+  extends ClusteringSummary(predictions, predictionCol, featuresCol, k, 
numIter)
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index 80ab8eb..696dff0 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -242,7 +242,8 @@ class BisectingKMeans private (
     norms.unpersist(false)
     val clusters = activeClusters ++ inactiveClusters
     val root = buildTree(clusters, dMeasure)
-    new BisectingKMeansModel(root, this.distanceMeasure)
+    val totalCost = root.leafNodes.map(_.cost).sum
+    new BisectingKMeansModel(root, this.distanceMeasure, totalCost)
   }
 
   /**
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
index 4c5794f..b54b891 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeansModel.scala
@@ -41,11 +41,12 @@ import org.apache.spark.sql.{Row, SparkSession}
 @Since("1.6.0")
 class BisectingKMeansModel private[clustering] (
     private[clustering] val root: ClusteringTreeNode,
-    @Since("2.4.0") val distanceMeasure: String
+    @Since("2.4.0") val distanceMeasure: String,
+    @Since("3.0.0") val trainingCost: Double
   ) extends Serializable with Saveable with Logging {
 
   @Since("1.6.0")
-  def this(root: ClusteringTreeNode) = this(root, DistanceMeasure.EUCLIDEAN)
+  def this(root: ClusteringTreeNode) = this(root, DistanceMeasure.EUCLIDEAN, 
0.0)
 
   private val distanceMeasureInstance: DistanceMeasure =
     DistanceMeasure.decodeFromString(distanceMeasure)
@@ -109,10 +110,10 @@ class BisectingKMeansModel private[clustering] (
 
   @Since("2.0.0")
   override def save(sc: SparkContext, path: String): Unit = {
-    BisectingKMeansModel.SaveLoadV2_0.save(sc, this, path)
+    BisectingKMeansModel.SaveLoadV3_0.save(sc, this, path)
   }
 
-  override protected def formatVersion: String = "2.0"
+  override protected def formatVersion: String = "3.0"
 }
 
 @Since("2.0.0")
@@ -128,11 +129,15 @@ object BisectingKMeansModel extends 
Loader[BisectingKMeansModel] {
       case (SaveLoadV2_0.thisClassName, SaveLoadV2_0.thisFormatVersion) =>
         val model = SaveLoadV2_0.load(sc, path)
         model
+      case (SaveLoadV3_0.thisClassName, SaveLoadV3_0.thisFormatVersion) =>
+        val model = SaveLoadV3_0.load(sc, path)
+        model
       case _ => throw new Exception(
         s"BisectingKMeansModel.load did not recognize model with (className, 
format version):" +
           s"($loadedClassName, $formatVersion).  Supported:\n" +
           s"  (${SaveLoadV1_0.thisClassName}, ${SaveLoadV1_0.thisClassName}\n" 
+
-          s"  (${SaveLoadV2_0.thisClassName}, ${SaveLoadV2_0.thisClassName})")
+          s"  (${SaveLoadV2_0.thisClassName}, 
${SaveLoadV2_0.thisClassName})\n" +
+          s"  (${SaveLoadV3_0.thisClassName}, ${SaveLoadV3_0.thisClassName})")
     }
   }
 
@@ -195,7 +200,8 @@ object BisectingKMeansModel extends 
Loader[BisectingKMeansModel] {
       val data = rows.select("index", "size", "center", "norm", "cost", 
"height", "children")
       val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, 
d)).toMap
       val rootNode = buildTree(rootId, nodes)
-      new BisectingKMeansModel(rootNode, DistanceMeasure.EUCLIDEAN)
+      val totalCost = rootNode.leafNodes.map(_.cost).sum
+      new BisectingKMeansModel(rootNode, DistanceMeasure.EUCLIDEAN, totalCost)
     }
   }
 
@@ -231,7 +237,46 @@ object BisectingKMeansModel extends 
Loader[BisectingKMeansModel] {
       val data = rows.select("index", "size", "center", "norm", "cost", 
"height", "children")
       val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, 
d)).toMap
       val rootNode = buildTree(rootId, nodes)
-      new BisectingKMeansModel(rootNode, distanceMeasure)
+      val totalCost = rootNode.leafNodes.map(_.cost).sum
+      new BisectingKMeansModel(rootNode, distanceMeasure, totalCost)
+    }
+  }
+
+  private[clustering] object SaveLoadV3_0 {
+    private[clustering] val thisFormatVersion = "3.0"
+
+    private[clustering]
+    val thisClassName = 
"org.apache.spark.mllib.clustering.BisectingKMeansModel"
+
+    def save(sc: SparkContext, model: BisectingKMeansModel, path: String): 
Unit = {
+      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+      val metadata = compact(render(
+        ("class" -> thisClassName) ~ ("version" -> thisFormatVersion)
+          ~ ("rootId" -> model.root.index) ~ ("distanceMeasure" -> 
model.distanceMeasure)
+          ~ ("trainingCost" -> model.trainingCost)))
+      sc.parallelize(Seq(metadata), 
1).saveAsTextFile(Loader.metadataPath(path))
+
+      val data = getNodes(model.root).map(node => Data(node.index, node.size,
+        node.centerWithNorm.vector, node.centerWithNorm.norm, node.cost, 
node.height,
+        node.children.map(_.index)))
+      spark.createDataFrame(data).write.parquet(Loader.dataPath(path))
+    }
+
+    def load(sc: SparkContext, path: String): BisectingKMeansModel = {
+      implicit val formats: DefaultFormats = DefaultFormats
+      val (className, formatVersion, metadata) = Loader.loadMetadata(sc, path)
+      assert(className == thisClassName)
+      assert(formatVersion == thisFormatVersion)
+      val rootId = (metadata \ "rootId").extract[Int]
+      val distanceMeasure = (metadata \ "distanceMeasure").extract[String]
+      val trainingCost = (metadata \ "trainingCost").extract[Double]
+      val spark = SparkSession.builder().sparkContext(sc).getOrCreate()
+      val rows = spark.read.parquet(Loader.dataPath(path))
+      Loader.checkSchema[Data](rows.schema)
+      val data = rows.select("index", "size", "center", "norm", "cost", 
"height", "children")
+      val nodes = data.rdd.map(Data.apply).collect().map(d => (d.index, 
d)).toMap
+      val rootNode = buildTree(rootId, nodes)
+      new BisectingKMeansModel(rootNode, distanceMeasure, trainingCost)
     }
   }
 }
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
index 1b7780e..461f8b8 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/clustering/BisectingKMeansSuite.scala
@@ -134,6 +134,8 @@ class BisectingKMeansSuite extends MLTest with 
DefaultReadWriteTest {
     assert(clusterSizes.sum === numRows)
     assert(clusterSizes.forall(_ >= 0))
     assert(summary.numIter == 20)
+    assert(summary.trainingCost < 0.1)
+    assert(model.computeCost(dataset) == summary.trainingCost)
 
     model.setSummary(None)
     assert(!model.hasSummary)
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala
index 4a4d8b5..10d5f32 100644
--- 
a/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/mllib/clustering/BisectingKMeansSuite.scala
@@ -194,6 +194,7 @@ class BisectingKMeansSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       assert(model.k === sameModel.k)
       assert(model.distanceMeasure === sameModel.distanceMeasure)
       model.clusterCenters.zip(sameModel.clusterCenters).foreach(c => c._1 === 
c._2)
+      assert(model.trainingCost == sameModel.trainingCost)
     } finally {
       Utils.deleteRecursively(tempDir)
     }
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 89fc53c..cf8d9f3 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -36,6 +36,9 @@ object MimaExcludes {
 
   // Exclude rules for 3.0.x
   lazy val v30excludes = v24excludes ++ Seq(
+    // [SPARK-25765][ML] Add training cost to BisectingKMeans summary
+    
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.clustering.BisectingKMeansModel.this"),
+
     // [SPARK-24243][CORE] Expose exceptions from InProcessAppHandle
     
ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.launcher.SparkAppHandle.getError"),
 
diff --git a/python/pyspark/ml/clustering.py b/python/pyspark/ml/clustering.py
index d8a6dfb..5a776ae 100644
--- a/python/pyspark/ml/clustering.py
+++ b/python/pyspark/ml/clustering.py
@@ -587,6 +587,8 @@ class BisectingKMeans(JavaEstimator, HasDistanceMeasure, 
HasFeaturesCol, HasPred
     2
     >>> summary.clusterSizes
     [2, 2]
+    >>> summary.trainingCost
+    2.000...
     >>> transformed = model.transform(df).select("features", "prediction")
     >>> rows = transformed.collect()
     >>> rows[0].prediction == rows[1].prediction
@@ -700,7 +702,15 @@ class BisectingKMeansSummary(ClusteringSummary):
 
     .. versionadded:: 2.1.0
     """
-    pass
+
+    @property
+    @since("3.0.0")
+    def trainingCost(self):
+        """
+        Sum of squared distances to the nearest centroid for all points in the 
training dataset.
+        This is equivalent to sklearn's inertia.
+        """
+        return self._call_java("trainingCost")
 
 
 @inherit_doc


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to