This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 58b22fd17345 [SPARK-46433][PS][TESTS][FOLLOWUPS] Reorganize `OpsOnDiffFramesGroupByTests`: Factor out 4 more slow tests
58b22fd17345 is described below

commit 58b22fd17345659f6a2ed24419f58a09914c0357
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Tue Dec 19 17:00:11 2023 +0800

    [SPARK-46433][PS][TESTS][FOLLOWUPS] Reorganize `OpsOnDiffFramesGroupByTests`: Factor out 4 more slow tests

    ### What changes were proposed in this pull request?
    Factor out 4 more slow tests

    ### Why are the changes needed?
    to make pyspark tests more suitable for parallelism

    ### Does this PR introduce _any_ user-facing change?
    no, test-only

    ### How was this patch tested?
    ci

    ### Was this patch authored or co-authored using generative AI tooling?
    no

    Closes #44408 from zhengruifeng/ps_test_diff_groupby_2.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 dev/sparktestsupport/modules.py                    |   8 ++
 .../diff_frames_ops/test_parity_groupby_diff.py    |  41 +++++++
 .../test_parity_groupby_diff_len.py                |  41 +++++++
 .../diff_frames_ops/test_parity_groupby_fillna.py  |  41 +++++++
 .../diff_frames_ops/test_parity_groupby_filter.py  |  41 +++++++
 .../tests/diff_frames_ops/test_groupby_diff.py     |  82 ++++++++++++++
 .../tests/diff_frames_ops/test_groupby_diff_len.py | 100 +++++++++++++++++
 .../tests/diff_frames_ops/test_groupby_fillna.py   | 105 ++++++++++++++++++
 .../tests/diff_frames_ops/test_groupby_filter.py   |  82 ++++++++++++++
 .../tests/test_ops_on_diff_frames_groupby.py       | 122 ---------------------
 10 files changed, 541 insertions(+), 122 deletions(-)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 9877835fce00..38d3d42b658c 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -870,6 +870,10 @@ pyspark_pandas_slow = Module(
         "pyspark.pandas.tests.diff_frames_ops.test_groupby_aggregate",
         "pyspark.pandas.tests.diff_frames_ops.test_groupby_apply",
         "pyspark.pandas.tests.diff_frames_ops.test_groupby_cumulative",
+        "pyspark.pandas.tests.diff_frames_ops.test_groupby_diff",
+        "pyspark.pandas.tests.diff_frames_ops.test_groupby_diff_len",
+        "pyspark.pandas.tests.diff_frames_ops.test_groupby_fillna",
+        "pyspark.pandas.tests.diff_frames_ops.test_groupby_filter",
         "pyspark.pandas.tests.series.test_all_any",
         "pyspark.pandas.tests.series.test_arg_ops",
         "pyspark.pandas.tests.series.test_as_of",
@@ -1207,6 +1211,10 @@ pyspark_pandas_connect_part3 = Module(
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_aggregate",
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_apply",
         "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_cumulative",
+        "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_diff",
+        "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_diff_len",
+        "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_fillna",
+        "pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_filter",
     ],
     excluded_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy, pandas, and pyarrow and
diff --git a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_diff.py b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_diff.py
new file mode 100644
index 000000000000..7e96834623b5
--- /dev/null
+++ b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_diff.py
@@ -0,0 +1,41 @@
+#
+# Licensed
to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.pandas.tests.diff_frames_ops.test_groupby_diff import GroupByDiffMixin +from pyspark.testing.connectutils import ReusedConnectTestCase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils + + +class GroupByDiffParityTests( + GroupByDiffMixin, + PandasOnSparkTestUtils, + ReusedConnectTestCase, +): + pass + + +if __name__ == "__main__": + from pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_diff import * # noqa + + try: + import xmlrunner # type: ignore[import] + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_diff_len.py b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_diff_len.py new file mode 100644 index 000000000000..8aab992c1a4a --- /dev/null +++ b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_diff_len.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# +import unittest + +from pyspark.pandas.tests.diff_frames_ops.test_groupby_diff_len import GroupByDiffLenMixin +from pyspark.testing.connectutils import ReusedConnectTestCase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils + + +class GroupByDiffLenParityTests( + GroupByDiffLenMixin, + PandasOnSparkTestUtils, + ReusedConnectTestCase, +): + pass + + +if __name__ == "__main__": + from pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_diff_len import * # noqa + + try: + import xmlrunner # type: ignore[import] + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_fillna.py b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_fillna.py new file mode 100644 index 000000000000..d997329bd5dc --- /dev/null +++ b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_fillna.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.pandas.tests.diff_frames_ops.test_groupby_fillna import GroupByFillNAMixin +from pyspark.testing.connectutils import ReusedConnectTestCase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils + + +class GroupByFillNAParityTests( + GroupByFillNAMixin, + PandasOnSparkTestUtils, + ReusedConnectTestCase, +): + pass + + +if __name__ == "__main__": + from pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_fillna import * # noqa + + try: + import xmlrunner # type: ignore[import] + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_filter.py b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_filter.py new file mode 100644 index 000000000000..a1805f56a78b --- /dev/null +++ b/python/pyspark/pandas/tests/connect/diff_frames_ops/test_parity_groupby_filter.py @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import unittest + +from pyspark.pandas.tests.diff_frames_ops.test_groupby_filter import GroupByFilterMixin +from pyspark.testing.connectutils import ReusedConnectTestCase +from pyspark.testing.pandasutils import PandasOnSparkTestUtils + + +class GroupByFilterParityTests( + GroupByFilterMixin, + PandasOnSparkTestUtils, + ReusedConnectTestCase, +): + pass + + +if __name__ == "__main__": + from pyspark.pandas.tests.connect.diff_frames_ops.test_parity_groupby_filter import * # noqa + + try: + import xmlrunner # type: ignore[import] + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff.py b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff.py new file mode 100644 index 000000000000..fe613e3ef401 --- /dev/null +++ b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pandas as pd + +from pyspark import pandas as ps +from pyspark.pandas.config import set_option, reset_option +from pyspark.testing.pandasutils import PandasOnSparkTestCase +from pyspark.testing.sqlutils import SQLTestUtils + + +class GroupByDiffMixin: + @classmethod + def setUpClass(cls): + super().setUpClass() + set_option("compute.ops_on_diff_frames", True) + + @classmethod + def tearDownClass(cls): + reset_option("compute.ops_on_diff_frames") + super().tearDownClass() + + def test_diff(self): + pdf = pd.DataFrame( + { + "a": [1, 2, 3, 4, 5, 6] * 3, + "b": [1, 1, 2, 3, 5, 8] * 3, + "c": [1, 4, 9, 16, 25, 36] * 3, + } + ) + pkey = pd.Series([1, 1, 2, 3, 5, 8] * 3) + psdf = ps.from_pandas(pdf) + kkey = ps.from_pandas(pkey) + + self.assert_eq( + psdf.groupby(kkey).diff().sort_index(), pdf.groupby(pkey).diff().sort_index() + ) + self.assert_eq( + psdf.groupby(kkey)["a"].diff().sort_index(), pdf.groupby(pkey)["a"].diff().sort_index() + ) + self.assert_eq( + psdf.groupby(kkey)[["a"]].diff().sort_index(), + pdf.groupby(pkey)[["a"]].diff().sort_index(), + ) + + self.assert_eq(psdf.groupby(kkey).diff().sum(), pdf.groupby(pkey).diff().sum().astype(int)) + self.assert_eq(psdf.groupby(kkey)["a"].diff().sum(), pdf.groupby(pkey)["a"].diff().sum()) + + +class GroupByDiffTests( + GroupByDiffMixin, + PandasOnSparkTestCase, + SQLTestUtils, +): + pass + + +if __name__ == "__main__": + import unittest + from pyspark.pandas.tests.diff_frames_ops.test_groupby_diff import * # noqa + + try: + import xmlrunner + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff_len.py b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff_len.py new file mode 100644 index 000000000000..0600b014ef76 --- /dev/null +++ b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_diff_len.py @@ -0,0 +1,100 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pandas as pd + +from pyspark import pandas as ps +from pyspark.pandas.config import set_option, reset_option +from pyspark.testing.pandasutils import PandasOnSparkTestCase +from pyspark.testing.sqlutils import SQLTestUtils + + +class GroupByDiffLenMixin: + @classmethod + def setUpClass(cls): + super().setUpClass() + set_option("compute.ops_on_diff_frames", True) + + @classmethod + def tearDownClass(cls): + reset_option("compute.ops_on_diff_frames") + super().tearDownClass() + + def test_groupby_different_lengths(self): + pdfs1 = [ + pd.DataFrame({"c": [4, 2, 7, 3, None, 1, 1, 1, 2], "d": list("abcdefght")}), + pd.DataFrame({"c": [4, 2, 7, None, 1, 1, 2], "d": list("abcdefg")}), + pd.DataFrame({"c": [4, 2, 7, 3, None, 1, 1, 1, 2, 2], "d": list("abcdefghti")}), + ] + pdfs2 = [ + pd.DataFrame({"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]}), + pd.DataFrame({"a": [1, 2, 6, 4, 4, 6, 4, 7], "b": [4, 2, 7, 3, 3, 1, 1, 2]}), + pd.DataFrame({"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]}), + ] + + for pdf1, pdf2 in zip(pdfs1, pdfs2): + psdf1 = ps.from_pandas(pdf1) + psdf2 = ps.from_pandas(pdf2) + + for as_index in [True, False]: + if as_index: + + def sort(df): + return df.sort_index() + + else: + + def sort(df): + return df.sort_values("c").reset_index(drop=True) + + self.assert_eq( + sort(psdf1.groupby(psdf2.a, as_index=as_index).sum()), + sort(pdf1.groupby(pdf2.a, as_index=as_index).sum()), + almost=as_index, + ) + + self.assert_eq( + sort(psdf1.groupby(psdf2.a, as_index=as_index).c.sum()), + sort(pdf1.groupby(pdf2.a, as_index=as_index).c.sum()), + almost=as_index, + ) + self.assert_eq( + sort(psdf1.groupby(psdf2.a, as_index=as_index)["c"].sum()), + sort(pdf1.groupby(pdf2.a, as_index=as_index)["c"].sum()), + almost=as_index, + ) + + +class GroupByDiffLenTests( + GroupByDiffLenMixin, + PandasOnSparkTestCase, + SQLTestUtils, +): + pass + + +if __name__ == "__main__": + import unittest + from pyspark.pandas.tests.diff_frames_ops.test_groupby_diff_len import * # noqa + + try: + import xmlrunner + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_fillna.py b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_fillna.py new file mode 100644 index 000000000000..ad2c36cec50d --- /dev/null +++ b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_fillna.py @@ -0,0 +1,105 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pandas as pd + +from pyspark import pandas as ps +from pyspark.pandas.config import set_option, reset_option +from pyspark.testing.pandasutils import PandasOnSparkTestCase +from pyspark.testing.sqlutils import SQLTestUtils + + +class GroupByFillNAMixin: + @classmethod + def setUpClass(cls): + super().setUpClass() + set_option("compute.ops_on_diff_frames", True) + + @classmethod + def tearDownClass(cls): + reset_option("compute.ops_on_diff_frames") + super().tearDownClass() + + def test_fillna(self): + pdf = pd.DataFrame( + { + "A": [1, 1, 2, 2] * 3, + "B": [2, 4, None, 3] * 3, + "C": [None, None, None, 1] * 3, + "D": [0, 1, 5, 4] * 3, + } + ) + pkey = pd.Series([1, 1, 2, 2] * 3) + psdf = ps.from_pandas(pdf) + kkey = ps.from_pandas(pkey) + + self.assert_eq( + psdf.groupby(kkey).fillna(0).sort_index(), pdf.groupby(pkey).fillna(0).sort_index() + ) + self.assert_eq( + psdf.groupby(kkey)["C"].fillna(0).sort_index(), + pdf.groupby(pkey)["C"].fillna(0).sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)[["C"]].fillna(0).sort_index(), + pdf.groupby(pkey)[["C"]].fillna(0).sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey).fillna(method="bfill").sort_index(), + pdf.groupby(pkey).fillna(method="bfill").sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)["C"].fillna(method="bfill").sort_index(), + pdf.groupby(pkey)["C"].fillna(method="bfill").sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)[["C"]].fillna(method="bfill").sort_index(), + pdf.groupby(pkey)[["C"]].fillna(method="bfill").sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey).fillna(method="ffill").sort_index(), + pdf.groupby(pkey).fillna(method="ffill").sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)["C"].fillna(method="ffill").sort_index(), + pdf.groupby(pkey)["C"].fillna(method="ffill").sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)[["C"]].fillna(method="ffill").sort_index(), + pdf.groupby(pkey)[["C"]].fillna(method="ffill").sort_index(), + ) + + +class GroupByFillNATests( + GroupByFillNAMixin, + PandasOnSparkTestCase, + SQLTestUtils, +): + pass + + +if __name__ == "__main__": + import unittest + from pyspark.pandas.tests.diff_frames_ops.test_groupby_fillna import * # noqa + + try: + import xmlrunner + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_filter.py b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_filter.py new file mode 100644 index 000000000000..b0e61a13c3b8 --- /dev/null +++ b/python/pyspark/pandas/tests/diff_frames_ops/test_groupby_filter.py @@ -0,0 +1,82 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import pandas as pd + +from pyspark import pandas as ps +from pyspark.pandas.config import set_option, reset_option +from pyspark.testing.pandasutils import PandasOnSparkTestCase +from pyspark.testing.sqlutils import SQLTestUtils + + +class GroupByFilterMixin: + @classmethod + def setUpClass(cls): + super().setUpClass() + set_option("compute.ops_on_diff_frames", True) + + @classmethod + def tearDownClass(cls): + reset_option("compute.ops_on_diff_frames") + super().tearDownClass() + + def test_filter(self): + pdf = pd.DataFrame( + {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]}, + columns=["a", "b", "c"], + ) + pkey = pd.Series([1, 1, 2, 3, 5, 8]) + psdf = ps.from_pandas(pdf) + kkey = ps.from_pandas(pkey) + + self.assert_eq( + psdf.groupby(kkey).filter(lambda x: any(x.a == 2)).sort_index(), + pdf.groupby(pkey).filter(lambda x: any(x.a == 2)).sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)["a"].filter(lambda x: any(x == 2)).sort_index(), + pdf.groupby(pkey)["a"].filter(lambda x: any(x == 2)).sort_index(), + ) + self.assert_eq( + psdf.groupby(kkey)[["a"]].filter(lambda x: any(x.a == 2)).sort_index(), + pdf.groupby(pkey)[["a"]].filter(lambda x: any(x.a == 2)).sort_index(), + ) + self.assert_eq( + psdf.groupby(["a", kkey]).filter(lambda x: any(x.a == 2)).sort_index(), + pdf.groupby(["a", pkey]).filter(lambda x: any(x.a == 2)).sort_index(), + ) + + +class GroupByFilterTests( + GroupByFilterMixin, + PandasOnSparkTestCase, + SQLTestUtils, +): + pass + + +if __name__ == "__main__": + import unittest + from pyspark.pandas.tests.diff_frames_ops.test_groupby_filter import * # noqa + + try: + import xmlrunner + + testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", verbosity=2) + except ImportError: + testRunner = None + unittest.main(testRunner=testRunner, verbosity=2) diff --git a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py index 5dd4ec887c16..5c4c9283724c 100644 --- a/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py +++ b/python/pyspark/pandas/tests/test_ops_on_diff_frames_groupby.py @@ -36,50 +36,6 @@ class OpsOnDiffFramesGroupByTestsMixin: reset_option("compute.ops_on_diff_frames") super().tearDownClass() - def test_groupby_different_lengths(self): - pdfs1 = [ - pd.DataFrame({"c": [4, 2, 7, 3, None, 1, 1, 1, 2], "d": list("abcdefght")}), - pd.DataFrame({"c": [4, 2, 7, None, 1, 1, 2], "d": list("abcdefg")}), - pd.DataFrame({"c": [4, 2, 7, 3, None, 1, 1, 1, 2, 2], "d": list("abcdefghti")}), - ] - pdfs2 = [ - pd.DataFrame({"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]}), - pd.DataFrame({"a": [1, 2, 6, 4, 4, 6, 4, 7], "b": [4, 2, 7, 3, 3, 1, 1, 2]}), - pd.DataFrame({"a": [1, 2, 6, 4, 4, 6, 4, 3, 7], "b": [4, 2, 7, 3, 3, 1, 1, 1, 2]}), - ] - - for pdf1, pdf2 in zip(pdfs1, pdfs2): - psdf1 = ps.from_pandas(pdf1) - psdf2 = ps.from_pandas(pdf2) - - for as_index in [True, False]: - if as_index: - - def sort(df): - return df.sort_index() - - else: - - def sort(df): - return df.sort_values("c").reset_index(drop=True) - - self.assert_eq( - sort(psdf1.groupby(psdf2.a, as_index=as_index).sum()), - sort(pdf1.groupby(pdf2.a, as_index=as_index).sum()), - almost=as_index, - ) - - self.assert_eq( - sort(psdf1.groupby(psdf2.a, as_index=as_index).c.sum()), - sort(pdf1.groupby(pdf2.a, as_index=as_index).c.sum()), - almost=as_index, - ) - self.assert_eq( - sort(psdf1.groupby(psdf2.a, as_index=as_index)["c"].sum()), - sort(pdf1.groupby(pdf2.a, 
as_index=as_index)["c"].sum()), - almost=as_index, - ) - def test_groupby_multiindex_columns(self): pdf1 = pd.DataFrame( {("y", "c"): [4, 2, 7, 3, None, 1, 1, 1, 2], ("z", "d"): list("abcdefght")} @@ -207,32 +163,6 @@ class OpsOnDiffFramesGroupByTestsMixin: pdf.groupby(["a", pkey]).transform(lambda x: x + x.min()).sort_index(), ) - def test_filter(self): - pdf = pd.DataFrame( - {"a": [1, 2, 3, 4, 5, 6], "b": [1, 1, 2, 3, 5, 8], "c": [1, 4, 9, 16, 25, 36]}, - columns=["a", "b", "c"], - ) - pkey = pd.Series([1, 1, 2, 3, 5, 8]) - psdf = ps.from_pandas(pdf) - kkey = ps.from_pandas(pkey) - - self.assert_eq( - psdf.groupby(kkey).filter(lambda x: any(x.a == 2)).sort_index(), - pdf.groupby(pkey).filter(lambda x: any(x.a == 2)).sort_index(), - ) - self.assert_eq( - psdf.groupby(kkey)["a"].filter(lambda x: any(x == 2)).sort_index(), - pdf.groupby(pkey)["a"].filter(lambda x: any(x == 2)).sort_index(), - ) - self.assert_eq( - psdf.groupby(kkey)[["a"]].filter(lambda x: any(x.a == 2)).sort_index(), - pdf.groupby(pkey)[["a"]].filter(lambda x: any(x.a == 2)).sort_index(), - ) - self.assert_eq( - psdf.groupby(["a", kkey]).filter(lambda x: any(x.a == 2)).sort_index(), - pdf.groupby(["a", pkey]).filter(lambda x: any(x.a == 2)).sort_index(), - ) - def test_head(self): pdf = pd.DataFrame( { @@ -260,58 +190,6 @@ class OpsOnDiffFramesGroupByTestsMixin: psdf.groupby([kkey, "b"]).head(2).sort_index(), ) - def test_diff(self): - pdf = pd.DataFrame( - { - "a": [1, 2, 3, 4, 5, 6] * 3, - "b": [1, 1, 2, 3, 5, 8] * 3, - "c": [1, 4, 9, 16, 25, 36] * 3, - } - ) - pkey = pd.Series([1, 1, 2, 3, 5, 8] * 3) - psdf = ps.from_pandas(pdf) - kkey = ps.from_pandas(pkey) - - self.assert_eq( - psdf.groupby(kkey).diff().sort_index(), pdf.groupby(pkey).diff().sort_index() - ) - self.assert_eq( - psdf.groupby(kkey)["a"].diff().sort_index(), pdf.groupby(pkey)["a"].diff().sort_index() - ) - self.assert_eq( - psdf.groupby(kkey)[["a"]].diff().sort_index(), - pdf.groupby(pkey)[["a"]].diff().sort_index(), - ) - - self.assert_eq(psdf.groupby(kkey).diff().sum(), pdf.groupby(pkey).diff().sum().astype(int)) - self.assert_eq(psdf.groupby(kkey)["a"].diff().sum(), pdf.groupby(pkey)["a"].diff().sum()) - - def test_fillna(self): - pdf = pd.DataFrame( - { - "a": [1, 2, 3, 4, 5, 6] * 3, - "b": [1, 1, 2, 3, 5, 8] * 3, - "c": [1, 4, 9, 16, 25, 36] * 3, - }, - ) - pkey = pd.Series([1, 1, 2, 3, 5, 8] * 3) - psdf = ps.from_pandas(pdf) - kkey = ps.from_pandas(pkey) - - self.assert_eq( - psdf.groupby(kkey).rank().sort_index(), pdf.groupby(pkey).rank().sort_index() - ) - self.assert_eq( - psdf.groupby(kkey)["a"].rank().sort_index(), pdf.groupby(pkey)["a"].rank().sort_index() - ) - self.assert_eq( - psdf.groupby(kkey)[["a"]].rank().sort_index(), - pdf.groupby(pkey)[["a"]].rank().sort_index(), - ) - - self.assert_eq(psdf.groupby(kkey).rank().sum(), pdf.groupby(pkey).rank().sum()) - self.assert_eq(psdf.groupby(kkey)["a"].rank().sum(), pdf.groupby(pkey)["a"].rank().sum()) - def test_shift(self): pdf = pd.DataFrame( { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
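All four factored-out suites exercise pandas-on-Spark groupby operations whose grouping key lives in a different frame, which requires the `compute.ops_on_diff_frames` option that each mixin enables in setUpClass and resets in tearDownClass. Below is a minimal standalone sketch of that pattern, using only APIs that appear in the diff above; the sample data is illustrative and assumes a local Spark environment with pandas available, it is not taken from the tests themselves.

import pandas as pd

from pyspark import pandas as ps
from pyspark.pandas.config import set_option, reset_option

# Enable operations that combine two different frames, as the test mixins
# above do in setUpClass(); this sketch assumes Spark can start locally.
set_option("compute.ops_on_diff_frames", True)
try:
    pdf = pd.DataFrame({"a": [1, 2, 3, 4], "b": [10, 20, 30, 40]})
    pkey = pd.Series([1, 1, 2, 2])  # grouping key held in a separate frame
    psdf = ps.from_pandas(pdf)
    kkey = ps.from_pandas(pkey)

    # Group a DataFrame by a Series from a different frame, mirroring the
    # groupby(...).diff()/fillna()/filter() assertions in the new test files.
    print(psdf.groupby(kkey).sum().sort_index())
    print(pdf.groupby(pkey).sum().sort_index())  # pandas reference result
finally:
    # Mirror tearDownClass(): restore the default option value.
    reset_option("compute.ops_on_diff_frames")

Each new module can also be run on its own through the __main__ block it ends with (for example, `python -m pyspark.pandas.tests.diff_frames_ops.test_groupby_diff`, assuming pyspark and the pandas test dependencies are importable), which is what makes this finer-grained split useful for running the pyspark-pandas test modules in parallel.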