This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aa564b95bb3 [SPARK-41225][CONNECT][PYTHON] Disable unsupported functions
aa564b95bb3 is described below

commit aa564b95bb396b025997651b3fd0ebca418ee78e
Author: Martin Grund <martin.gr...@databricks.com>
AuthorDate: Thu Nov 24 09:07:29 2022 +0800

    [SPARK-41225][CONNECT][PYTHON] Disable unsupported functions
    
    ### What changes were proposed in this pull request?
    For a better user experience, we need to properly throw exceptions for functions that are not supported in Spark Connect (see the usage sketch after the diff).
    
    ### Why are the changes needed?
    User Experience
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
    
    Closes #38762 from grundprinzip/SPARK-41225.
    
    Authored-by: Martin Grund <martin.gr...@databricks.com>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/sql/connect/dataframe.py            | 57 ++++++++++++++++++++++
 .../sql/tests/connect/test_connect_plan_only.py    | 19 ++++++++
 2 files changed, 76 insertions(+)

diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py
index 82dc1f6a558..7b42bdf747b 100644
--- a/python/pyspark/sql/connect/dataframe.py
+++ b/python/pyspark/sql/connect/dataframe.py
@@ -975,6 +975,63 @@ class DataFrame(object):
         ).command(session=self._session)
         self._session.execute_command(command)
 
+    def rdd(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("RDD Support for Spark Connect is not 
implemented.")
+
+    def unpersist(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("unpersist() is not implemented.")
+
+    def cache(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("cache() is not implemented.")
+
+    def persist(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("persist() is not implemented.")
+
+    def withWatermark(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("withWatermark() is not implemented.")
+
+    def observe(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("observe() is not implemented.")
+
+    def foreach(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("foreach() is not implemented.")
+
+    def foreachPartition(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("foreachPartition() is not implemented.")
+
+    def toLocalIterator(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("toLocalIterator() is not implemented.")
+
+    def checkpoint(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("checkpoint() is not implemented.")
+
+    def localCheckpoint(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("localCheckpoint() is not implemented.")
+
+    def to_pandas_on_spark(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("to_pandas_on_spark() is not implemented.")
+
+    def pandas_api(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("pandas_api() is not implemented.")
+
+    def registerTempTable(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("registerTempTable() is not implemented.")
+
+    def storageLevel(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("storageLevel() is not implemented.")
+
+    def mapInPandas(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("mapInPandas() is not implemented.")
+
+    def mapInArrow(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("mapInArrow() is not implemented.")
+
+    def writeStream(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("writeStream() is not implemented.")
+
+    def toJSON(self, *args: Any, **kwargs: Any) -> None:
+        raise NotImplementedError("toJSON() is not implemented.")
+
 
 class DataFrameNaFunctions:
     """Functionality for working with missing data in :class:`DataFrame`.
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
index 26a8da46ac3..4d9ee04bf3d 100644
--- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py
+++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py
@@ -313,6 +313,25 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture):
             df.repartition(-1)._plan.to_proto(self.connect)
         self.assertTrue("numPartitions must be positive" in 
str(context.exception))
 
+    def test_unsupported_functions(self):
+        # SPARK-41225: Disable unsupported functions.
+        df = self.connect.readTable(table_name=self.tbl_name)
+        for f in (
+            "rdd",
+            "unpersist",
+            "cache",
+            "persist",
+            "withWatermark",
+            "observe",
+            "foreach",
+            "foreachPartition",
+            "toLocalIterator",
+            "checkpoint",
+            "localCheckpoint",
+        ):
+            with self.assertRaises(NotImplementedError):
+                getattr(df, f)()
+
 
 if __name__ == "__main__":
    from pyspark.sql.tests.connect.test_connect_plan_only import *  # noqa: F401

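For readers following along, a minimal sketch of the user-facing behavior this patch introduces. It mirrors the test fixture above; the `connect` session object and the table name are assumptions for illustration, not part of the commit:

    # Assumes an already-created Spark Connect session object `connect`
    # (as in PlanOnlyTestFixture above); "my_table" is a hypothetical name.
    df = connect.readTable(table_name="my_table")

    # Each disabled method now fails fast with NotImplementedError
    # instead of silently falling through to unsupported behavior:
    try:
        df.cache()
    except NotImplementedError as e:
        print(e)  # cache() is not implemented.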

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
