Repository: spark
Updated Branches:
  refs/heads/master 8cad854ef -> dbd778d84


[SPARK-8764] [ML] string indexer should take option to handle unseen values

As a precursor to adding a public constructor, add an option to handle unseen 
values by skipping rather than throwing an exception (the default remains 
throwing an exception).

Author: Holden Karau <hol...@pigscanfly.ca>

Closes #7266 from 
holdenk/SPARK-8764-string-indexer-should-take-option-to-handle-unseen-values 
and squashes the following commits:

38a4de9 [Holden Karau] fix long line
045bf22 [Holden Karau] Add a second b entry so b gets 0 for sure
81dd312 [Holden Karau] Update the docs for handleInvalid param to be more 
descriptive
7f37f6e [Holden Karau] remove extra space (scala style)
414e249 [Holden Karau] And switch to using handleInvalid instead of skipInvalid
1e53f9b [Holden Karau] update the param (codegen side)
7a22215 [Holden Karau] fix typo
100a39b [Holden Karau] Merge in master
aa5b093 [Holden Karau] Since we filter we should never go down this code path 
if getSkipInvalid is true
75ffa69 [Holden Karau] Remove extra newline
d69ef5e [Holden Karau] Add a test
b5734be [Holden Karau] Add support for unseen labels
afecd4e [Holden Karau] Add a param to skip invalid entries.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dbd778d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dbd778d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dbd778d8

Branch: refs/heads/master
Commit: dbd778d84d094ca142bc08c351478595b280bc2a
Parents: 8cad854
Author: Holden Karau <hol...@pigscanfly.ca>
Authored: Tue Aug 11 11:33:36 2015 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Tue Aug 11 11:33:36 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/ml/feature/StringIndexer.scala | 26 +++++++++++++---
 .../ml/param/shared/SharedParamsCodeGen.scala   |  4 +++
 .../spark/ml/param/shared/sharedParams.scala    | 15 +++++++++
 .../spark/ml/feature/StringIndexerSuite.scala   | 32 ++++++++++++++++++++
 4 files changed, 73 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dbd778d8/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
index ebfa972..e4485eb 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/StringIndexer.scala
@@ -33,7 +33,8 @@ import org.apache.spark.util.collection.OpenHashMap
 /**
  * Base trait for [[StringIndexer]] and [[StringIndexerModel]].
  */
-private[feature] trait StringIndexerBase extends Params with HasInputCol with 
HasOutputCol {
+private[feature] trait StringIndexerBase extends Params with HasInputCol with 
HasOutputCol
+    with HasHandleInvalid {
 
   /** Validates and transforms the input schema. */
   protected def validateAndTransformSchema(schema: StructType): StructType = {
@@ -66,12 +67,15 @@ class StringIndexer(override val uid: String) extends 
Estimator[StringIndexerMod
   def this() = this(Identifiable.randomUID("strIdx"))
 
   /** @group setParam */
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+  setDefault(handleInvalid, "error")
+
+  /** @group setParam */
   def setInputCol(value: String): this.type = set(inputCol, value)
 
   /** @group setParam */
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  // TODO: handle unseen labels
 
   override def fit(dataset: DataFrame): StringIndexerModel = {
     val counts = dataset.select(col($(inputCol)).cast(StringType))
@@ -112,6 +116,10 @@ class StringIndexerModel private[ml] (
   }
 
   /** @group setParam */
+  def setHandleInvalid(value: String): this.type = set(handleInvalid, value)
+  setDefault(handleInvalid, "error")
+
+  /** @group setParam */
   def setInputCol(value: String): this.type = set(inputCol, value)
 
   /** @group setParam */
@@ -128,14 +136,24 @@ class StringIndexerModel private[ml] (
       if (labelToIndex.contains(label)) {
         labelToIndex(label)
       } else {
-        // TODO: handle unseen labels
         throw new SparkException(s"Unseen label: $label.")
       }
     }
+
     val outputColName = $(outputCol)
     val metadata = NominalAttribute.defaultAttr
       .withName(outputColName).withValues(labels).toMetadata()
-    dataset.select(col("*"),
+    // If we are skipping invalid records, filter them out.
+    val filteredDataset = (getHandleInvalid) match {
+      case "skip" => {
+        val filterer = udf { label: String =>
+          labelToIndex.contains(label)
+        }
+        dataset.where(filterer(dataset($(inputCol))))
+      }
+      case _ => dataset
+    }
+    filteredDataset.select(col("*"),
       indexer(dataset($(inputCol)).cast(StringType)).as(outputColName, 
metadata))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd778d8/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
index a97c805..da4c076 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/param/shared/SharedParamsCodeGen.scala
@@ -59,6 +59,10 @@ private[shared] object SharedParamsCodeGen {
       ParamDesc[Int]("checkpointInterval", "checkpoint interval (>= 1)",
         isValid = "ParamValidators.gtEq(1)"),
       ParamDesc[Boolean]("fitIntercept", "whether to fit an intercept term", 
Some("true")),
+      ParamDesc[String]("handleInvalid", "how to handle invalid entries. 
Options are skip (which " +
+        "will filter out rows with bad values), or error (which will throw an 
errror). More " +
+        "options may be added later.",
+        isValid = "ParamValidators.inArray(Array(\"skip\", \"error\"))"),
       ParamDesc[Boolean]("standardization", "whether to standardize the 
training features" +
         " before fitting the model.", Some("true")),
       ParamDesc[Long]("seed", "random seed", 
Some("this.getClass.getName.hashCode.toLong")),

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd778d8/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala 
b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
index f332630..23e2b6c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/param/shared/sharedParams.scala
@@ -248,6 +248,21 @@ private[ml] trait HasFitIntercept extends Params {
 }
 
 /**
+ * Trait for shared param handleInvalid.
+ */
+private[ml] trait HasHandleInvalid extends Params {
+
+  /**
+   * Param for how to handle invalid entries. Options are skip (which will 
filter out rows with bad values), or error (which will throw an errror). More 
options may be added later..
+   * @group param
+   */
+  final val handleInvalid: Param[String] = new Param[String](this, 
"handleInvalid", "how to handle invalid entries. Options are skip (which will 
filter out rows with bad values), or error (which will throw an errror). More 
options may be added later.", ParamValidators.inArray(Array("skip", "error")))
+
+  /** @group getParam */
+  final def getHandleInvalid: String = $(handleInvalid)
+}
+
+/**
  * Trait for shared param standardization (default: true).
  */
 private[ml] trait HasStandardization extends Params {

http://git-wip-us.apache.org/repos/asf/spark/blob/dbd778d8/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index d0295a0..b111036 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.ml.feature
 
+import org.apache.spark.SparkException
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.ml.attribute.{Attribute, NominalAttribute}
 import org.apache.spark.ml.param.ParamsSuite
@@ -62,6 +63,37 @@ class StringIndexerSuite extends SparkFunSuite with 
MLlibTestSparkContext {
       reversed2.collect().map(r => (r.getInt(0), r.getString(1))).toSet)
   }
 
+  test("StringIndexerUnseen") {
+    val data = sc.parallelize(Seq((0, "a"), (1, "b"), (4, "b")), 2)
+    val data2 = sc.parallelize(Seq((0, "a"), (1, "b"), (2, "c")), 2)
+    val df = sqlContext.createDataFrame(data).toDF("id", "label")
+    val df2 = sqlContext.createDataFrame(data2).toDF("id", "label")
+    val indexer = new StringIndexer()
+      .setInputCol("label")
+      .setOutputCol("labelIndex")
+      .fit(df)
+    // Verify we throw by default with unseen values
+    intercept[SparkException] {
+      indexer.transform(df2).collect()
+    }
+    val indexerSkipInvalid = new StringIndexer()
+      .setInputCol("label")
+      .setOutputCol("labelIndex")
+      .setHandleInvalid("skip")
+      .fit(df)
+    // Verify that we skip the c record
+    val transformed = indexerSkipInvalid.transform(df2)
+    val attr = Attribute.fromStructField(transformed.schema("labelIndex"))
+      .asInstanceOf[NominalAttribute]
+    assert(attr.values.get === Array("b", "a"))
+    val output = transformed.select("id", "labelIndex").map { r =>
+      (r.getInt(0), r.getDouble(1))
+    }.collect().toSet
+    // a -> 1, b -> 0
+    val expected = Set((0, 1.0), (1, 0.0))
+    assert(output === expected)
+  }
+
   test("StringIndexer with a numeric input column") {
     val data = sc.parallelize(Seq((0, 100), (1, 200), (2, 300), (3, 100), (4, 
100), (5, 300)), 2)
     val df = sqlContext.createDataFrame(data).toDF("id", "label")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to