This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new aa564b95bb3 [SPARK-41225][CONNECT][PYTHON] Disable unsupported functions aa564b95bb3 is described below commit aa564b95bb396b025997651b3fd0ebca418ee78e Author: Martin Grund <martin.gr...@databricks.com> AuthorDate: Thu Nov 24 09:07:29 2022 +0800 [SPARK-41225][CONNECT][PYTHON] Disable unsupported functions ### What changes were proposed in this pull request? For the better user experience we need to properly throw exceptions for functions that are not supported in Spark Connect. ### Why are the changes needed? User Experience ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT Closes #38762 from grundprinzip/SPARK-41225. Authored-by: Martin Grund <martin.gr...@databricks.com> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- python/pyspark/sql/connect/dataframe.py | 57 ++++++++++++++++++++++ .../sql/tests/connect/test_connect_plan_only.py | 19 ++++++++ 2 files changed, 76 insertions(+) diff --git a/python/pyspark/sql/connect/dataframe.py b/python/pyspark/sql/connect/dataframe.py index 82dc1f6a558..7b42bdf747b 100644 --- a/python/pyspark/sql/connect/dataframe.py +++ b/python/pyspark/sql/connect/dataframe.py @@ -975,6 +975,63 @@ class DataFrame(object): ).command(session=self._session) self._session.execute_command(command) + def rdd(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("RDD Support for Spark Connect is not implemented.") + + def unpersist(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("unpersist() is not implemented.") + + def cache(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("cache() is not implemented.") + + def persist(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("persist() is not implemented.") + + def withWatermark(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("withWatermark() is not implemented.") + + def observe(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("observe() is not implemented.") + + def foreach(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("foreach() is not implemented.") + + def foreachPartition(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("foreachPartition() is not implemented.") + + def toLocalIterator(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("toLocalIterator() is not implemented.") + + def checkpoint(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("checkpoint() is not implemented.") + + def localCheckpoint(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("localCheckpoint() is not implemented.") + + def to_pandas_on_spark(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("to_pandas_on_spark() is not implemented.") + + def pandas_api(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("pandas_api() is not implemented.") + + def registerTempTable(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("registerTempTable() is not implemented.") + + def storageLevel(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("storageLevel() is not implemented.") + + def mapInPandas(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("mapInPandas() is not implemented.") + + def mapInArrow(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("mapInArrow() is not implemented.") + + def writeStream(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("writeStream() is not implemented.") + + def toJSON(self, *args: Any, **kwargs: Any) -> None: + raise NotImplementedError("toJSON() is not implemented.") + class DataFrameNaFunctions: """Functionality for working with missing data in :class:`DataFrame`. 
diff --git a/python/pyspark/sql/tests/connect/test_connect_plan_only.py b/python/pyspark/sql/tests/connect/test_connect_plan_only.py index 26a8da46ac3..4d9ee04bf3d 100644 --- a/python/pyspark/sql/tests/connect/test_connect_plan_only.py +++ b/python/pyspark/sql/tests/connect/test_connect_plan_only.py @@ -313,6 +313,25 @@ class SparkConnectTestsPlanOnly(PlanOnlyTestFixture): df.repartition(-1)._plan.to_proto(self.connect) self.assertTrue("numPartitions must be positive" in str(context.exception)) + def test_unsupported_functions(self): + # SPARK-41225: Disable unsupported functions. + df = self.connect.readTable(table_name=self.tbl_name) + for f in ( + "rdd", + "unpersist", + "cache", + "persist", + "withWatermark", + "observe", + "foreach", + "foreachPartition", + "toLocalIterator", + "checkpoint", + "localCheckpoint", + ): + with self.assertRaises(NotImplementedError): + getattr(df, f)() + if __name__ == "__main__": from pyspark.sql.tests.connect.test_connect_plan_only import * # noqa: F401 --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org