Repository: spark
Updated Branches:
  refs/heads/branch-1.5 92e8acc98 -> ee43d355b


[SPARK-9493] [ML] add featureIndex to handle vector features in 
IsotonicRegression

This PR contains the following changes:
* add `featureIndex` to handle vector features (in order to chain isotonic
regression easily with output from logistic regression); see the usage sketch below
* make getter/setter names consistent with params
* remove inheritance from Regressor because it is tricky to handle both 
`DoubleType` and `VectorType`
* simplify test data generation
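
For context, a rough usage sketch (not part of the patch) of the chaining described
above; `training` is an assumed DataFrame with the default "label"/"features"
columns, and the "calibrated" output column name is made up for illustration:

    import org.apache.spark.ml.classification.LogisticRegression
    import org.apache.spark.ml.regression.IsotonicRegression

    // Fit a logistic regression model; its transform output includes the
    // default vector column "probability".
    val lrModel = new LogisticRegression().fit(training)
    val scored = lrModel.transform(training)

    // Calibrate the class-1 probability: point featuresCol at the vector
    // column and featureIndex at element 1.
    val ir = new IsotonicRegression()
      .setFeaturesCol("probability")
      .setFeatureIndex(1)
      .setLabelCol("label")
      .setPredictionCol("calibrated")
    val irModel = ir.fit(scored)
    irModel.transform(scored).select("probability", "calibrated").show()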

jkbradley zapletal-martin

Author: Xiangrui Meng <m...@databricks.com>

Closes #7952 from mengxr/SPARK-9493 and squashes the following commits:

8818ac3 [Xiangrui Meng] address comments
05e2216 [Xiangrui Meng] address comments
8d08090 [Xiangrui Meng] add featureIndex to handle vector features; make
getter/setter names consistent with params; remove inheritance from Regressor

(cherry picked from commit 54c0789a05a783ce90e0e9848079be442a82966b)
Signed-off-by: Joseph K. Bradley <jos...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee43d355
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee43d355
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee43d355

Branch: refs/heads/branch-1.5
Commit: ee43d355bcfc9c3f4f281f0c44e1b1f331c7bb97
Parents: 92e8acc
Author: Xiangrui Meng <m...@databricks.com>
Authored: Thu Aug 6 13:29:31 2015 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Thu Aug 6 13:29:38 2015 -0700

----------------------------------------------------------------------
 .../ml/regression/IsotonicRegression.scala      | 202 ++++++++++++++-----
 .../ml/regression/IsotonicRegressionSuite.scala |  82 ++++----
 2 files changed, 194 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee43d355/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala 
b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
index 4ece8cf..f570590 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/IsotonicRegression.scala
@@ -17,44 +17,113 @@
 
 package org.apache.spark.ml.regression
 
+import org.apache.spark.Logging
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.ml.PredictorParams
-import org.apache.spark.ml.param.{Param, ParamMap, BooleanParam}
-import org.apache.spark.ml.util.{SchemaUtils, Identifiable}
-import org.apache.spark.mllib.regression.{IsotonicRegression => 
MLlibIsotonicRegression}
-import org.apache.spark.mllib.regression.{IsotonicRegressionModel => 
MLlibIsotonicRegressionModel}
+import org.apache.spark.ml.{Estimator, Model}
+import org.apache.spark.ml.param._
+import org.apache.spark.ml.param.shared.{HasFeaturesCol, HasLabelCol, 
HasPredictionCol}
+import org.apache.spark.ml.util.{Identifiable, SchemaUtils}
+import org.apache.spark.mllib.linalg.{Vector, VectorUDT, Vectors}
+import org.apache.spark.mllib.regression.{IsotonicRegression => 
MLlibIsotonicRegression, IsotonicRegressionModel => 
MLlibIsotonicRegressionModel}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.types.{DoubleType, DataType}
-import org.apache.spark.sql.{Row, DataFrame}
+import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.functions.{col, lit, udf}
+import org.apache.spark.sql.types.{DoubleType, StructType}
 import org.apache.spark.storage.StorageLevel
 
 /**
  * Params for isotonic regression.
  */
-private[regression] trait IsotonicRegressionParams extends PredictorParams {
+private[regression] trait IsotonicRegressionBase extends Params with 
HasFeaturesCol
+  with HasLabelCol with HasPredictionCol with Logging {
 
   /**
-   * Param for weight column name.
-   * TODO: Move weightCol to sharedParams.
-   *
+   * Param for weight column name (default: none).
    * @group param
    */
+  // TODO: Move weightCol to sharedParams.
   final val weightCol: Param[String] =
-    new Param[String](this, "weightCol", "weight column name")
+    new Param[String](this, "weightCol",
+      "weight column name. If this is not set or empty, we treat all instance 
weights as 1.0.")
 
   /** @group getParam */
   final def getWeightCol: String = $(weightCol)
 
   /**
-   * Param for isotonic parameter.
-   * Isotonic (increasing) or antitonic (decreasing) sequence.
+   * Param for whether the output sequence should be isotonic/increasing 
(true) or
+   * antitonic/decreasing (false).
    * @group param
    */
   final val isotonic: BooleanParam =
-    new BooleanParam(this, "isotonic", "isotonic (increasing) or antitonic 
(decreasing) sequence")
+    new BooleanParam(this, "isotonic",
+      "whether the output sequence should be isotonic/increasing (true) or" +
+        "antitonic/decreasing (false)")
 
   /** @group getParam */
-  final def getIsotonicParam: Boolean = $(isotonic)
+  final def getIsotonic: Boolean = $(isotonic)
+
+  /**
+   * Param for the index of the feature if [[featuresCol]] is a vector column 
(default: `0`), no
+   * effect otherwise.
+   * @group param
+   */
+  final val featureIndex: IntParam = new IntParam(this, "featureIndex",
+    "The index of the feature if featuresCol is a vector column, no effect 
otherwise.")
+
+  /** @group getParam */
+  final def getFeatureIndex: Int = $(featureIndex)
+
+  setDefault(isotonic -> true, featureIndex -> 0)
+
+  /** Checks whether the input has weight column. */
+  protected[ml] def hasWeightCol: Boolean = {
+    isDefined(weightCol) && $(weightCol) != ""
+  }
+
+  /**
+   * Extracts (label, feature, weight) from input dataset.
+   */
+  protected[ml] def extractWeightedLabeledPoints(
+      dataset: DataFrame): RDD[(Double, Double, Double)] = {
+    val f = if 
(dataset.schema($(featuresCol)).dataType.isInstanceOf[VectorUDT]) {
+      val idx = $(featureIndex)
+      val extract = udf { v: Vector => v(idx) }
+      extract(col($(featuresCol)))
+    } else {
+      col($(featuresCol))
+    }
+    val w = if (hasWeightCol) {
+      col($(weightCol))
+    } else {
+      lit(1.0)
+    }
+    dataset.select(col($(labelCol)), f, w)
+      .map { case Row(label: Double, feature: Double, weights: Double) =>
+      (label, feature, weights)
+    }
+  }
+
+  /**
+   * Validates and transforms input schema.
+   * @param schema input schema
+   * @param fitting whether this is in fitting or prediction
+   * @return output schema
+   */
+  protected[ml] def validateAndTransformSchema(
+      schema: StructType,
+      fitting: Boolean): StructType = {
+    if (fitting) {
+      SchemaUtils.checkColumnType(schema, $(labelCol), DoubleType)
+      if (hasWeightCol) {
+        SchemaUtils.checkColumnType(schema, $(weightCol), DoubleType)
+      } else {
+        logInfo("The weight column is not defined. Treat all instance weights 
as 1.0.")
+      }
+    }
+    val featuresType = schema($(featuresCol)).dataType
+    require(featuresType == DoubleType || featuresType.isInstanceOf[VectorUDT])
+    SchemaUtils.appendColumn(schema, $(predictionCol), DoubleType)
+  }
 }
 
 /**
@@ -67,52 +136,46 @@ private[regression] trait IsotonicRegressionParams extends 
PredictorParams {
  * Uses [[org.apache.spark.mllib.regression.IsotonicRegression]].
  */
 @Experimental
-class IsotonicRegression(override val uid: String)
-  extends Regressor[Double, IsotonicRegression, IsotonicRegressionModel]
-  with IsotonicRegressionParams {
+class IsotonicRegression(override val uid: String) extends 
Estimator[IsotonicRegressionModel]
+  with IsotonicRegressionBase {
 
   def this() = this(Identifiable.randomUID("isoReg"))
 
-  /**
-   * Set the isotonic parameter.
-   * Default is true.
-   * @group setParam
-   */
-  def setIsotonicParam(value: Boolean): this.type = set(isotonic, value)
-  setDefault(isotonic -> true)
+  /** @group setParam */
+  def setLabelCol(value: String): this.type = set(labelCol, value)
 
-  /**
-   * Set weight column param.
-   * Default is weight.
-   * @group setParam
-   */
-  def setWeightParam(value: String): this.type = set(weightCol, value)
-  setDefault(weightCol -> "weight")
+  /** @group setParam */
+  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
 
-  override private[ml] def featuresDataType: DataType = DoubleType
+  /** @group setParam */
+  def setPredictionCol(value: String): this.type = set(predictionCol, value)
 
-  override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
+  /** @group setParam */
+  def setIsotonic(value: Boolean): this.type = set(isotonic, value)
 
-  private[this] def extractWeightedLabeledPoints(
-      dataset: DataFrame): RDD[(Double, Double, Double)] = {
+  /** @group setParam */
+  def setWeightCol(value: String): this.type = set(weightCol, value)
 
-    dataset.select($(labelCol), $(featuresCol), $(weightCol))
-      .map { case Row(label: Double, features: Double, weights: Double) =>
-        (label, features, weights)
-      }
-  }
+  /** @group setParam */
+  def setFeatureIndex(value: Int): this.type = set(featureIndex, value)
 
-  override protected def train(dataset: DataFrame): IsotonicRegressionModel = {
-    SchemaUtils.checkColumnType(dataset.schema, $(weightCol), DoubleType)
+  override def copy(extra: ParamMap): IsotonicRegression = defaultCopy(extra)
+
+  override def fit(dataset: DataFrame): IsotonicRegressionModel = {
+    validateAndTransformSchema(dataset.schema, fitting = true)
     // Extract columns from data.  If dataset is persisted, do not persist 
oldDataset.
     val instances = extractWeightedLabeledPoints(dataset)
     val handlePersistence = dataset.rdd.getStorageLevel == StorageLevel.NONE
     if (handlePersistence) instances.persist(StorageLevel.MEMORY_AND_DISK)
 
     val isotonicRegression = new 
MLlibIsotonicRegression().setIsotonic($(isotonic))
-    val parentModel = isotonicRegression.run(instances)
+    val oldModel = isotonicRegression.run(instances)
 
-    new IsotonicRegressionModel(uid, parentModel)
+    copyValues(new IsotonicRegressionModel(uid, oldModel).setParent(this))
+  }
+
+  override def transformSchema(schema: StructType): StructType = {
+    validateAndTransformSchema(schema, fitting = true)
   }
 }
 
@@ -123,22 +186,49 @@ class IsotonicRegression(override val uid: String)
  *
  * For detailed rules see 
[[org.apache.spark.mllib.regression.IsotonicRegressionModel.predict()]].
  *
- * @param parentModel A 
[[org.apache.spark.mllib.regression.IsotonicRegressionModel]]
- *                    model trained by 
[[org.apache.spark.mllib.regression.IsotonicRegression]].
+ * @param oldModel A 
[[org.apache.spark.mllib.regression.IsotonicRegressionModel]]
+ *                 model trained by 
[[org.apache.spark.mllib.regression.IsotonicRegression]].
  */
+@Experimental
 class IsotonicRegressionModel private[ml] (
     override val uid: String,
-    private[ml] val parentModel: MLlibIsotonicRegressionModel)
-  extends RegressionModel[Double, IsotonicRegressionModel]
-  with IsotonicRegressionParams {
+    private val oldModel: MLlibIsotonicRegressionModel)
+  extends Model[IsotonicRegressionModel] with IsotonicRegressionBase {
 
-  override def featuresDataType: DataType = DoubleType
+  /** @group setParam */
+  def setFeaturesCol(value: String): this.type = set(featuresCol, value)
 
-  override protected def predict(features: Double): Double = {
-    parentModel.predict(features)
-  }
+  /** @group setParam */
+  def setPredictionCol(value: String): this.type = set(predictionCol, value)
+
+  /** @group setParam */
+  def setFeatureIndex(value: Int): this.type = set(featureIndex, value)
+
+  /** Boundaries in increasing order for which predictions are known. */
+  def boundaries: Vector = Vectors.dense(oldModel.boundaries)
+
+  /**
+   * Predictions associated with the boundaries at the same index, monotone 
because of isotonic
+   * regression.
+   */
+  def predictions: Vector = Vectors.dense(oldModel.predictions)
 
   override def copy(extra: ParamMap): IsotonicRegressionModel = {
-    copyValues(new IsotonicRegressionModel(uid, parentModel), extra)
+    copyValues(new IsotonicRegressionModel(uid, oldModel), extra)
+  }
+
+  override def transform(dataset: DataFrame): DataFrame = {
+    val predict = dataset.schema($(featuresCol)).dataType match {
+      case DoubleType =>
+        udf { feature: Double => oldModel.predict(feature) }
+      case _: VectorUDT =>
+        val idx = $(featureIndex)
+        udf { features: Vector => oldModel.predict(features(idx)) }
+    }
+    dataset.withColumn($(predictionCol), predict(col($(featuresCol))))
+  }
+
+  override def transformSchema(schema: StructType): StructType = {
+    validateAndTransformSchema(schema, fitting = false)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee43d355/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
index 66e4b17..c0ab00b 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/regression/IsotonicRegressionSuite.scala
@@ -19,57 +19,46 @@ package org.apache.spark.ml.regression
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.param.ParamsSuite
+import org.apache.spark.mllib.linalg.Vectors
 import org.apache.spark.mllib.util.MLlibTestSparkContext
-import org.apache.spark.sql.types.{DoubleType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Row}
 
 class IsotonicRegressionSuite extends SparkFunSuite with MLlibTestSparkContext 
{
-  private val schema = StructType(
-    Array(
-      StructField("label", DoubleType),
-      StructField("features", DoubleType),
-      StructField("weight", DoubleType)))
-
-  private val predictionSchema = StructType(Array(StructField("features", 
DoubleType)))
-
   private def generateIsotonicInput(labels: Seq[Double]): DataFrame = {
-    val data = Seq.tabulate(labels.size)(i => Row(labels(i), i.toDouble, 1d))
-    val parallelData = sc.parallelize(data)
-
-    sqlContext.createDataFrame(parallelData, schema)
+    sqlContext.createDataFrame(
+      labels.zipWithIndex.map { case (label, i) => (label, i.toDouble, 1.0) }
+    ).toDF("label", "features", "weight")
   }
 
   private def generatePredictionInput(features: Seq[Double]): DataFrame = {
-    val data = Seq.tabulate(features.size)(i => Row(features(i)))
-
-    val parallelData = sc.parallelize(data)
-    sqlContext.createDataFrame(parallelData, predictionSchema)
+    sqlContext.createDataFrame(features.map(Tuple1.apply))
+      .toDF("features")
   }
 
   test("isotonic regression predictions") {
     val dataset = generateIsotonicInput(Seq(1, 2, 3, 1, 6, 17, 16, 17, 18))
-    val trainer = new IsotonicRegression().setIsotonicParam(true)
+    val ir = new IsotonicRegression().setIsotonic(true)
 
-    val model = trainer.fit(dataset)
+    val model = ir.fit(dataset)
 
     val predictions = model
       .transform(dataset)
-      .select("prediction").map {
-        case Row(pred) => pred
+      .select("prediction").map { case Row(pred) =>
+        pred
       }.collect()
 
     assert(predictions === Array(1, 2, 2, 2, 6, 16.5, 16.5, 17, 18))
 
-    assert(model.parentModel.boundaries === Array(0, 1, 3, 4, 5, 6, 7, 8))
-    assert(model.parentModel.predictions === Array(1, 2, 2, 6, 16.5, 16.5, 
17.0, 18.0))
-    assert(model.parentModel.isotonic)
+    assert(model.boundaries === Vectors.dense(0, 1, 3, 4, 5, 6, 7, 8))
+    assert(model.predictions === Vectors.dense(1, 2, 2, 6, 16.5, 16.5, 17.0, 
18.0))
+    assert(model.getIsotonic)
   }
 
   test("antitonic regression predictions") {
     val dataset = generateIsotonicInput(Seq(7, 5, 3, 5, 1))
-    val trainer = new IsotonicRegression().setIsotonicParam(false)
+    val ir = new IsotonicRegression().setIsotonic(false)
 
-    val model = trainer.fit(dataset)
+    val model = ir.fit(dataset)
     val features = generatePredictionInput(Seq(-2.0, -1.0, 0.5, 0.75, 1.0, 
2.0, 9.0))
 
     val predictions = model
@@ -94,9 +83,10 @@ class IsotonicRegressionSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     val ir = new IsotonicRegression()
     assert(ir.getLabelCol === "label")
     assert(ir.getFeaturesCol === "features")
-    assert(ir.getWeightCol === "weight")
     assert(ir.getPredictionCol === "prediction")
-    assert(ir.getIsotonicParam === true)
+    assert(!ir.isDefined(ir.weightCol))
+    assert(ir.getIsotonic)
+    assert(ir.getFeatureIndex === 0)
 
     val model = ir.fit(dataset)
     model.transform(dataset)
@@ -105,21 +95,22 @@ class IsotonicRegressionSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 
     assert(model.getLabelCol === "label")
     assert(model.getFeaturesCol === "features")
-    assert(model.getWeightCol === "weight")
     assert(model.getPredictionCol === "prediction")
-    assert(model.getIsotonicParam === true)
+    assert(!model.isDefined(model.weightCol))
+    assert(model.getIsotonic)
+    assert(model.getFeatureIndex === 0)
     assert(model.hasParent)
   }
 
   test("set parameters") {
     val isotonicRegression = new IsotonicRegression()
-      .setIsotonicParam(false)
-      .setWeightParam("w")
+      .setIsotonic(false)
+      .setWeightCol("w")
       .setFeaturesCol("f")
       .setLabelCol("l")
       .setPredictionCol("p")
 
-    assert(isotonicRegression.getIsotonicParam === false)
+    assert(!isotonicRegression.getIsotonic)
     assert(isotonicRegression.getWeightCol === "w")
     assert(isotonicRegression.getFeaturesCol === "f")
     assert(isotonicRegression.getLabelCol === "l")
@@ -130,7 +121,7 @@ class IsotonicRegressionSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     val dataset = generateIsotonicInput(Seq(1, 2, 3))
 
     intercept[IllegalArgumentException] {
-      new IsotonicRegression().setWeightParam("w").fit(dataset)
+      new IsotonicRegression().setWeightCol("w").fit(dataset)
     }
 
     intercept[IllegalArgumentException] {
@@ -145,4 +136,27 @@ class IsotonicRegressionSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       new 
IsotonicRegression().fit(dataset).setFeaturesCol("f").transform(dataset)
     }
   }
+
+  test("vector features column with feature index") {
+    val dataset = sqlContext.createDataFrame(Seq(
+      (4.0, Vectors.dense(0.0, 1.0)),
+      (3.0, Vectors.dense(0.0, 2.0)),
+      (5.0, Vectors.sparse(2, Array(1), Array(3.0))))
+    ).toDF("label", "features")
+
+    val ir = new IsotonicRegression()
+      .setFeatureIndex(1)
+
+    val model = ir.fit(dataset)
+
+    val features = generatePredictionInput(Seq(2.0, 3.0, 4.0, 5.0))
+
+    val predictions = model
+      .transform(features)
+      .select("prediction").map {
+      case Row(pred) => pred
+    }.collect()
+
+    assert(predictions === Array(3.5, 5.0, 5.0, 5.0))
+  }
 }
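
For reference, a minimal sketch (not part of the patch) of fitting on a plain
Double feature column and inspecting the new public `boundaries`/`predictions`
accessors; the toy data mirrors the suite's generateIsotonicInput and
`sqlContext` is assumed to be in scope:

    import org.apache.spark.ml.regression.IsotonicRegression

    // (label, feature) pairs; the features are just the indices 0..8.
    val df = sqlContext.createDataFrame(
      Seq(1.0, 2.0, 3.0, 1.0, 6.0, 17.0, 16.0, 17.0, 18.0).zipWithIndex
        .map { case (label, i) => (label, i.toDouble) }
    ).toDF("label", "features")

    val model = new IsotonicRegression().fit(df)
    // The fitted piecewise-linear model is described by parallel vectors.
    println(model.boundaries)   // [0.0,1.0,3.0,4.0,5.0,6.0,7.0,8.0] per the suite
    println(model.predictions)  // [1.0,2.0,2.0,6.0,16.5,16.5,17.0,18.0] per the suite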

