This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8e1fa5616068 [SPARK-46620][PS][CONNECT] Introduce a basic fallback 
mechanism for frame methods
8e1fa5616068 is described below

commit 8e1fa56160686219039d4cd24db867b982c3af25
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Thu Jan 25 17:26:26 2024 +0800

    [SPARK-46620][PS][CONNECT] Introduce a basic fallback mechanism for frame 
methods
    
    ### What changes were proposed in this pull request?
    1, Introduce a basic fallback mechanism for frame methods, with a new 
option `compute.pandas_fallback` default false;
    2, implement `Frame.asfreq` and `Frame.asof`
    
    ### Why are the changes needed?
    for pandas parity
    
    ### Does this PR introduce _any_ user-facing change?
    yes
    
    ```
    In [1]: import pyspark.pandas as ps
       ...: import pandas as pd
       ...:
       ...: index = pd.date_range('1/1/2000', periods=4, freq='min')
       ...: series = pd.Series([0.0, None, 2.0, 3.0], index=index)
       ...: pdf = pd.DataFrame({'s': series})
       ...: psdf = ps.from_pandas(pdf)
    
    In [2]: psdf.asfreq(freq='30s')
    ---------------------------------------------------------------------------
    PandasNotImplementedError                 Traceback (most recent call last)
    Cell In[2], line 1
    ----> 1 psdf.asfreq(freq='30s')
    
    File ~/Dev/spark/python/pyspark/pandas/missing/__init__.py:23, in 
unsupported_function.<locals>.unsupported_function(*args, **kwargs)
         22 def unsupported_function(*args, **kwargs):
    ---> 23     raise PandasNotImplementedError(
         24         class_name=class_name, method_name=method_name, 
reason=reason
         25     )
    
    PandasNotImplementedError: The method `pd.DataFrame.asfreq()` is not 
implemented yet.
    
    In [3]: ps.set_option("compute.pandas_fallback", True)
    
    In [4]: psdf.asfreq(freq='30s')
    /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: 
PandasAPIOnSparkAdviceWarning: `asfreq` is executed in fallback mode. It loads 
partial data into the driver's memory to infer the schema, and loads all data 
into one executor's memory to compute. It should only be used if the pandas 
DataFrame is expected to be small.
      warnings.warn(message, PandasAPIOnSparkAdviceWarning)
    /Users/ruifeng.zheng/Dev/spark/python/pyspark/pandas/utils.py:1015: 
PandasAPIOnSparkAdviceWarning: If the type hints is not specified for 
`groupby.apply`, it is expensive to infer the data type internally.
      warnings.warn(message, PandasAPIOnSparkAdviceWarning)
    Out[4]:
                           s
    2000-01-01 00:00:00  0.0
    2000-01-01 00:00:30  NaN
    2000-01-01 00:01:00  NaN
    2000-01-01 00:01:30  NaN
    2000-01-01 00:02:00  2.0
    2000-01-01 00:02:30  NaN
    2000-01-01 00:03:00  3.0
    ```
    
    ### How was this patch tested?
    added ut
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #44869 from zhengruifeng/ps_df_fallback.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 dev/sparktestsupport/modules.py                    |   6 ++
 .../source/user_guide/pandas_on_spark/options.rst  |   2 +
 python/pyspark/pandas/config.py                    |   9 ++
 python/pyspark/pandas/frame.py                     |  43 +++++++++
 .../tests/connect/frame/test_parity_asfreq.py      |  41 ++++++++
 .../pandas/tests/connect/frame/test_parity_asof.py |  41 ++++++++
 python/pyspark/pandas/tests/frame/test_asfreq.py   |  87 +++++++++++++++++
 python/pyspark/pandas/tests/frame/test_asof.py     | 106 +++++++++++++++++++++
 8 files changed, 335 insertions(+)

diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index 0500bf38ea8e..be3e798b0779 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -819,6 +819,9 @@ pyspark_pandas = Module(
         "pyspark.pandas.tests.io.test_dataframe_conversion",
         "pyspark.pandas.tests.io.test_dataframe_spark_io",
         "pyspark.pandas.tests.io.test_series_conversion",
+        # fallback
+        "pyspark.pandas.tests.frame.test_asfreq",
+        "pyspark.pandas.tests.frame.test_asof",
     ],
     excluded_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy, 
pandas, and pyarrow and
@@ -1200,6 +1203,9 @@ pyspark_pandas_connect_part1 = Module(
         "pyspark.pandas.tests.connect.reshape.test_parity_get_dummies_object",
         "pyspark.pandas.tests.connect.reshape.test_parity_get_dummies_prefix",
         "pyspark.pandas.tests.connect.reshape.test_parity_merge_asof",
+        # fallback
+        "pyspark.pandas.tests.connect.frame.test_parity_asfreq",
+        "pyspark.pandas.tests.connect.frame.test_parity_asof",
     ],
     excluded_python_implementations=[
         "PyPy"  # Skip these tests under PyPy since they require numpy, 
pandas, and pyarrow and
diff --git a/python/docs/source/user_guide/pandas_on_spark/options.rst 
b/python/docs/source/user_guide/pandas_on_spark/options.rst
index 82b1a921460a..e8fffea7e33b 100644
--- a/python/docs/source/user_guide/pandas_on_spark/options.rst
+++ b/python/docs/source/user_guide/pandas_on_spark/options.rst
@@ -304,6 +304,8 @@ compute.isin_limit              80                      
'compute.isin_limit' set
                                                         'Column.isin(list)'. 
If the length of the ‘list’ is
                                                         above the limit, 
broadcast join is used instead for
                                                         better performance.
+compute.pandas_fallback         False                   
'compute.pandas_fallback' sets whether or not to
+                                                        fallback automatically 
to Pandas' implementation.
 plotting.max_rows               1000                    'plotting.max_rows' 
sets the visual limit on top-n-
                                                         based plots such as 
`plot.bar` and `plot.pie`. If it
                                                         is set to 1000, the 
first 1000 data points will be
diff --git a/python/pyspark/pandas/config.py b/python/pyspark/pandas/config.py
index 2228e41c1dfb..a66cb5a16d2d 100644
--- a/python/pyspark/pandas/config.py
+++ b/python/pyspark/pandas/config.py
@@ -259,6 +259,15 @@ _options: List[Option] = [
             "'compute.isin_limit' should be greater than or equal to 0.",
         ),
     ),
+    Option(
+        key="compute.pandas_fallback",
+        doc=(
+            "'compute.pandas_fallback' sets whether or not to fallback 
automatically "
+            "to Pandas' implementation."
+        ),
+        default=False,
+        types=bool,
+    ),
     Option(
         key="plotting.max_rows",
         doc=(
diff --git a/python/pyspark/pandas/frame.py b/python/pyspark/pandas/frame.py
index a7edac5509b1..f7b2df30533c 100644
--- a/python/pyspark/pandas/frame.py
+++ b/python/pyspark/pandas/frame.py
@@ -13446,10 +13446,53 @@ defaultdict(<class 'list'>, {'col..., 'col...})]
 
         return psdf
 
+    def _fall_back_frame(self, method: str) -> Callable:
+        def _internal_fall_back_function(*inputs: Any, **kwargs: Any) -> 
"DataFrame":
+            log_advice(
+                f"`{method}` is executed in fallback mode. It loads partial 
data into the "
+                f"driver's memory to infer the schema, and loads all data into 
one executor's "
+                f"memory to compute. It should only be used if the pandas 
DataFrame is expected "
+                f"to be small."
+            )
+
+            input_df = self.copy()
+            index_names = input_df.index.names
+
+            sdf = input_df._internal.spark_frame
+            tmp_agg_column_name = verify_temp_column_name(
+                sdf, f"__tmp_aggregate_col_for_frame_{method}__"
+            )
+            input_df[tmp_agg_column_name] = 0
+
+            tmp_idx_column_name = verify_temp_column_name(
+                sdf, f"__tmp_index_col_for_frame_{method}__"
+            )
+            input_df[tmp_idx_column_name] = input_df.index
+
+            # TODO(SPARK-46859): specify the return type if possible
+            def compute_function(pdf: pd.DataFrame):  # type: 
ignore[no-untyped-def]
+                pdf = pdf.drop(columns=[tmp_agg_column_name])
+                pdf = pdf.set_index(tmp_idx_column_name, drop=True)
+                pdf = pdf.sort_index()
+                pdf = getattr(pdf, method)(*inputs, **kwargs)
+                pdf[tmp_idx_column_name] = pdf.index
+                return pdf.reset_index(drop=True)
+
+            output_df = 
input_df.groupby(tmp_agg_column_name).apply(compute_function)
+            output_df = output_df.set_index(tmp_idx_column_name)
+            output_df.index.names = index_names
+
+            return output_df
+
+        return _internal_fall_back_function
+
     def __getattr__(self, key: str) -> Any:
         if key.startswith("__"):
             raise AttributeError(key)
         if hasattr(MissingPandasLikeDataFrame, key):
+            if key in ["asfreq", "asof"] and 
get_option("compute.pandas_fallback"):
+                return self._fall_back_frame(key)
+
             property_or_func = getattr(MissingPandasLikeDataFrame, key)
             if isinstance(property_or_func, property):
                 return property_or_func.fget(self)
diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_asfreq.py 
b/python/pyspark/pandas/tests/connect/frame/test_parity_asfreq.py
new file mode 100644
index 000000000000..ee65ea39aefe
--- /dev/null
+++ b/python/pyspark/pandas/tests/connect/frame/test_parity_asfreq.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.frame.test_asfreq import AsFreqMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+
+
+class AsFreqParityTests(
+    AsFreqMixin,
+    PandasOnSparkTestUtils,
+    ReusedConnectTestCase,
+):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.pandas.tests.connect.frame.test_parity_asfreq import *  # 
noqa: F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/connect/frame/test_parity_asof.py 
b/python/pyspark/pandas/tests/connect/frame/test_parity_asof.py
new file mode 100644
index 000000000000..0a0fab4566c2
--- /dev/null
+++ b/python/pyspark/pandas/tests/connect/frame/test_parity_asof.py
@@ -0,0 +1,41 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.pandas.tests.frame.test_asof import AsOfMixin
+from pyspark.testing.connectutils import ReusedConnectTestCase
+from pyspark.testing.pandasutils import PandasOnSparkTestUtils
+
+
+class AsOfParityTests(
+    AsOfMixin,
+    PandasOnSparkTestUtils,
+    ReusedConnectTestCase,
+):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.pandas.tests.connect.frame.test_parity_asof import *  # noqa: 
F401
+
+    try:
+        import xmlrunner  # type: ignore[import]
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/frame/test_asfreq.py 
b/python/pyspark/pandas/tests/frame/test_asfreq.py
new file mode 100644
index 000000000000..81dda8adf374
--- /dev/null
+++ b/python/pyspark/pandas/tests/frame/test_asfreq.py
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+import pandas as pd
+
+import pyspark.pandas as ps
+from pyspark.pandas.exceptions import PandasNotImplementedError
+from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
+
+
+class AsFreqMixin:
+    @property
+    def pdf(self):
+        index = pd.date_range("1/1/2000", periods=4, freq="min")
+        series = pd.Series([0.0, None, 2.0, 3.0], index=index)
+        return pd.DataFrame({"s": series})
+
+    @property
+    def psdf(self):
+        return ps.from_pandas(self.pdf)
+
+    def test_disabled(self):
+        with self.assertRaises(PandasNotImplementedError):
+            self.psdf.asfreq(freq="30s")
+
+    def test_fallback(self):
+        ps.set_option("compute.pandas_fallback", True)
+
+        self.assert_eq(self.pdf.asfreq(freq="30s"), 
self.psdf.asfreq(freq="30s"))
+        self.assert_eq(
+            self.pdf.asfreq(freq="30s", fill_value=9.0),
+            self.psdf.asfreq(freq="30s", fill_value=9.0),
+        )
+        self.assert_eq(
+            self.pdf.asfreq(freq="30s", method="bfill"),
+            self.psdf.asfreq(freq="30s", method="bfill"),
+        )
+
+        # test with schema infered from partial dataset, len(pdf)==4
+        ps.set_option("compute.shortcut_limit", 2)
+        self.assert_eq(self.pdf.asfreq(freq="30s"), 
self.psdf.asfreq(freq="30s"))
+        self.assert_eq(
+            self.pdf.asfreq(freq="30s", fill_value=9.0),
+            self.psdf.asfreq(freq="30s", fill_value=9.0),
+        )
+        self.assert_eq(
+            self.pdf.asfreq(freq="30s", method="bfill"),
+            self.psdf.asfreq(freq="30s", method="bfill"),
+        )
+
+        ps.reset_option("compute.shortcut_limit")
+        ps.reset_option("compute.pandas_fallback")
+
+
+class AsFreqTests(
+    AsFreqMixin,
+    PandasOnSparkTestCase,
+    TestUtils,
+):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.pandas.tests.frame.test_asfreq import *  # noqa: F401
+
+    try:
+        import xmlrunner
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)
diff --git a/python/pyspark/pandas/tests/frame/test_asof.py 
b/python/pyspark/pandas/tests/frame/test_asof.py
new file mode 100644
index 000000000000..8b7ac513eb2a
--- /dev/null
+++ b/python/pyspark/pandas/tests/frame/test_asof.py
@@ -0,0 +1,106 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+import pandas as pd
+
+import pyspark.pandas as ps
+from pyspark.pandas.exceptions import PandasNotImplementedError
+from pyspark.testing.pandasutils import PandasOnSparkTestCase, TestUtils
+
+
+class AsOfMixin:
+    @property
+    def pdf(self):
+        return pd.DataFrame(
+            {"a": [10.0, 20.0, 30.0, 40.0, 50.0], "b": [None, None, None, 
None, 500]},
+            index=pd.DatetimeIndex(
+                [
+                    "2018-02-27 09:01:00",
+                    "2018-02-27 09:02:00",
+                    "2018-02-27 09:03:00",
+                    "2018-02-27 09:04:00",
+                    "2018-02-27 09:05:00",
+                ]
+            ),
+        )
+
+    @property
+    def psdf(self):
+        return ps.from_pandas(self.pdf)
+
+    def test_disabled(self):
+        with self.assertRaises(PandasNotImplementedError):
+            self.psdf.asof(pd.DatetimeIndex(["2018-02-27 09:03:30", 
"2018-02-27 09:04:30"]))
+
+    def test_fallback(self):
+        ps.set_option("compute.pandas_fallback", True)
+
+        self.assert_eq(
+            self.pdf.asof(pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 
09:04:30"])),
+            self.psdf.asof(pd.DatetimeIndex(["2018-02-27 09:03:30", 
"2018-02-27 09:04:30"])),
+        )
+        self.assert_eq(
+            self.pdf.asof(
+                pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 
09:04:30"]),
+                subset=["a"],
+            ),
+            self.psdf.asof(
+                pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 
09:04:30"]),
+                subset=["a"],
+            ),
+        )
+
+        # test with schema infered from partial dataset, len(pdf)==5
+        ps.set_option("compute.shortcut_limit", 2)
+        self.assert_eq(
+            self.pdf.asof(pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 
09:04:30"])),
+            self.psdf.asof(pd.DatetimeIndex(["2018-02-27 09:03:30", 
"2018-02-27 09:04:30"])),
+        )
+        self.assert_eq(
+            self.pdf.asof(
+                pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 
09:04:30"]),
+                subset=["a"],
+            ),
+            self.psdf.asof(
+                pd.DatetimeIndex(["2018-02-27 09:03:30", "2018-02-27 
09:04:30"]),
+                subset=["a"],
+            ),
+        )
+
+        ps.reset_option("compute.shortcut_limit")
+        ps.reset_option("compute.pandas_fallback")
+
+
+class AsFreqTests(
+    AsOfMixin,
+    PandasOnSparkTestCase,
+    TestUtils,
+):
+    pass
+
+
+if __name__ == "__main__":
+    from pyspark.pandas.tests.frame.test_asof import *  # noqa: F401
+
+    try:
+        import xmlrunner
+
+        testRunner = xmlrunner.XMLTestRunner(output="target/test-reports", 
verbosity=2)
+    except ImportError:
+        testRunner = None
+    unittest.main(testRunner=testRunner, verbosity=2)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to