This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 83167e56ff9 [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std` accept arbitrary integers
83167e56ff9 is described below

commit 83167e56ff9cdfeb29da81d07d56b482ccfedc74
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Fri Sep 23 10:13:33 2022 +0800

    [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std` accept arbitrary integers
    
    ### What changes were proposed in this pull request?
    Add a new `std` expression to support arbitrary integer values of `ddof`.
    
    ### Why are the changes needed?
    For pandas API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes, `DataFrame.std` and `Series.std` now accept `ddof` values other than {0, 1}.
    
    Before:
    ```
    In [4]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])
    
    In [5]: df.std(ddof=2)
    ---------------------------------------------------------------------------
    AssertionError                            Traceback (most recent call last)
    Cell In [5], line 1
    ----> 1 df.std(ddof=2)
    
    File ~/Dev/spark/python/pyspark/pandas/generic.py:1866, in Frame.std(self, axis, skipna, ddof, numeric_only)
       1803 def std(
       1804     self,
       1805     axis: Optional[Axis] = None,
       (...)
       1808     numeric_only: bool = None,
       1809 ) -> Union[Scalar, "Series"]:
       1810     """
       1811     Return sample standard deviation.
       1812
       (...)
       1864     0.816496580927726
       1865     """
    -> 1866     assert ddof in (0, 1)
       1868     axis = validate_axis(axis)
       1870     if numeric_only is None and axis == 0:
    
    AssertionError:
    ```
    
    After:
    ```
    In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])
    
    In [4]: df.std(ddof=2)
    Out[4]:
    a    1.414214
    b    0.141421
    dtype: float64
    
    In [5]: df.to_pandas().std(ddof=2)
    /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
      warnings.warn(message, PandasAPIOnSparkAdviceWarning)
    Out[5]:
    a    1.414214
    b    0.141421
    dtype: float64
    
    ```
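
    As a sanity check, here is a minimal NumPy sketch (illustration only, not part of this patch) of the `N - ddof` divisor, computed from the non-NaN entries of the frame above:
    ```
    import numpy as np

    a = np.array([1.0, 2.0, 3.0])              # non-NaN values of column 'a'
    b = np.array([0.1, 0.2, 0.3])              # non-NaN values of column 'b'
    for col in (a, b):
        n = col.size                            # N = 3
        m2 = ((col - col.mean()) ** 2).sum()    # second central moment M2
        print((m2 / (n - 2)) ** 0.5)            # ddof=2 -> 1.414214..., 0.141421...
    ```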
    
    ### How was this patch tested?
    Added test suites.
    
    Closes #37974 from zhengruifeng/ps_std_ddof.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/generic.py                   | 21 +++++++++++++++-----
 python/pyspark/pandas/spark/functions.py           |  5 +++++
 .../pyspark/pandas/tests/test_generic_functions.py |  6 ++++++
 python/pyspark/pandas/tests/test_stats.py          |  6 ++++++
 .../expressions/aggregate/CentralMomentAgg.scala   | 23 ++++++++++++++++++++++
 .../spark/sql/api/python/PythonSQLUtils.scala      |  4 ++++
 6 files changed, 60 insertions(+), 5 deletions(-)

diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index cafa37e3d9b..6ba967da7f5 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -1810,6 +1810,8 @@ class Frame(object, metaclass=ABCMeta):
         """
         Return sample standard deviation.
 
+        .. versionadded:: 3.3.0
+
         Parameters
         ----------
         axis : {index (0), columns (1)}
@@ -1822,6 +1824,9 @@ class Frame(object, metaclass=ABCMeta):
         ddof : int, default 1
             Delta Degrees of Freedom. The divisor used in calculations is N - ddof,
             where N represents the number of elements.
+
+            .. versionchanged:: 3.4.0
+               Supports arbitrary integers.
         numeric_only : bool, default None
             Include only float, int, boolean columns. False is not supported. This parameter
             is mainly for pandas compatibility.
@@ -1843,6 +1848,11 @@ class Frame(object, metaclass=ABCMeta):
         b    0.1
         dtype: float64
 
+        >>> df.std(ddof=2)
+        a    1.414214
+        b    0.141421
+        dtype: float64
+
         >>> df.std(axis=1)
         0    0.636396
         1    1.272792
@@ -1862,8 +1872,12 @@ class Frame(object, metaclass=ABCMeta):
 
         >>> df['a'].std(ddof=0)
         0.816496580927726
+
+        >>> df['a'].std(ddof=-1)
+        0.707106...
         """
-        assert ddof in (0, 1)
+        if not isinstance(ddof, int):
+            raise TypeError("ddof must be integer")
 
         axis = validate_axis(axis)
 
@@ -1881,10 +1895,7 @@ class Frame(object, metaclass=ABCMeta):
                         spark_type_to_pandas_dtype(spark_type), spark_type.simpleString()
                     )
                 )
-            if ddof == 0:
-                return F.stddev_pop(spark_column)
-            else:
-                return F.stddev_samp(spark_column)
+            return SF.stddev(spark_column, ddof)
 
         return self._reduce_for_stat_function(
             std, name="std", axis=axis, numeric_only=numeric_only, ddof=ddof, skipna=skipna
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index ed3a5ae430d..3aa9d9a37dd 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -27,6 +27,11 @@ from pyspark.sql.column import (
 )
 
 
+def stddev(col: Column, ddof: int) -> Column:
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.PythonSQLUtils.pandasStddev(col._jc, ddof))
+
+
 def skew(col: Column) -> Column:
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.PythonSQLUtils.pandasSkewness(col._jc))
diff --git a/python/pyspark/pandas/tests/test_generic_functions.py b/python/pyspark/pandas/tests/test_generic_functions.py
index 06f86c55a38..c8f6dc275da 100644
--- a/python/pyspark/pandas/tests/test_generic_functions.py
+++ b/python/pyspark/pandas/tests/test_generic_functions.py
@@ -166,6 +166,7 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
         self._test_stat_functions(lambda x: x.max(skipna=False))
         self._test_stat_functions(lambda x: x.std())
         self._test_stat_functions(lambda x: x.std(skipna=False))
+        self._test_stat_functions(lambda x: x.std(ddof=2))
         self._test_stat_functions(lambda x: x.sem())
         self._test_stat_functions(lambda x: x.sem(skipna=False))
         # self._test_stat_functions(lambda x: x.skew())
@@ -175,6 +176,11 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
         pdf = pd.DataFrame({"a": [np.nan, np.nan, np.nan], "b": [1, np.nan, 2], "c": [1, 2, 3]})
         psdf = ps.from_pandas(pdf)
 
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            psdf.std(ddof="ddof")
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            psdf.a.std(ddof="ddof")
+
         self.assert_eq(pdf.a.median(), psdf.a.median())
         self.assert_eq(pdf.a.median(skipna=False), psdf.a.median(skipna=False))
         self.assert_eq(1.0, psdf.b.median())
diff --git a/python/pyspark/pandas/tests/test_stats.py b/python/pyspark/pandas/tests/test_stats.py
index db3f7fd45fe..7a6a0d67494 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -450,6 +450,7 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
         self.assert_eq(psser.var(ddof=0), pser.var(ddof=0), almost=True)
         self.assert_eq(psser.std(), pser.std(), almost=True)
         self.assert_eq(psser.std(ddof=0), pser.std(ddof=0), almost=True)
+        self.assert_eq(psser.std(ddof=2), pser.std(ddof=2), almost=True)
         self.assert_eq(psser.sem(), pser.sem(), almost=True)
         self.assert_eq(psser.sem(ddof=0), pser.sem(ddof=0), almost=True)
 
@@ -488,6 +489,11 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
             pdf.std(ddof=0, numeric_only=True),
             check_exact=False,
         )
+        self.assert_eq(
+            psdf.std(ddof=2, numeric_only=True),
+            pdf.std(ddof=2, numeric_only=True),
+            check_exact=False,
+        )
         self.assert_eq(psdf.sem(numeric_only=True), pdf.sem(numeric_only=True), check_exact=False)
         self.assert_eq(
             psdf.sem(ddof=0, numeric_only=True),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
index 8d9c221ff94..b830dd6c088 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
@@ -339,6 +339,29 @@ case class Kurtosis(
     copy(child = newChild)
 }
 
+/**
+ * Standard deviation in Pandas' fashion.
+ * This expression is dedicated only for Pandas API on Spark.
+ * Refer to pandas.core.nanops.nanstd.
+ */
+case class PandasStddev(
+    child: Expression,
+    ddof: Int)
+  extends CentralMomentAgg(child, true) {
+
+  override protected def momentOrder = 2
+
+  override val evaluateExpression: Expression = {
+    If(n === 0.0, Literal.create(null, DoubleType),
+      If(n === ddof, divideByZeroEvalResult, sqrt(m2 / (n - ddof))))
+  }
+
+  override def prettyName: String = "pandas_stddev"
+
+  override protected def withNewChildInternal(newChild: Expression): PandasStddev =
+    copy(child = newChild)
+}
+
 /**
  * Skewness in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
  * Refer to pandas.core.nanops.nanskew.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index c495b145dc6..579a945398f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -155,6 +155,10 @@ private[sql] object PythonSQLUtils extends Logging {
     Column(TimestampDiff(unit, start.expr, end.expr))
   }
 
+  def pandasStddev(e: Column, ddof: Int): Column = {
+    Column(PandasStddev(e.expr, ddof).toAggregateExpression(false))
+  }
+
   def pandasSkewness(e: Column): Column = {
     Column(PandasSkewness(e.expr).toAggregateExpression(false))
   }
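
A minimal PySpark sketch (illustration only, not part of the patch; it assumes a local SparkSession and the pandas-on-Spark dependencies) of exercising the new `SF.stddev` wrapper added in `python/pyspark/pandas/spark/functions.py`, which calls `PythonSQLUtils.pandasStddev` and thus the `PandasStddev` aggregate above:

```
from pyspark.sql import SparkSession
from pyspark.pandas.spark import functions as SF

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame([(1.0,), (2.0,), (3.0,)], ["a"])

# PandasStddev evaluates sqrt(M2 / (N - ddof)); per the expression above it
# falls back to the divide-by-zero result when N == ddof and null when N == 0.
sdf.select(SF.stddev(sdf["a"], 2).alias("std_ddof2")).show()  # ~1.414214
```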


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
