This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new f92b4941c63  [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`
f92b4941c63 is described below

commit f92b4941c631526a387c6f23554db53fbf922b96
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Tue Sep 13 11:43:47 2022 +0800

    [SPARK-40386][PS][SQL] Implement `ddof` in `DataFrame.cov`

    ### What changes were proposed in this pull request?
    1, add a dedicated expression for `DataFrame.cov`;
    2, add missing parameter `ddof` in `DataFrame.cov`

    ### Why are the changes needed?
    for api coverage

    ### Does this PR introduce _any_ user-facing change?
    yes, API change
    ```
    >>> np.random.seed(42)
    >>> df = ps.DataFrame(np.random.randn(1000, 5),
    ...                   columns=['a', 'b', 'c', 'd', 'e'])
    >>> df.cov()
              a         b         c         d         e
    a  0.998438 -0.020161  0.059277 -0.008943  0.014144
    b -0.020161  1.059352 -0.008543 -0.024738  0.009826
    c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
    d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
    e  0.014144  0.009826 -0.000271 -0.013692  0.977795
    >>> df.cov(ddof=2)
              a         b         c         d         e
    a  0.999439 -0.020181  0.059336 -0.008952  0.014159
    b -0.020181  1.060413 -0.008551 -0.024762  0.009836
    c  0.059336 -0.008551  1.011683 -0.001487 -0.000271
    d -0.008952 -0.024762 -0.001487  0.922220 -0.013705
    e  0.014159  0.009836 -0.000271 -0.013705  0.978775
    >>> df.cov(ddof=-1)
              a         b         c         d         e
    a  0.996444 -0.020121  0.059158 -0.008926  0.014116
    b -0.020121  1.057235 -0.008526 -0.024688  0.009807
    c  0.059158 -0.008526  1.008650 -0.001483 -0.000270
    d -0.008926 -0.024688 -0.001483  0.919456 -0.013664
    e  0.014116  0.009807 -0.000270 -0.013664  0.975842
    ```

    ### How was this patch tested?
    added tests

    Closes #37829 from zhengruifeng/ps_cov_ddof.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/frame.py                | 31 +++++++++++++++++-----
 python/pyspark/pandas/spark/functions.py      |  5 ++++
 python/pyspark/pandas/tests/test_dataframe.py | 10 +++++++
 .../expressions/aggregate/Covariance.scala    | 22 +++++++++++++++
 .../spark/sql/api/python/PythonSQLUtils.scala |  4 +++
 5 files changed, 66 insertions(+), 6 deletions(-)

diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index 2a7fda2d527..3438d07896e 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -8738,8 +8738,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         internal = self._internal.with_new_sdf(sdf, data_fields=data_fields)
         self._update_internal_frame(internal, check_same_anchor=False)

-    # TODO: ddof should be implemented.
-    def cov(self, min_periods: Optional[int] = None) -> "DataFrame":
+    def cov(self, min_periods: Optional[int] = None, ddof: int = 1) -> "DataFrame":
         """
         Compute pairwise covariance of columns, excluding NA/null values.

@@ -8755,8 +8754,7 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         below this threshold will be returned as ``NaN``.

         This method is generally used for the analysis of time series data to
-        understand the relationship between different measures
-        across time.
+        understand the relationship between different measures across time.

         .. versionadded:: 3.3.0

@@ -8765,6 +8763,11 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         min_periods : int, optional
             Minimum number of observations required per pair of columns
             to have a valid result.
+        ddof : int, default 1
+            Delta degrees of freedom. The divisor used in calculations
+            is ``N - ddof``, where ``N`` represents the number of elements.
+
+            .. versionadded:: 3.4.0

         Returns
         -------
@@ -8794,6 +8797,20 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         c  0.059277 -0.008543  1.010670 -0.001486 -0.000271
         d -0.008943 -0.024738 -0.001486  0.921297 -0.013692
         e  0.014144  0.009826 -0.000271 -0.013692  0.977795
+        >>> df.cov(ddof=2)
+                  a         b         c         d         e
+        a  0.999439 -0.020181  0.059336 -0.008952  0.014159
+        b -0.020181  1.060413 -0.008551 -0.024762  0.009836
+        c  0.059336 -0.008551  1.011683 -0.001487 -0.000271
+        d -0.008952 -0.024762 -0.001487  0.922220 -0.013705
+        e  0.014159  0.009836 -0.000271 -0.013705  0.978775
+        >>> df.cov(ddof=-1)
+                  a         b         c         d         e
+        a  0.996444 -0.020121  0.059158 -0.008926  0.014116
+        b -0.020121  1.057235 -0.008526 -0.024688  0.009807
+        c  0.059158 -0.008526  1.008650 -0.001483 -0.000270
+        d -0.008926 -0.024688 -0.001483  0.919456 -0.013664
+        e  0.014116  0.009807 -0.000270 -0.013664  0.975842

         **Minimum number of periods**

@@ -8813,6 +8830,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
         b       NaN  1.248003  0.191417
         c -0.150812  0.191417  0.895202
         """
+        if not isinstance(ddof, int):
+            raise TypeError("ddof must be integer")
         min_periods = 1 if min_periods is None else min_periods

         # Only compute covariance for Boolean and Numeric except Decimal
@@ -8891,8 +8910,8 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
                 step += r
                 for c in range(r, num_cols):
                     cov_scols.append(
-                        F.covar_samp(
-                            F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double")
+                        SF.covar(
+                            F.col(data_cols[r]).cast("double"), F.col(data_cols[c]).cast("double"), ddof
                         )
                         if count_not_null[r * num_cols + c - step] >= min_periods
                         else F.lit(None)
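As a quick sanity check of the ``N - ddof`` divisor documented for the new `ddof` parameter above (a minimal NumPy sketch, not part of this patch; it only relies on `numpy.cov`, which uses the same normalization):

```
import numpy as np

rng = np.random.default_rng(42)
x = rng.standard_normal(100)
y = rng.standard_normal(100)

n = len(x)
# co-moment: sum of products of deviations from the means
# (the `ck` buffer maintained by the Covariance aggregate further below)
ck = np.sum((x - x.mean()) * (y - y.mean()))

for ddof in (0, 1, 2):
    # pairwise covariance with divisor N - ddof, as documented for the new parameter
    assert np.isclose(ck / (n - ddof), np.cov(x, y, ddof=ddof)[0, 1])
```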
diff --git a/python/pyspark/pandas/spark/functions.py b/python/pyspark/pandas/spark/functions.py
index 58715b5f781..79d73dcd7ea 100644
--- a/python/pyspark/pandas/spark/functions.py
+++ b/python/pyspark/pandas/spark/functions.py
@@ -51,6 +51,11 @@ def mode(col: Column, dropna: bool) -> Column:
     return Column(sc._jvm.PythonSQLUtils.pandasMode(col._jc, dropna))


+def covar(col1: Column, col2: Column, ddof: int) -> Column:
+    sc = SparkContext._active_spark_context
+    return Column(sc._jvm.PythonSQLUtils.pandasCovar(col1._jc, col2._jc, ddof))
+
+
 def repeat(col: Column, n: Union[int, Column]) -> Column:
     """
     Repeats a string column n times, and returns it as a new string column.
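For illustration only (not part of the diff; `covar` is an internal pandas-on-Spark helper added above, not a public `pyspark.sql.functions` API, and this requires a build containing this commit): the new helper should agree with the built-in `covar_pop`/`covar_samp` aggregates for `ddof=0` and `ddof=1` respectively:

```
from pyspark.sql import SparkSession, functions as F
from pyspark.pandas.spark import functions as SF  # internal helper added above

spark = SparkSession.builder.getOrCreate()
sdf = spark.range(10).select(
    (F.col("id") * 1.0).alias("x"),
    (F.col("id") * 2.0 + 1.0).alias("y"),
)
sdf.select(
    SF.covar(F.col("x"), F.col("y"), 0).alias("pandas_covar_ddof0"),
    F.covar_pop("x", "y").alias("covar_pop"),    # same divisor as ddof=0
    SF.covar(F.col("x"), F.col("y"), 1).alias("pandas_covar_ddof1"),
    F.covar_samp("x", "y").alias("covar_samp"),  # same divisor as ddof=1
).show()
```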
diff --git a/python/pyspark/pandas/tests/test_dataframe.py b/python/pyspark/pandas/tests/test_dataframe.py
index 464cf09b45f..f491da72fd5 100644
--- a/python/pyspark/pandas/tests/test_dataframe.py
+++ b/python/pyspark/pandas/tests/test_dataframe.py
@@ -6958,6 +6958,16 @@ class DataFrameTest(ComparisonTestBase, SQLTestUtils):
         self.assert_eq(pdf.cov(min_periods=4), psdf.cov(min_periods=4), almost=True)
         self.assert_eq(pdf.cov(min_periods=5), psdf.cov(min_periods=5))

+        # ddof
+        with self.assertRaisesRegex(TypeError, "ddof must be integer"):
+            psdf.cov(ddof="ddof")
+        for ddof in [-1, 0, 2]:
+            self.assert_eq(pdf.cov(ddof=ddof), psdf.cov(ddof=ddof), almost=True)
+            self.assert_eq(
+                pdf.cov(min_periods=4, ddof=ddof), psdf.cov(min_periods=4, ddof=ddof), almost=True
+            )
+            self.assert_eq(pdf.cov(min_periods=5, ddof=ddof), psdf.cov(min_periods=5, ddof=ddof))
+
         # bool
         pdf = pd.DataFrame(
             {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
index 7a856a05b6f..ff31fb1128b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Covariance.scala
@@ -143,3 +143,25 @@ case class CovSample(
   override protected def withNewChildrenInternal(
     newLeft: Expression, newRight: Expression): CovSample = copy(left = newLeft, right = newRight)
 }
+
+/**
+ * Covariance in Pandas' fashion. This expression is dedicated only for Pandas API on Spark.
+ * Refer to numpy.cov.
+ */
+case class PandasCovar(
+    override val left: Expression,
+    override val right: Expression,
+    ddof: Int)
+  extends Covariance(left, right, true) {
+
+  override val evaluateExpression: Expression = {
+    If(n === 0.0, Literal.create(null, DoubleType),
+      If(n === ddof, divideByZeroEvalResult, ck / (n - ddof)))
+  }
+  override def prettyName: String = "pandas_covar"
+
+  override protected def withNewChildrenInternal(
+      newLeft: Expression,
+      newRight: Expression): PandasCovar =
+    copy(left = newLeft, right = newRight)
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
index 538356cd8c8..2b74bcc3850 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/python/PythonSQLUtils.scala
@@ -138,6 +138,10 @@ private[sql] object PythonSQLUtils extends Logging {
   def pandasMode(e: Column, ignoreNA: Boolean): Column = {
     Column(PandasMode(e.expr, ignoreNA).toAggregateExpression(false))
   }
+
+  def pandasCovar(col1: Column, col2: Column, ddof: Int): Column = {
+    Column(PandasCovar(col1.expr, col2.expr, ddof).toAggregateExpression(false))
+  }
 }

 /**

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org