Repository: spark
Updated Branches:
  refs/heads/master e17965891 -> adb222b95


[SPARK-23751][ML][PYSPARK] Kolmogorov-Smirnov test Python API in pyspark.ml

## What changes were proposed in this pull request?

Kolmogorov-Smirnov test Python API in `pyspark.ml`

**Note**: the overload that takes a user-supplied `CDF` function is a little difficult to support in Python, since a plain Python callable cannot be handed to the JVM directly. We can add it in a follow-up PR; a hypothetical sketch follows below.
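Purely as an illustration of why this is awkward, here is a minimal sketch (not part of this patch; `ks_test_with_cdf` is a hypothetical name): a Python-side CDF has to be evaluated outside the JVM, for example by collecting the sample to the driver, which the real API should avoid.

```python
def ks_test_with_cdf(dataset, sampleCol, cdf):
    """Hypothetical sketch only -- NOT the API added by this patch.

    Computes the two-sided KS statistic of dataset[sampleCol] against a
    Python callable `cdf` by collecting the sample to the driver, which is
    exactly the compromise a proper JVM-backed overload would avoid.
    """
    sample = sorted(r[0] for r in dataset.select(sampleCol).collect())
    n = float(len(sample))
    # D = sup_x |F_n(x) - F(x)|: the ECDF jumps from i/n to (i+1)/n at the
    # i-th sorted sample point, so checking both sides there is enough.
    return max(
        max(abs(cdf(x) - i / n), abs(cdf(x) - (i + 1) / n))
        for i, x in enumerate(sample)
    )
```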

## How was this patch tested?

doctest
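
For reference, the new API is exercised like this (a minimal usage sketch mirroring the doctest added in `python/pyspark/ml/stat.py`; assumes an active `SparkSession` named `spark`):

```python
from pyspark.ml.stat import KolmogorovSmirnovTest

dataset = spark.createDataFrame([[-1.0], [0.0], [1.0]], ['sample'])
# Test the sample against N(0, 1); for 'norm' the parameters are the mean
# and the standard deviation.
ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).first()
print(round(ksResult.pValue, 3))     # 1.0
print(round(ksResult.statistic, 3))  # 0.175
```

Running `python/pyspark/ml/stat.py` directly executes these doctests via the module's `__main__` block.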

Author: WeichenXu <weichen...@databricks.com>

Closes #20904 from WeichenXu123/ks-test-py.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/adb222b9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/adb222b9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/adb222b9

Branch: refs/heads/master
Commit: adb222b957f327a69929b8f16fa5ebc071fa99e3
Parents: e179658
Author: WeichenXu <weichen...@databricks.com>
Authored: Tue Apr 10 11:18:14 2018 -0700
Committer: Joseph K. Bradley <jos...@databricks.com>
Committed: Tue Apr 10 11:18:14 2018 -0700

----------------------------------------------------------------------
 .../spark/ml/stat/KolmogorovSmirnovTest.scala   |  29 +--
 python/pyspark/ml/stat.py                       | 181 +++++++++++++------
 2 files changed, 138 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/adb222b9/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
index c62d746..af8ff64 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/stat/KolmogorovSmirnovTest.scala
@@ -24,7 +24,7 @@ import org.apache.spark.api.java.function.Function
 import org.apache.spark.ml.util.SchemaUtils
 import org.apache.spark.mllib.stat.{Statistics => OldStatistics}
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{DataFrame, Dataset, Row}
 import org.apache.spark.sql.functions.col
 
 /**
@@ -59,7 +59,7 @@ object KolmogorovSmirnovTest {
    * distribution of the sample data and the theoretical distribution we can provide a test for
    * the null hypothesis that the sample data comes from that theoretical distribution.
    *
-   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param dataset A `Dataset` or a `DataFrame` containing the sample of data to test
    * @param sampleCol Name of sample column in dataset, of any numerical type
    * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
    * @return DataFrame containing the test result for the input sampled data.
@@ -68,10 +68,10 @@ object KolmogorovSmirnovTest {
    *          - `statistic: Double`
    */
   @Since("2.4.0")
-  def test(dataset: DataFrame, sampleCol: String, cdf: Double => Double): DataFrame = {
+  def test(dataset: Dataset[_], sampleCol: String, cdf: Double => Double): DataFrame = {
     val spark = dataset.sparkSession
 
-    val rdd = getSampleRDD(dataset, sampleCol)
+    val rdd = getSampleRDD(dataset.toDF(), sampleCol)
     val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, cdf)
     spark.createDataFrame(Seq(KolmogorovSmirnovTestResult(
       testResult.pValue, testResult.statistic)))
@@ -81,10 +81,11 @@ object KolmogorovSmirnovTest {
    * Java-friendly version of `test(dataset: DataFrame, sampleCol: String, cdf: Double => Double)`
    */
   @Since("2.4.0")
-  def test(dataset: DataFrame, sampleCol: String,
-    cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
-    val f: Double => Double = x => cdf.call(x)
-    test(dataset, sampleCol, f)
+  def test(
+      dataset: Dataset[_],
+      sampleCol: String,
+      cdf: Function[java.lang.Double, java.lang.Double]): DataFrame = {
+    test(dataset, sampleCol, (x: Double) => cdf.call(x))
   }
 
   /**
@@ -92,10 +93,11 @@ object KolmogorovSmirnovTest {
    * distribution equality. Currently supports the normal distribution, taking as parameters
    * the mean and standard deviation.
    *
-   * @param dataset a `DataFrame` containing the sample of data to test
+   * @param dataset A `Dataset` or a `DataFrame` containing the sample of data to test
    * @param sampleCol Name of sample column in dataset, of any numerical type
    * @param distName a `String` name for a theoretical distribution; currently only "norm" is supported.
-   * @param params `Double*` specifying the parameters to be used for the theoretical distribution
+   * @param params `Double*` specifying the parameters to be used for the theoretical distribution.
+   *               For the "norm" distribution, the parameters include mean and standard deviation.
    * @return DataFrame containing the test result for the input sampled data.
    *         This DataFrame will contain a single Row with the following fields:
    *          - `pValue: Double`
@@ -103,10 +105,13 @@ object KolmogorovSmirnovTest {
    */
   @Since("2.4.0")
   @varargs
-  def test(dataset: DataFrame, sampleCol: String, distName: String, params: Double*): DataFrame = {
+  def test(
+      dataset: Dataset[_],
+      sampleCol: String,
+      distName: String,
+      params: Double*): DataFrame = {
     val spark = dataset.sparkSession
 
-    val rdd = getSampleRDD(dataset, sampleCol)
+    val rdd = getSampleRDD(dataset.toDF(), sampleCol)
     val testResult = OldStatistics.kolmogorovSmirnovTest(rdd, distName, params: _*)
     spark.createDataFrame(Seq(KolmogorovSmirnovTestResult(
       testResult.pValue, testResult.statistic)))

http://git-wip-us.apache.org/repos/asf/spark/blob/adb222b9/python/pyspark/ml/stat.py
----------------------------------------------------------------------
diff --git a/python/pyspark/ml/stat.py b/python/pyspark/ml/stat.py
index 0eeb5e5..93d0f4f 100644
--- a/python/pyspark/ml/stat.py
+++ b/python/pyspark/ml/stat.py
@@ -32,32 +32,6 @@ class ChiSquareTest(object):
 
     The null hypothesis is that the occurrence of the outcomes is statistically independent.
 
-    :param dataset:
-      DataFrame of categorical labels and categorical features.
-      Real-valued features will be treated as categorical for each distinct value.
-    :param featuresCol:
-      Name of features column in dataset, of type `Vector` (`VectorUDT`).
-    :param labelCol:
-      Name of label column in dataset, of any numerical type.
-    :return:
-      DataFrame containing the test result for every feature against the label.
-      This DataFrame will contain a single Row with the following fields:
-      - `pValues: Vector`
-      - `degreesOfFreedom: Array[Int]`
-      - `statistics: Vector`
-      Each of these fields has one value per feature.
-
-    >>> from pyspark.ml.linalg import Vectors
-    >>> from pyspark.ml.stat import ChiSquareTest
-    >>> dataset = [[0, Vectors.dense([0, 0, 1])],
-    ...            [0, Vectors.dense([1, 0, 1])],
-    ...            [1, Vectors.dense([2, 1, 1])],
-    ...            [1, Vectors.dense([3, 1, 1])]]
-    >>> dataset = spark.createDataFrame(dataset, ["label", "features"])
-    >>> chiSqResult = ChiSquareTest.test(dataset, 'features', 'label')
-    >>> chiSqResult.select("degreesOfFreedom").collect()[0]
-    Row(degreesOfFreedom=[3, 1, 0])
-
     .. versionadded:: 2.2.0
 
     """
@@ -66,6 +40,32 @@ class ChiSquareTest(object):
     def test(dataset, featuresCol, labelCol):
         """
         Perform a Pearson's independence test using dataset.
+
+        :param dataset:
+          DataFrame of categorical labels and categorical features.
+          Real-valued features will be treated as categorical for each distinct value.
+        :param featuresCol:
+          Name of features column in dataset, of type `Vector` (`VectorUDT`).
+        :param labelCol:
+          Name of label column in dataset, of any numerical type.
+        :return:
+          DataFrame containing the test result for every feature against the label.
+          This DataFrame will contain a single Row with the following fields:
+          - `pValues: Vector`
+          - `degreesOfFreedom: Array[Int]`
+          - `statistics: Vector`
+          Each of these fields has one value per feature.
+
+        >>> from pyspark.ml.linalg import Vectors
+        >>> from pyspark.ml.stat import ChiSquareTest
+        >>> dataset = [[0, Vectors.dense([0, 0, 1])],
+        ...            [0, Vectors.dense([1, 0, 1])],
+        ...            [1, Vectors.dense([2, 1, 1])],
+        ...            [1, Vectors.dense([3, 1, 1])]]
+        >>> dataset = spark.createDataFrame(dataset, ["label", "features"])
+        >>> chiSqResult = ChiSquareTest.test(dataset, 'features', 'label')
+        >>> chiSqResult.select("degreesOfFreedom").collect()[0]
+        Row(degreesOfFreedom=[3, 1, 0])
         """
         sc = SparkContext._active_spark_context
         javaTestObj = _jvm().org.apache.spark.ml.stat.ChiSquareTest
@@ -85,40 +85,6 @@ class Correlation(object):
       which is fairly costly. Cache the input Dataset before calling corr with `method = 'spearman'`
       to avoid recomputing the common lineage.
 
-    :param dataset:
-      A dataset or a dataframe.
-    :param column:
-      The name of the column of vectors for which the correlation coefficient needs
-      to be computed. This must be a column of the dataset, and it must contain
-      Vector objects.
-    :param method:
-      String specifying the method to use for computing correlation.
-      Supported: `pearson` (default), `spearman`.
-    :return:
-      A dataframe that contains the correlation matrix of the column of vectors. This
-      dataframe contains a single row and a single column of name
-      '$METHODNAME($COLUMN)'.
-
-    >>> from pyspark.ml.linalg import Vectors
-    >>> from pyspark.ml.stat import Correlation
-    >>> dataset = [[Vectors.dense([1, 0, 0, -2])],
-    ...            [Vectors.dense([4, 5, 0, 3])],
-    ...            [Vectors.dense([6, 7, 0, 8])],
-    ...            [Vectors.dense([9, 0, 0, 1])]]
-    >>> dataset = spark.createDataFrame(dataset, ['features'])
-    >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
-    >>> print(str(pearsonCorr).replace('nan', 'NaN'))
-    DenseMatrix([[ 1.        ,  0.0556...,         NaN,  0.4004...],
-                 [ 0.0556...,  1.        ,         NaN,  0.9135...],
-                 [        NaN,         NaN,  1.        ,         NaN],
-                 [ 0.4004...,  0.9135...,         NaN,  1.        ]])
-    >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
-    >>> print(str(spearmanCorr).replace('nan', 'NaN'))
-    DenseMatrix([[ 1.        ,  0.1054...,         NaN,  0.4       ],
-                 [ 0.1054...,  1.        ,         NaN,  0.9486... ],
-                 [        NaN,         NaN,  1.        ,         NaN],
-                 [ 0.4       ,  0.9486... ,         NaN,  1.        ]])
-
     .. versionadded:: 2.2.0
 
     """
@@ -127,6 +93,40 @@ class Correlation(object):
     def corr(dataset, column, method="pearson"):
         """
         Compute the correlation matrix with specified method using dataset.
+
+        :param dataset:
+          A Dataset or a DataFrame.
+        :param column:
+          The name of the column of vectors for which the correlation coefficient needs
+          to be computed. This must be a column of the dataset, and it must contain
+          Vector objects.
+        :param method:
+          String specifying the method to use for computing correlation.
+          Supported: `pearson` (default), `spearman`.
+        :return:
+          A DataFrame that contains the correlation matrix of the column of vectors. This
+          DataFrame contains a single row and a single column of name
+          '$METHODNAME($COLUMN)'.
+
+        >>> from pyspark.ml.linalg import Vectors
+        >>> from pyspark.ml.stat import Correlation
+        >>> dataset = [[Vectors.dense([1, 0, 0, -2])],
+        ...            [Vectors.dense([4, 5, 0, 3])],
+        ...            [Vectors.dense([6, 7, 0, 8])],
+        ...            [Vectors.dense([9, 0, 0, 1])]]
+        >>> dataset = spark.createDataFrame(dataset, ['features'])
+        >>> pearsonCorr = Correlation.corr(dataset, 'features', 'pearson').collect()[0][0]
+        >>> print(str(pearsonCorr).replace('nan', 'NaN'))
+        DenseMatrix([[ 1.        ,  0.0556...,         NaN,  0.4004...],
+                     [ 0.0556...,  1.        ,         NaN,  0.9135...],
+                     [        NaN,         NaN,  1.        ,         NaN],
+                     [ 0.4004...,  0.9135...,         NaN,  1.        ]])
+        >>> spearmanCorr = Correlation.corr(dataset, 'features', method='spearman').collect()[0][0]
+        >>> print(str(spearmanCorr).replace('nan', 'NaN'))
+        DenseMatrix([[ 1.        ,  0.1054...,         NaN,  0.4       ],
+                     [ 0.1054...,  1.        ,         NaN,  0.9486... ],
+                     [        NaN,         NaN,  1.        ,         NaN],
+                     [ 0.4       ,  0.9486... ,         NaN,  1.        ]])
         """
         sc = SparkContext._active_spark_context
         javaCorrObj = _jvm().org.apache.spark.ml.stat.Correlation
@@ -134,6 +134,67 @@ class Correlation(object):
         return _java2py(sc, javaCorrObj.corr(*args))
 
 
+class KolmogorovSmirnovTest(object):
+    """
+    .. note:: Experimental
+
+    Conduct the two-sided Kolmogorov-Smirnov (KS) test for data sampled from a continuous
+    distribution.
+
+    By comparing the largest difference between the empirical cumulative
+    distribution of the sample data and the theoretical distribution we can provide a test for
+    the null hypothesis that the sample data comes from that theoretical distribution.
+
+    .. versionadded:: 2.4.0
+
+    """
+    @staticmethod
+    @since("2.4.0")
+    def test(dataset, sampleCol, distName, *params):
+        """
+        Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution
+        equality. Currently supports the normal distribution, taking as parameters the mean and
+        standard deviation.
+
+        :param dataset:
+          a Dataset or a DataFrame containing the sample of data to test.
+        :param sampleCol:
+          Name of sample column in dataset, of any numerical type.
+        :param distName:
+          a `string` name for a theoretical distribution; currently only "norm" is supported.
+        :param params:
+          a list of `Double` values specifying the parameters to be used for the theoretical
+          distribution. For the "norm" distribution, the parameters include mean and
+          standard deviation.
+        :return:
+          A DataFrame that contains the Kolmogorov-Smirnov test result for the input sampled data.
+          This DataFrame will contain a single Row with the following fields:
+          - `pValue: Double`
+          - `statistic: Double`
+
+        >>> from pyspark.ml.stat import KolmogorovSmirnovTest
+        >>> dataset = [[-1.0], [0.0], [1.0]]
+        >>> dataset = spark.createDataFrame(dataset, ['sample'])
+        >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 0.0, 1.0).first()
+        >>> round(ksResult.pValue, 3)
+        1.0
+        >>> round(ksResult.statistic, 3)
+        0.175
+        >>> dataset = [[2.0], [3.0], [4.0]]
+        >>> dataset = spark.createDataFrame(dataset, ['sample'])
+        >>> ksResult = KolmogorovSmirnovTest.test(dataset, 'sample', 'norm', 3.0, 1.0).first()
+        >>> round(ksResult.pValue, 3)
+        1.0
+        >>> round(ksResult.statistic, 3)
+        0.175
+        """
+        sc = SparkContext._active_spark_context
+        javaTestObj = _jvm().org.apache.spark.ml.stat.KolmogorovSmirnovTest
+        dataset = _py2java(sc, dataset)
+        params = [float(param) for param in params]
+        return _java2py(sc, javaTestObj.test(dataset, sampleCol, distName,
+                                             _jvm().PythonUtils.toSeq(params)))
+
+
 if __name__ == "__main__":
     import doctest
     import pyspark.ml.stat
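
As a sanity check (not part of this patch; assumes SciPy is installed), the doctest values above can be reproduced with SciPy's implementation of the same one-sample, two-sided test:

```python
from scipy import stats

# Same sample as the doctest, tested against the standard normal N(0, 1).
result = stats.kstest([-1.0, 0.0, 1.0], 'norm')
print(round(result.statistic, 3))  # 0.175
print(round(result.pvalue, 3))     # 1.0
```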

