This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 070461cc673c [SPARK-47779][PS][TESTS] Add a helper function to sort PS Frame/Series 070461cc673c is described below commit 070461cc673c3fc910e66d1cbf628632b558b48c Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Tue Apr 9 17:07:26 2024 +0800 [SPARK-47779][PS][TESTS] Add a helper function to sort PS Frame/Series ### What changes were proposed in this pull request? Add a helper function to sort PS Frame/Series. Use it in `pyspark.pandas.tests.diff_frames_ops.test_arithmetic_*` and their parity tests. ### Why are the changes needed? Normally, `sort_index` or `sort_values` is enough to make the test result deterministic. However, there are some edge cases, such as a `MultiIndex` with duplicated indices, where the sorted result is non-deterministic. For example, the test `pyspark.pandas.tests.connect.diff_frames_ops.test_parity_arithmetic_chain.ArithmeticChainParityTests.test_arithmetic_chain` fails in some testing environments even with `sort_index`: ``` Left: cow length 250.0 power NaN speed -218.8 speed 1.2 weight NaN ... float64 Right: cow length 250.0 power NaN speed 1.2 speed -218.8 weight NaN ... float64 ``` This PR introduces a new helper function to sort indices and values together. ### Does this PR introduce _any_ user-facing change? No, test only. ### How was this patch tested? CI. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45952 from zhengruifeng/ps_sort_index_value. 
Authored-by: Ruifeng Zheng <ruife...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- .../tests/diff_frames_ops/test_arithmetic_chain.py | 15 +++++--- python/pyspark/testing/pandasutils.py | 42 ++++++++++++++++++++++ 2 files changed, 52 insertions(+), 5 deletions(-) diff --git a/python/pyspark/pandas/tests/diff_frames_ops/test_arithmetic_chain.py b/python/pyspark/pandas/tests/diff_frames_ops/test_arithmetic_chain.py index fef695dbb989..41c678dc101f 100644 --- a/python/pyspark/pandas/tests/diff_frames_ops/test_arithmetic_chain.py +++ b/python/pyspark/pandas/tests/diff_frames_ops/test_arithmetic_chain.py @@ -84,24 +84,29 @@ class ArithmeticChainTestingFuncMixin: # MultiIndex Series self.assert_eq( - (psser1 + psser2 - psser3).sort_index(), (pser1 + pser2 - pser3).sort_index() + self.sort_index_with_values(psser1 + psser2 - psser3), + self.sort_index_with_values(pser1 + pser2 - pser3), ) self.assert_eq( - (psser1 * psser2 * psser3).sort_index(), (pser1 * pser2 * pser3).sort_index() + self.sort_index_with_values(psser1 * psser2 * psser3), + self.sort_index_with_values(pser1 * pser2 * pser3), ) if check_extension and not extension_float_dtypes_available: self.assert_eq( - (psser1 - psser2 / psser3).sort_index(), (pser1 - pser2 / pser3).sort_index() + self.sort_index_with_values(psser1 - psser2 / psser3), + self.sort_index_with_values(pser1 - pser2 / pser3), ) else: self.assert_eq( - (psser1 - psser2 / psser3).sort_index(), (pser1 - pser2 / pser3).sort_index() + self.sort_index_with_values(psser1 - psser2 / psser3), + self.sort_index_with_values(pser1 - pser2 / pser3), ) self.assert_eq( - (psser1 + psser2 * psser3).sort_index(), (pser1 + pser2 * pser3).sort_index() + self.sort_index_with_values(psser1 + psser2 * psser3), + self.sort_index_with_values(pser1 + pser2 * pser3), ) diff --git a/python/pyspark/testing/pandasutils.py b/python/pyspark/testing/pandasutils.py index 1d2dc35280ef..a660057b2c3c 100644 --- a/python/pyspark/testing/pandasutils.py +++ 
b/python/pyspark/testing/pandasutils.py @@ -330,6 +330,48 @@ class PandasOnSparkTestUtils: """ return lambda x: getattr(x, func)() + def sort_index_with_values(self, pobj: Any): + assert isinstance(pobj, (pd.Series, pd.DataFrame, ps.Series, ps.DataFrame)) + + if isinstance(pobj, (ps.Series, ps.DataFrame)): + if isinstance(pobj, ps.Series): + psdf = pobj._psdf[[pobj.name]] + else: + psdf = pobj + scols = psdf._internal.index_spark_columns + psdf._internal.data_spark_columns + sorted = psdf._sort( + by=scols, + ascending=True, + na_position="last", + ) + if isinstance(pobj, ps.Series): + from pyspark.pandas.series import first_series + + return first_series(sorted) + else: + return sorted + else: + # quick-sort values and then stable-sort index + if isinstance(pobj, pd.Series): + return pobj.sort_values( + ascending=True, + na_position="last", + ).sort_index( + ascending=True, + na_position="last", + kind="mergesort", + ) + else: + return pobj.sort_values( + by=list(pobj.columns), + ascending=True, + na_position="last", + ).sort_index( + ascending=True, + na_position="last", + kind="mergesort", + ) + def assertPandasEqual(self, left: Any, right: Any, check_exact: bool = True): _assert_pandas_equal(left, right, check_exact) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org