This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d793c5c6858  [SPARK-39284][PS] Implement Groupby.mad
d793c5c6858 is described below

commit d793c5c6858cb3d89fd981495a85f4c60ae63035
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Sun Jun 5 09:49:24 2022 +0800

    [SPARK-39284][PS] Implement Groupby.mad

    ### What changes were proposed in this pull request?
    Implement Groupby.mad

    ### Why are the changes needed?
    to increase pandas api coverage

    ### Does this PR introduce _any_ user-facing change?
    yes

    ```
    In [6]: pdf = pd.DataFrame({"A": [1, 2, 2, 1, 1], "B": [3, 2, 3, 9, 0], "C": [3, 4, 13, -14, 9]})

    In [7]: psdf = ps.from_pandas(pdf)

    In [8]: pdf.groupby("A")[["B", "C"]].mad()
    Out[8]:
              B         C
    A
    1  3.333333  8.888889
    2  0.500000  4.500000

    In [9]: psdf.groupby("A")[["B", "C"]].mad()
    Out[9]:
              B         C
    A
    1  3.333333  8.888889
    2  0.500000  4.500000

    In [10]: pdf.B.groupby(pdf.A).mad()
    Out[10]:
    A
    1    3.333333
    2    0.500000
    Name: B, dtype: float64

    In [11]: psdf.B.groupby(psdf.A).mad()
    Out[11]:
    A
    1    3.333333
    2    0.500000
    Name: B, dtype: float64
    ```

    ### How was this patch tested?
    added ut

    Closes #36660 from zhengruifeng/ps_groupby_mad.

    Lead-authored-by: Ruifeng Zheng <ruife...@apache.org>
    Co-authored-by: Ruifeng Zheng <ruife...@foxmail.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/pandas/groupby.py            | 84 +++++++++++++++++++++++++++--
 python/pyspark/pandas/missing/groupby.py    |  2 -
 python/pyspark/pandas/tests/test_groupby.py |  3 ++
 3 files changed, 83 insertions(+), 6 deletions(-)
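[Editorial note] As a sanity check on the numbers above: pandas defines `mad` as the mean absolute deviation around the group-wise mean, so the expected values can be reproduced in plain pandas. A minimal sketch, assuming only the `pdf` frame from the example; the `expected_mad` helper is illustrative and not part of this patch:

```
import pandas as pd

def expected_mad(s: pd.Series) -> float:
    # mean absolute deviation around the mean, which is what pandas' mad() computes
    return (s - s.mean()).abs().mean()

pdf = pd.DataFrame({"A": [1, 2, 2, 1, 1], "B": [3, 2, 3, 9, 0], "C": [3, 4, 13, -14, 9]})

# reproduces Out[8]: B -> 3.333333 / 0.5 and C -> 8.888889 / 4.5 for groups A=1 / A=2
print(pdf.groupby("A")[["B", "C"]].apply(lambda g: g.apply(expected_mad)))
```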
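[Editorial note] The key constraint in the hunk above is that Spark does not allow a window expression inside an aggregate, so `avg(abs(x - avg(x) over w))` cannot be written in one step; the per-row deviation is first materialized as a temporary column and only then averaged per group. A standalone sketch of the same two-step pattern in plain PySpark, assuming a local `SparkSession`; the column name `__tmp_dev__` and the sample data are illustrative only, not taken from the patch:

```
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
sdf = spark.createDataFrame(
    [(1, 3.0), (2, 2.0), (2, 3.0), (1, 9.0), (1, 0.0)], ["A", "B"]
)

# step 1: a window function may not appear inside an aggregate function, so the
# per-row deviation |B - avg(B) over the group| becomes its own column first
w = Window.partitionBy("A").rowsBetween(
    Window.unboundedPreceding, Window.unboundedFollowing
)
deviations = sdf.withColumn("__tmp_dev__", F.abs(F.col("B") - F.avg("B").over(w)))

# step 2: an ordinary group-by average over the temporary column yields the MAD
deviations.groupby("A").agg(F.avg("__tmp_dev__").alias("B")).show()
# A=1 -> 3.3333..., A=2 -> 0.5, matching the pandas output in the commit message
```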
_unsupported_property("plot") quantile = _unsupported_property("quantile") diff --git a/python/pyspark/pandas/tests/test_groupby.py b/python/pyspark/pandas/tests/test_groupby.py index 3709a696f2d..cff2ce706d8 100644 --- a/python/pyspark/pandas/tests/test_groupby.py +++ b/python/pyspark/pandas/tests/test_groupby.py @@ -1353,6 +1353,9 @@ class GroupByTest(PandasOnSparkTestCase, TestUtils): self._test_stat_func(lambda groupby_obj: groupby_obj.max(numeric_only=None)) self._test_stat_func(lambda groupby_obj: groupby_obj.max(numeric_only=True)) + def test_mad(self): + self._test_stat_func(lambda groupby_obj: groupby_obj.mad()) + def test_first(self): self._test_stat_func(lambda groupby_obj: groupby_obj.first()) self._test_stat_func(lambda groupby_obj: groupby_obj.first(numeric_only=None)) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org