Repository: spark
Updated Branches:
  refs/heads/master b366f1849 -> 57dc326bd


[SPARK-17219][ML] Add NaN value handling in Bucketizer

## What changes were proposed in this pull request?
This PR fixes an issue when Bucketizer is called to handle a dataset containing 
NaN value.
Sometimes, null value might also be useful to users, so in these cases, 
Bucketizer should
reserve one extra bucket for NaN values, instead of throwing an illegal 
exception.
Before:
```
Bucketizer.transform on NaN value threw an illegal exception.
```
After:
```
NaN values will be grouped in an extra bucket.
```
## How was this patch tested?
New test cases added in `BucketizerSuite`.
Signed-off-by: VinceShieh <vincent.xieintel.com>

Author: VinceShieh <vincent....@intel.com>

Closes #14858 from VinceShieh/spark-17219.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57dc326b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57dc326b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57dc326b

Branch: refs/heads/master
Commit: 57dc326bd00cf0a49da971e9c573c48ae28acaa2
Parents: b366f18
Author: VinceShieh <vincent....@intel.com>
Authored: Wed Sep 21 10:20:57 2016 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Sep 21 10:20:57 2016 +0100

----------------------------------------------------------------------
 docs/ml-features.md                             |  6 +++-
 .../apache/spark/ml/feature/Bucketizer.scala    | 13 +++++---
 .../spark/ml/feature/QuantileDiscretizer.scala  |  9 ++++--
 .../spark/ml/feature/BucketizerSuite.scala      | 31 ++++++++++++++++++++
 .../ml/feature/QuantileDiscretizerSuite.scala   | 29 +++++++++++++++---
 python/pyspark/ml/feature.py                    |  5 ++++
 .../spark/sql/DataFrameStatFunctions.scala      |  4 ++-
 7 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57dc326b/docs/ml-features.md
----------------------------------------------------------------------
diff --git a/docs/ml-features.md b/docs/ml-features.md
index 746593f..a39b31c 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1102,7 +1102,11 @@ for more details on the API.
 ## QuantileDiscretizer
 
 `QuantileDiscretizer` takes a column with continuous features and outputs a 
column with binned
-categorical features. The number of bins is set by the `numBuckets` parameter.
+categorical features. The number of bins is set by the `numBuckets` parameter. 
It is possible
+that the number of buckets used will be less than this value, for example, if 
there are too few
+distinct values of the input to create enough distinct quantiles. Note also 
that NaN values are
+handled specially and placed into their own bucket. For example, if 4 buckets 
are used, then
+non-NaN data will be put into buckets[0-3], but NaNs will be counted in a 
special bucket[4].
 The bin ranges are chosen using an approximate algorithm (see the 
documentation for
 
[approxQuantile](api/scala/index.html#org.apache.spark.sql.DataFrameStatFunctions)
 for a
 detailed description). The precision of the approximation can be controlled 
with the

http://git-wip-us.apache.org/repos/asf/spark/blob/57dc326b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
index 100d9e7..ec0ea05 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala
@@ -106,7 +106,10 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") 
override val uid: String
 @Since("1.6.0")
 object Bucketizer extends DefaultParamsReadable[Bucketizer] {
 
-  /** We require splits to be of length >= 3 and to be in strictly increasing 
order. */
+  /**
+   * We require splits to be of length >= 3 and to be in strictly increasing 
order.
+   * No NaN split should be accepted.
+   */
   private[feature] def checkSplits(splits: Array[Double]): Boolean = {
     if (splits.length < 3) {
       false
@@ -114,10 +117,10 @@ object Bucketizer extends 
DefaultParamsReadable[Bucketizer] {
       var i = 0
       val n = splits.length - 1
       while (i < n) {
-        if (splits(i) >= splits(i + 1)) return false
+        if (splits(i) >= splits(i + 1) || splits(i).isNaN) return false
         i += 1
       }
-      true
+      !splits(n).isNaN
     }
   }
 
@@ -126,7 +129,9 @@ object Bucketizer extends DefaultParamsReadable[Bucketizer] 
{
    * @throws SparkException if a feature is < splits.head or > splits.last
    */
   private[feature] def binarySearchForBuckets(splits: Array[Double], feature: 
Double): Double = {
-    if (feature == splits.last) {
+    if (feature.isNaN) {
+      splits.length - 1
+    } else if (feature == splits.last) {
       splits.length - 2
     } else {
       val idx = ju.Arrays.binarySearch(splits, feature)

http://git-wip-us.apache.org/repos/asf/spark/blob/57dc326b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
index e098008..1e59d71 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/QuantileDiscretizer.scala
@@ -39,7 +39,7 @@ private[feature] trait QuantileDiscretizerBase extends Params
    * default: 2
    * @group param
    */
-  val numBuckets = new IntParam(this, "numBuckets", "Maximum number of buckets 
(quantiles, or " +
+  val numBuckets = new IntParam(this, "numBuckets", "Number of buckets 
(quantiles, or " +
     "categories) into which data points are grouped. Must be >= 2.",
     ParamValidators.gtEq(2))
   setDefault(numBuckets -> 2)
@@ -65,7 +65,12 @@ private[feature] trait QuantileDiscretizerBase extends Params
 
 /**
  * `QuantileDiscretizer` takes a column with continuous features and outputs a 
column with binned
- * categorical features. The number of bins can be set using the `numBuckets` 
parameter.
+ * categorical features. The number of bins can be set using the `numBuckets` 
parameter. It is
+ * possible that the number of buckets used will be less than this value, for 
example, if there
+ * are too few distinct values of the input to create enough distinct 
quantiles. Note also that
+ * NaN values are handled specially and placed into their own bucket. For 
example, if 4 buckets
+ * are used, then non-NaN data will be put into buckets(0-3), but NaNs will be 
counted in a special
+ * bucket(4).
  * The bin ranges are chosen using an approximate algorithm (see the 
documentation for
  * [[org.apache.spark.sql.DataFrameStatFunctions.approxQuantile 
approxQuantile]]
  * for a detailed description). The precision of the approximation can be 
controlled with the

http://git-wip-us.apache.org/repos/asf/spark/blob/57dc326b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
index cd10c78..c7f5093 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/BucketizerSuite.scala
@@ -88,6 +88,37 @@ class BucketizerSuite extends SparkFunSuite with 
MLlibTestSparkContext with Defa
     }
   }
 
+  test("Bucket continuous features, with NaN data but non-NaN splits") {
+    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity)
+    val validData = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, Double.NaN, 
Double.NaN, Double.NaN)
+    val expectedBuckets = Array(0.0, 1.0, 1.0, 2.0, 2.0, 3.0, 3.0, 4.0, 4.0, 
4.0)
+    val dataFrame: DataFrame =
+      spark.createDataFrame(validData.zip(expectedBuckets)).toDF("feature", 
"expected")
+
+    val bucketizer: Bucketizer = new Bucketizer()
+      .setInputCol("feature")
+      .setOutputCol("result")
+      .setSplits(splits)
+
+    bucketizer.transform(dataFrame).select("result", 
"expected").collect().foreach {
+      case Row(x: Double, y: Double) =>
+        assert(x === y,
+          s"The feature value is not correct after bucketing.  Expected $y but 
found $x")
+    }
+  }
+
+  test("Bucket continuous features, with NaN splits") {
+    val splits = Array(Double.NegativeInfinity, -0.5, 0.0, 0.5, 
Double.PositiveInfinity, Double.NaN)
+    withClue("Invalid NaN split was not caught as an invalid split!") {
+      intercept[IllegalArgumentException] {
+        val bucketizer: Bucketizer = new Bucketizer()
+          .setInputCol("feature")
+          .setOutputCol("result")
+          .setSplits(splits)
+      }
+    }
+  }
+
   test("Binary search correctness on hand-picked examples") {
     import BucketizerSuite.checkBinarySearch
     // length 3, with -inf

http://git-wip-us.apache.org/repos/asf/spark/blob/57dc326b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
----------------------------------------------------------------------
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
index 18f1e89..6822594 100644
--- 
a/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
+++ 
b/mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala
@@ -52,12 +52,12 @@ class QuantileDiscretizerSuite
       "Bucket sizes are not within expected relative error tolerance.")
   }
 
-  test("Test Bucketizer on duplicated splits") {
+  test("Test on data with high proportion of duplicated values") {
     val spark = this.spark
     import spark.implicits._
 
-    val datasetSize = 12
     val numBuckets = 5
+    val expectedNumBuckets = 3
     val df = sc.parallelize(Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 
2.0, 1.0, 3.0))
       .map(Tuple1.apply).toDF("input")
     val discretizer = new QuantileDiscretizer()
@@ -65,10 +65,31 @@ class QuantileDiscretizerSuite
       .setOutputCol("result")
       .setNumBuckets(numBuckets)
     val result = discretizer.fit(df).transform(df)
+    val observedNumBuckets = result.select("result").distinct.count
+    assert(observedNumBuckets == expectedNumBuckets,
+      s"Observed number of buckets are not correct." +
+        s" Expected $expectedNumBuckets but found $observedNumBuckets")
+  }
+
+  test("Test transform on data with NaN value") {
+    val spark = this.spark
+    import spark.implicits._
+
+    val numBuckets = 3
+    val df = sc.parallelize(Array(1.0, 1.0, 1.0, Double.NaN))
+      .map(Tuple1.apply).toDF("input")
+    val discretizer = new QuantileDiscretizer()
+      .setInputCol("input")
+      .setOutputCol("result")
+      .setNumBuckets(numBuckets)
 
+    // Reserve extra one bucket for NaN
+    val expectedNumBuckets = discretizer.fit(df).getSplits.length - 1
+    val result = discretizer.fit(df).transform(df)
     val observedNumBuckets = result.select("result").distinct.count
-    assert(2 <= observedNumBuckets && observedNumBuckets <= numBuckets,
-      "Observed number of buckets are not within expected range.")
+    assert(observedNumBuckets == expectedNumBuckets,
+      s"Observed number of buckets are not correct." +
+        s" Expected $expectedNumBuckets but found $observedNumBuckets")
   }
 
   test("Test transform method on unseen data") {

http://git-wip-us.apache.org/repos/asf/spark/blob/57dc326b/python/pyspark/ml/feature.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 2881380..c45434f 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -1155,6 +1155,11 @@ class QuantileDiscretizer(JavaEstimator, HasInputCol, 
HasOutputCol, JavaMLReadab
 
     `QuantileDiscretizer` takes a column with continuous features and outputs 
a column with binned
     categorical features. The number of bins can be set using the 
:py:attr:`numBuckets` parameter.
+    It is possible that the number of buckets used will be less than this 
value, for example, if
+    there are too few distinct values of the input to create enough distinct 
quantiles. Note also
+    that NaN values are handled specially and placed into their own bucket. 
For example, if 4
+    buckets are used, then non-NaN data will be put into buckets(0-3), but 
NaNs will be counted in
+    a special bucket(4).
     The bin ranges are chosen using an approximate algorithm (see the 
documentation for
     :py:meth:`~.DataFrameStatFunctions.approxQuantile` for a detailed 
description).
     The precision of the approximation can be controlled with the

http://git-wip-us.apache.org/repos/asf/spark/blob/57dc326b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
index 1855eab..d69be36 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameStatFunctions.scala
@@ -52,6 +52,7 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
    * The algorithm was first present in 
[[http://dx.doi.org/10.1145/375663.375670 Space-efficient
    * Online Computation of Quantile Summaries]] by Greenwald and Khanna.
    *
+   * Note that NaN values will be removed from the numerical column before 
calculation
    * @param col the name of the numerical column
    * @param probabilities a list of quantile probabilities
    *   Each number must belong to [0, 1].
@@ -67,7 +68,8 @@ final class DataFrameStatFunctions private[sql](df: 
DataFrame) {
       col: String,
       probabilities: Array[Double],
       relativeError: Double): Array[Double] = {
-    StatFunctions.multipleApproxQuantiles(df, Seq(col), probabilities, 
relativeError).head.toArray
+    StatFunctions.multipleApproxQuantiles(df.select(col).na.drop(),
+      Seq(col), probabilities, relativeError).head.toArray
   }
 
   /**


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to