This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git

The following commit(s) were added to refs/heads/master by this push:
     new 83167e56ff9  [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std` accept arbitrary integers
83167e56ff9 is described below

commit 83167e56ff9cdfeb29da81d07d56b482ccfedc74
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Fri Sep 23 10:13:33 2022 +0800

    [SPARK-40542][PS][SQL] Make `ddof` in `DataFrame.std` and `Series.std` accept arbitrary integers

    ### What changes were proposed in this pull request?
    Add a new `std` expression to support an arbitrary integral `ddof`.

    ### Why are the changes needed?
    For API coverage.

    ### Does this PR introduce _any_ user-facing change?
    Yes, `ddof` values other than {0, 1} are now accepted.

    before:
    ```
    In [4]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])

    In [5]: df.std(ddof=2)
    ---------------------------------------------------------------------------
    AssertionError                            Traceback (most recent call last)
    Cell In [5], line 1
    ----> 1 df.std(ddof=2)

    File ~/Dev/spark/python/pyspark/pandas/generic.py:1866, in Frame.std(self, axis, skipna, ddof, numeric_only)
       1803 def std(
       1804     self,
       1805     axis: Optional[Axis] = None,
       (...)
       1808     numeric_only: bool = None,
       1809 ) -> Union[Scalar, "Series"]:
       1810     """
       1811     Return sample standard deviation.
       1812     (...)
       1864     0.816496580927726
       1865     """
    -> 1866     assert ddof in (0, 1)
       1868     axis = validate_axis(axis)
       1870     if numeric_only is None and axis == 0:

    AssertionError:
    ```

    after:
    ```
    In [3]: df = ps.DataFrame({'a': [1, 2, 3, np.nan], 'b': [0.1, 0.2, 0.3, np.nan]}, columns=['a', 'b'])

    In [4]: df.std(ddof=2)
    Out[4]:
    a    1.414214
    b    0.141421
    dtype: float64

    In [5]: df.to_pandas().std(ddof=2)
    /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:975: PandasAPIOnSparkAdviceWarning: `to_pandas` loads all data into the driver's memory. It should only be used if the resulting pandas DataFrame is expected to be small.
      warnings.warn(message, PandasAPIOnSparkAdviceWarning)
    Out[5]:
    a    1.414214
    b    0.141421
    dtype: float64
    ```

    ### How was this patch tested?
    Added test suites.

    Closes #37974 from zhengruifeng/ps_std_ddof.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/generic.py                   | 21 +++++++++++++++-----
 python/pyspark/pandas/spark/functions.py           |  5 +++++
 .../pyspark/pandas/tests/test_generic_functions.py |  6 ++++++
 python/pyspark/pandas/tests/test_stats.py          |  6 ++++++
 .../expressions/aggregate/CentralMomentAgg.scala   | 23 ++++++++++++++++++++++
 .../spark/sql/api/python/PythonSQLUtils.scala      |  4 ++++
 6 files changed, 60 insertions(+), 5 deletions(-)
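For context on the semantics the diffs below implement: the new `PandasStddev` expression evaluates `sqrt(m2 / (n - ddof))` over the non-null values, where `m2` is the second central moment and `n` the non-null count; it yields null when `n == 0` and the divide-by-zero result when `n == ddof`. Here is a minimal NumPy sketch of that behavior, mirroring `pandas.core.nanops.nanstd`; it is not part of the patch, the helper name is illustrative, and NaN stands in for Spark's configurable divide-by-zero result:

```
import numpy as np

def pandas_style_std(values, ddof=1):
    """NaN-skipping standard deviation with divisor n - ddof (illustrative helper)."""
    arr = np.asarray(values, dtype=float)
    arr = arr[~np.isnan(arr)]              # drop missing values, as pandas does
    n = arr.size
    if n == 0 or n == ddof:                # no rows, or a zero divisor
        return np.nan
    m2 = np.sum((arr - arr.mean()) ** 2)   # second central moment
    return np.sqrt(m2 / (n - ddof))

print(pandas_style_std([1.0, 2.0, 3.0, np.nan], ddof=2))   # 1.4142..., as in Out[4] above
print(pandas_style_std([1.0, 2.0, 3.0, np.nan], ddof=-1))  # 0.7071..., as in the new doctest
```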
diff --git a/python/pyspark/pandas/generic.py b/python/pyspark/pandas/generic.py
index cafa37e3d9b..6ba967da7f5 100644
--- a/python/pyspark/pandas/generic.py
+++ b/python/pyspark/pandas/generic.py
@@ -1810,6 +1810,8 @@ class Frame(object, metaclass=ABCMeta):
         """
         Return sample standard deviation.
 
+        .. versionadded:: 3.3.0
+
         Parameters
         ----------
         axis : {index (0), columns (1)}
@@ -1822,6 +1824,9 @@ class Frame(object, metaclass=ABCMeta):
         ddof : int, default 1
             Delta Degrees of Freedom. The divisor used in calculations is N - ddof,
             where N represents the number of elements.
+
+            .. versionchanged:: 3.4.0
+               Supports arbitrary integers.
         numeric_only : bool, default None
             Include only float, int, boolean columns. False is not supported. This parameter
             is mainly for pandas compatibility.
@@ -1843,6 +1848,11 @@ class Frame(object, metaclass=ABCMeta):
         b    0.1
         dtype: float64
 
+        >>> df.std(ddof=2)
+        a    1.414214
+        b    0.141421
+        dtype: float64
+
         >>> df.std(axis=1)
         0    0.636396
         1    1.272792
@@ -1862,8 +1872,12 @@ class Frame(object, metaclass=ABCMeta):
 
         >>> df['a'].std(ddof=0)
         0.816496580927726
+
+        >>> df['a'].std(ddof=-1)
+        0.707106...
         """
-        assert ddof in (0, 1)
+        if not isinstance(ddof, int):
+            raise TypeError("ddof must be integer")
 
         axis = validate_axis(axis)
 
@@ -1881,10 +1895,7 @@ class Frame(object, metaclass=ABCMeta):
                             spark_type_to_pandas_dtype(spark_type), spark_type.simpleString()
                         )
                     )
-                if ddof == 0:
-                    return F.stddev_pop(spark_column)
-                else:
-                    return F.stddev_samp(spark_column)
+                return SF.stddev(spark_column, ddof)
 
         return self._reduce_for_stat_function(
             std, name="std", axis=axis, numeric_only=numeric_only, ddof=ddof, skipna=skipna
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index ed3a5ae430d..3aa9d9a37dd 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -27,6 +27,11 @@ from pyspark.sql.column import (
 )
 
 
+def stddev(col: Column, ddof: int) -> Column:
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.PythonSQLUtils.pandasStddev(col._jc, ddof))
+
+
 def skew(col: Column) -> Column:
     sc = SparkContext._active_spark_context
     return Column(sc._jvm.PythonSQLUtils.pandasSkewness(col._jc))
diff --git a/python/pyspark/pandas/tests/test_generic_functions.py b/python/pyspark/pandas/tests/test_generic_functions.py
index 06f86c55a38..c8f6dc275da 100644
--- a/python/pyspark/pandas/tests/test_generic_functions.py
+++ b/python/pyspark/pandas/tests/test_generic_functions.py
@@ -166,6 +166,7 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
         self._test_stat_functions(lambda x: x.max(skipna=False))
         self._test_stat_functions(lambda x: x.std())
         self._test_stat_functions(lambda x: x.std(skipna=False))
+        self._test_stat_functions(lambda x: x.std(ddof=2))
         self._test_stat_functions(lambda x: x.sem())
         self._test_stat_functions(lambda x: x.sem(skipna=False))
         # self._test_stat_functions(lambda x: x.skew())
@@ -175,6 +176,11 @@ class GenericFunctionsTest(PandasOnSparkTestCase, TestUtils):
         pdf = pd.DataFrame({"a": [np.nan, np.nan, np.nan], "b": [1, np.nan, 2], "c": [1, 2, 3]})
         psdf = ps.from_pandas(pdf)
 
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            psdf.std(ddof="ddof")
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            psdf.a.std(ddof="ddof")
+
         self.assert_eq(pdf.a.median(), psdf.a.median())
         self.assert_eq(pdf.a.median(skipna=False), psdf.a.median(skipna=False))
         self.assert_eq(1.0, psdf.b.median())
diff --git a/python/pyspark/pandas/tests/test_stats.py b/python/pyspark/pandas/tests/test_stats.py
index db3f7fd45fe..7a6a0d67494 100644
--- a/python/pyspark/pandas/tests/test_stats.py
+++ b/python/pyspark/pandas/tests/test_stats.py
@@ -450,6 +450,7 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
         self.assert_eq(psser.var(ddof=0), pser.var(ddof=0), almost=True)
         self.assert_eq(psser.std(), pser.std(), almost=True)
         self.assert_eq(psser.std(ddof=0), pser.std(ddof=0), almost=True)
+        self.assert_eq(psser.std(ddof=2), pser.std(ddof=2), almost=True)
         self.assert_eq(psser.sem(), pser.sem(), almost=True)
         self.assert_eq(psser.sem(ddof=0), pser.sem(ddof=0), almost=True)
 
@@ -488,6 +489,11 @@ class StatsTest(PandasOnSparkTestCase, SQLTestUtils):
             pdf.std(ddof=0, numeric_only=True),
             check_exact=False,
         )
+        self.assert_eq(
+            psdf.std(ddof=2, numeric_only=True),
+            pdf.std(ddof=2, numeric_only=True),
+            check_exact=False,
+        )
         self.assert_eq(psdf.sem(numeric_only=True), pdf.sem(numeric_only=True), check_exact=False)
         self.assert_eq(
             psdf.sem(ddof=0, numeric_only=True),
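The Python-side changes above route `Frame.std` through the new internal helper `SF.stddev`, which crosses the py4j bridge into `PythonSQLUtils.pandasStddev` on the JVM (see the Scala diffs below). A hedged usage sketch, assuming a running SparkSession with pandas-on-Spark available; user code would normally call `std` directly, and the internal path is shown only to illustrate the plumbing:

```
import numpy as np
import pyspark.pandas as ps
from pyspark.pandas.spark import functions as SF

psdf = ps.DataFrame({"a": [1.0, 2.0, 3.0, np.nan]})

# Public path: any integer ddof is now accepted.
print(psdf["a"].std(ddof=2))  # ~1.4142

# Internal path added by this patch: a Column wrapping the JVM-side PandasStddev.
sdf = psdf.to_spark()
sdf.agg(SF.stddev(sdf["a"], 2).alias("std_ddof2")).show()  # one row, ~1.4142
```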
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
index 8d9c221ff94..b830dd6c088 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/CentralMomentAgg.scala
@@ -339,6 +339,29 @@ case class Kurtosis(
     copy(child = newChild)
 }
 
+/**
+ * Standard deviation in Pandas' fashion.
+ * This expression is dedicated only for Pandas API on Spark.
+ * Refer to pandas.core.nanops.nanstd.
+ */
+case class PandasStddev(
+    child: Expression,
+    ddof: Int)
+  extends CentralMomentAgg(child, true) {
+
+  override protected def momentOrder = 2
+
+  override val evaluateExpression: Expression = {
+    If(n === 0.0, Literal.create(null, DoubleType),
+      If(n === ddof, divideByZeroEvalResult, sqrt(m2 / (n - ddof))))
+  }
+
+  override def prettyName: String = "pandas_stddev"
+
+  override protected def withNewChildInternal(newChild: Expression): PandasStddev =
+    copy(child = newChild)
+}
+
 /**
  * Skewness in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
  * Refer to pandas.core.nanops.nanskew.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index c495b145dc6..579a945398f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -155,6 +155,10 @@ private[sql] object PythonSQLUtils extends Logging {
     Column(TimestampDiff(unit, start.expr, end.expr))
   }
 
+  def pandasStddev(e: Column, ddof: Int): Column = {
+    Column(PandasStddev(e.expr, ddof).toAggregateExpression(false))
+  }
+
   def pandasSkewness(e: Column): Column = {
     Column(PandasSkewness(e.expr).toAggregateExpression(false))
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org