Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19715#discussion_r150404292
  
    --- Diff: 
mllib/src/test/scala/org/apache/spark/ml/feature/QuantileDiscretizerSuite.scala 
---
    @@ -146,4 +146,172 @@ class QuantileDiscretizerSuite
         val model = discretizer.fit(df)
         assert(model.hasParent)
       }
    +
    +  test("Multiple Columns: Test observed number of buckets and their sizes 
match expected values") {
    +    val spark = this.spark
    +    import spark.implicits._
    +
    +    val datasetSize = 100000
    +    val numBuckets = 5
    +    val data1 = Array.range(1, 100001, 1).map(_.toDouble)
    +    val data2 = Array.range(1, 200000, 2).map(_.toDouble)
    +    val data = (0 until 100000).map { idx =>
    +      (data1(idx), data2(idx))
    +    }
    +    val df: DataFrame = data.toSeq.toDF("input1", "input2")
    +
    +    val discretizer = new QuantileDiscretizer()
    +      .setInputCols(Array("input1", "input2"))
    +      .setOutputCols(Array("result1", "result2"))
    +      .setNumBuckets(numBuckets)
    +    assert(discretizer.isQuantileDiscretizeMultipleColumns())
    +    val result = discretizer.fit(df).transform(df)
    +
    +    val relativeError = discretizer.getRelativeError
    +    val isGoodBucket = udf {
    +      (size: Int) => math.abs( size - (datasetSize / numBuckets)) <= 
(relativeError * datasetSize)
    +    }
    +
    +    for (i <- 1 to 2) {
    +      val observedNumBuckets = result.select("result" + i).distinct.count
    +      assert(observedNumBuckets === numBuckets,
    +        "Observed number of buckets does not equal expected number of 
buckets.")
    +
    +      val numGoodBuckets = result.groupBy("result" + 
i).count.filter(isGoodBucket($"count")).count
    +      assert(numGoodBuckets === numBuckets,
    +        "Bucket sizes are not within expected relative error tolerance.")
    +    }
    +  }
    +
    +  test("Multiple Columns: Test on data with high proportion of duplicated 
values") {
    +    val spark = this.spark
    +    import spark.implicits._
    +
    +    val numBuckets = 5
    +    val expectedNumBucket = 3
    +    val data1 = Array(1.0, 3.0, 2.0, 1.0, 1.0, 2.0, 3.0, 2.0, 2.0, 2.0, 
1.0, 3.0)
    +    val data2 = Array(1.0, 2.0, 3.0, 1.0, 1.0, 1.0, 1.0, 3.0, 2.0, 3.0, 
1.0, 2.0)
    +    val data = (0 until data1.length).map { idx =>
    +      (data1(idx), data2(idx))
    +    }
    +    val df: DataFrame = data.toSeq.toDF("input1", "input2")
    +    val discretizer = new QuantileDiscretizer()
    +      .setInputCols(Array("input1", "input2"))
    +      .setOutputCols(Array("result1", "result2"))
    +      .setNumBuckets(numBuckets)
    +    assert(discretizer.isQuantileDiscretizeMultipleColumns())
    +    val result = discretizer.fit(df).transform(df)
    +    for (i <- 1 to 2) {
    +      val observedNumBuckets = result.select("result" + i).distinct.count
    +      assert(observedNumBuckets == expectedNumBucket,
    +        s"Observed number of buckets are not correct." +
    +          s" Expected $expectedNumBucket but found ($observedNumBuckets")
    +    }
    +  }
    +
    +  test("Multiple Columns: Test transform on data with NaN value") {
    +    val spark = this.spark
    +    import spark.implicits._
    +
    +    val numBuckets = 3
    +    val validData1 = Array(-0.9, -0.5, -0.3, 0.0, 0.2, 0.5, 0.9, 
Double.NaN, Double.NaN, Double.NaN)
    +    val expectedKeep1 = Array(0.0, 0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 3.0, 
3.0)
    +    val validData2 = Array(0.2, -0.1, 0.3, 0.0, 0.1, 0.3, 0.5, 0.8, 
Double.NaN, Double.NaN)
    +    val expectedKeep2 = Array(1.0, 0.0, 2.0, 0.0, 1.0, 2.0, 2.0, 2.0, 3.0, 
3.0)
    +
    +    val data = (0 until validData1.length).map { idx =>
    +      (validData1(idx), validData2(idx), expectedKeep1(idx), 
expectedKeep2(idx))
    +    }
    +    val dataFrame: DataFrame = data.toSeq.toDF("input1", "input2", 
"expected1", "expected2")
    +
    +    val discretizer = new QuantileDiscretizer()
    +      .setInputCols(Array("input1", "input2"))
    +      .setOutputCols(Array("result1", "result2"))
    +      .setNumBuckets(numBuckets)
    +    assert(discretizer.isQuantileDiscretizeMultipleColumns())
    +
    +    withClue("QuantileDiscretizer with handleInvalid=error should throw 
exception for NaN values") {
    +      intercept[SparkException] {
    +        discretizer.fit(dataFrame).transform(dataFrame).collect()
    +      }
    +    }
    +
    +    discretizer.setHandleInvalid("keep")
    +    discretizer.fit(dataFrame).transform(dataFrame).
    +      select("result1", "expected1", "result2", "expected2")
    +      .collect().foreach {
    +      case Row(r1: Double, e1: Double, r2: Double, e2: Double) =>
    +        assert(r1 === e1,
    +          s"The result value is not correct after bucketing. Expected $e1 
but found $r1")
    +        assert(r2 === e2,
    +          s"The result value is not correct after bucketing. Expected $e2 
but found $r2")
    +    }
    +
    +    discretizer.setHandleInvalid("skip")
    +    val result = discretizer.fit(dataFrame).transform(dataFrame)
    +    for (i <- 1 to 2) {
    +      val skipResults1: Array[Double] = result.select("result" + 
i).as[Double].collect()
    +      assert(skipResults1.length === 7)
    +      assert(skipResults1.forall(_ !== 4.0))
    +    }
    +  }
    +
    +  test("Multiple Columns: Test numBucketsArray") {
    +    val spark = this.spark
    +    import spark.implicits._
    +
    +    val datasetSize = 20
    +    val numBucketsArray: Array[Int] = Array(2, 5, 10)
    +    val data1 = Array.range(1, 21, 1).map(_.toDouble)
    +    val expected1 = Array (0.0, 1.0, 1.0, 2.0, 2.0, 2.0, 3.0, 4.0, 4.0, 
5.0,
    +      5.0, 5.0, 6.0, 6.0, 7.0, 8.0, 8.0, 9.0, 9.0, 9.0)
    --- End diff --
    
    Is this correct? I tried applying the same data to the current
`QuantileDiscretizer`:
    
    ```scala
    val data1 = Array.range(1, 21, 1).map(_.toDouble)
    val df = data1.toSeq.toDF
    val discretizer = new 
QuantileDiscretizer().setInputCol("value").setOutputCol("result").setNumBuckets(2)
    discretizer.fit(df).transform(df).show
    ```
    ```
    +-----+------+
    |value|result|
    +-----+------+
    |  1.0|   0.0|
    |  2.0|   0.0|
    |  3.0|   0.0|
    |  4.0|   0.0|
    |  5.0|   0.0|
    |  6.0|   0.0|
    |  7.0|   0.0|
    |  8.0|   0.0|
    |  9.0|   0.0|
    | 10.0|   1.0|
    | 11.0|   1.0|
    | 12.0|   1.0|
    | 13.0|   1.0|
    | 14.0|   1.0|
    | 15.0|   1.0|
    | 16.0|   1.0|
    | 17.0|   1.0|
    | 18.0|   1.0|
    | 19.0|   1.0|
    | 20.0|   1.0|
    +-----+------+
    ```


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to