This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 5db824aa6f9 [SPARK-45193][PS][CONNECT][TESTS] Refactor `test_mode` to be compatible with Spark Connect
5db824aa6f9 is described below

commit 5db824aa6f9203b24668c4e0135fab50d20831da
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Sep 18 17:00:35 2023 +0800

    [SPARK-45193][PS][CONNECT][TESTS] Refactor `test_mode` to be compatible with Spark Connect

    ### What changes were proposed in this pull request?
    Refactor `test_mode` to be compatible with Spark Connect.

    ### Why are the changes needed?
    For test parity with Spark Connect.

    ### Does this PR introduce _any_ user-facing change?
    No, test-only.

    ### How was this patch tested?
    CI, and a manual check:

    ```
    In [5]: import pandas as pd

    In [6]: def func(iterator):
       ...:     for pdf in iterator:
       ...:         if len(pdf) > 0:
       ...:             if pdf["partition"][0] == 3:
       ...:                 yield pd.DataFrame({"num" : ["0", "1", "2", "3", "4"] })
       ...:             else:
       ...:                 yield pd.DataFrame({"num" : ["3", "3", "3", "3", "4"]} )
       ...:

    In [7]: df = spark.range(0, 4, 1, 4).select(sf.spark_partition_id().alias("partition"))

    In [8]: df.mapInPandas(func, "num string").withColumn("p", sf.spark_partition_id()).show()
    /Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:239: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
    /Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:239: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
    /Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:239: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
    /Users/ruifeng.zheng/Dev/spark/python/lib/pyspark.zip/pyspark/sql/pandas/serializers.py:239: FutureWarning: is_categorical_dtype is deprecated and will be removed in a future version. Use isinstance(dtype, CategoricalDtype) instead
    +---+---+
    |num|  p|
    +---+---+
    |  3|  0|
    |  3|  0|
    |  3|  0|
    |  3|  0|
    |  4|  0|
    |  3|  1|
    |  3|  1|
    |  3|  1|
    |  3|  1|
    |  4|  1|
    |  3|  2|
    |  3|  2|
    |  3|  2|
    |  3|  2|
    |  4|  2|
    |  0|  3|
    |  1|  3|
    |  2|  3|
    |  3|  3|
    |  4|  3|
    +---+---+
    ```

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #42970 from zhengruifeng/py_test_mode.

    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 .../pandas/tests/computation/test_compute.py       | 44 +++++++++++++++++-----
 .../connect/computation/test_parity_compute.py     | 30 +--------------
 2 files changed, 35 insertions(+), 39 deletions(-)

diff --git a/python/pyspark/pandas/tests/computation/test_compute.py b/python/pyspark/pandas/tests/computation/test_compute.py
index 9a29cb236a8..dc145601fca 100644
--- a/python/pyspark/pandas/tests/computation/test_compute.py
+++ b/python/pyspark/pandas/tests/computation/test_compute.py
@@ -15,11 +15,11 @@
 # limitations under the License.
# import unittest -from distutils.version import LooseVersion import numpy as np import pandas as pd +from pyspark.sql import functions as sf from pyspark import pandas as ps from pyspark.testing.pandasutils import ComparisonTestBase from pyspark.testing.sqlutils import SQLTestUtils @@ -101,16 +101,40 @@ class FrameComputeMixin: with self.assertRaises(ValueError): psdf.mode(axis=2) - def f(index, iterator): - return ["3", "3", "3", "3", "4"] if index == 3 else ["0", "1", "2", "3", "4"] + def func(iterator): + for pdf in iterator: + if len(pdf) > 0: + if pdf["partition"][0] == 3: + yield pd.DataFrame( + { + "num": [ + "3", + "3", + "3", + "3", + "4", + ] + } + ) + else: + yield pd.DataFrame( + { + "num": [ + "0", + "1", + "2", + "3", + "4", + ] + } + ) + + df = ( + self.spark.range(0, 4, 1, 4) + .select(sf.spark_partition_id().alias("partition")) + .mapInPandas(func, "num string") + ) - rdd = self.spark.sparkContext.parallelize( - [ - 1, - ], - 4, - ).mapPartitionsWithIndex(f) - df = self.spark.createDataFrame(rdd, schema="string") psdf = df.pandas_api() self.assert_eq(psdf.mode(), psdf._to_pandas().mode()) diff --git a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py index ba19f8a611b..3e76cb621f0 100644 --- a/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py +++ b/python/pyspark/pandas/tests/connect/computation/test_parity_compute.py @@ -15,42 +15,14 @@ # limitations under the License. 
# import unittest -import pandas as pd -import numpy as np -from pyspark import pandas as ps from pyspark.pandas.tests.computation.test_compute import FrameComputeMixin from pyspark.testing.connectutils import ReusedConnectTestCase from pyspark.testing.pandasutils import PandasOnSparkTestUtils class FrameParityComputeTests(FrameComputeMixin, PandasOnSparkTestUtils, ReusedConnectTestCase): - @property - def psdf(self): - return ps.from_pandas(self.pdf) - - def test_mode(self): - pdf = pd.DataFrame( - { - "A": [1, 2, None, 4, 5, 4, 2], - "B": [-0.1, 0.2, -0.3, np.nan, 0.5, -0.1, -0.1], - "C": ["d", "b", "c", "c", "e", "a", "a"], - "D": [np.nan, np.nan, np.nan, np.nan, 0.1, -0.1, -0.1], - "E": [np.nan, np.nan, np.nan, np.nan, np.nan, np.nan, np.nan], - } - ) - psdf = ps.from_pandas(pdf) - - self.assert_eq(psdf.mode(), pdf.mode()) - self.assert_eq(psdf.mode(numeric_only=True), pdf.mode(numeric_only=True)) - self.assert_eq(psdf.mode(dropna=False), pdf.mode(dropna=False)) - - # dataframe with single column - for c in ["A", "B", "C", "D", "E"]: - self.assert_eq(psdf[[c]].mode(), pdf[[c]].mode()) - - with self.assertRaises(ValueError): - psdf.mode(axis=2) + pass if __name__ == "__main__": --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org