This is an automated email from the ASF dual-hosted git repository. ruifengz pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 13c8c8123b8 [SPARK-44731][PYTHON][CONNECT] Make TimestampNTZ work with literals in Python Spark Connect 13c8c8123b8 is described below commit 13c8c8123b875f2fa4fa75caeaa74ce0a68b88ac Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Fri Aug 11 17:35:00 2023 +0800 [SPARK-44731][PYTHON][CONNECT] Make TimestampNTZ work with literals in Python Spark Connect ### What changes were proposed in this pull request? This PR proposes: - Shares the namespaces for `to_timestamp_ntz`, `to_timestamp_ltz` and `to_unix_timestamp` in Spark Connect. They were missed. - Adds support for `TimestampNTZ` in literal handling in Python Spark Connect (by respecting `spark.sql.timestampType`). ### Why are the changes needed? For feature parity, and to respect TimestampNTZ in resampling in the pandas API on Spark. ### Does this PR introduce _any_ user-facing change? Yes, this virtually fixes the same bug: https://github.com/apache/spark/pull/42392 in Spark Connect with Python. ### How was this patch tested? Unit tests re-enabled. Closes #42445 from HyukjinKwon/SPARK-44731. 
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Ruifeng Zheng <ruife...@apache.org> (cherry picked from commit 73b0376ec1527197c215495c7957efbe9d3bfab7) Signed-off-by: Ruifeng Zheng <ruife...@apache.org> --- python/pyspark/pandas/tests/connect/test_parity_resample.py | 4 +--- python/pyspark/sql/connect/expressions.py | 3 +++ python/pyspark/sql/functions.py | 12 ++++++++++++ python/pyspark/sql/utils.py | 13 +++++++++++-- 4 files changed, 27 insertions(+), 5 deletions(-) diff --git a/python/pyspark/pandas/tests/connect/test_parity_resample.py b/python/pyspark/pandas/tests/connect/test_parity_resample.py index d5c901f113a..caca2f957b5 100644 --- a/python/pyspark/pandas/tests/connect/test_parity_resample.py +++ b/python/pyspark/pandas/tests/connect/test_parity_resample.py @@ -30,9 +30,7 @@ class ResampleParityTests( class ResampleWithTimezoneTests( ResampleWithTimezoneMixin, PandasOnSparkTestUtils, TestUtils, ReusedConnectTestCase ): - @unittest.skip("SPARK-44731: Support 'spark.sql.timestampType' in Python Spark Connect client") - def test_series_resample_with_timezone(self): - super().test_series_resample_with_timezone() + pass if __name__ == "__main__": diff --git a/python/pyspark/sql/connect/expressions.py b/python/pyspark/sql/connect/expressions.py index 44e6e174f70..d0a9b1d69ae 100644 --- a/python/pyspark/sql/connect/expressions.py +++ b/python/pyspark/sql/connect/expressions.py @@ -15,6 +15,7 @@ # limitations under the License. 
# from pyspark.sql.connect.utils import check_dependencies +from pyspark.sql.utils import is_timestamp_ntz_preferred check_dependencies(__name__) @@ -295,6 +296,8 @@ class LiteralExpression(Expression): return StringType() elif isinstance(value, decimal.Decimal): return DecimalType() + elif isinstance(value, datetime.datetime) and is_timestamp_ntz_preferred(): + return TimestampNTZType() elif isinstance(value, datetime.datetime): return TimestampType() elif isinstance(value, datetime.date): diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index b45e1daa0fd..0a8eccacbc0 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -7757,6 +7757,7 @@ def session_window(timeColumn: "ColumnOrName", gapDuration: Union[Column, str]) return _invoke_function("session_window", time_col, gap_duration) +@try_remote_functions def to_unix_timestamp( timestamp: "ColumnOrName", format: Optional["ColumnOrName"] = None, @@ -7766,6 +7767,9 @@ def to_unix_timestamp( .. versionadded:: 3.5.0 + .. versionchanged:: 3.5.0 + Supports Spark Connect. + Parameters ---------- timestamp : :class:`~pyspark.sql.Column` or str @@ -7793,6 +7797,7 @@ def to_unix_timestamp( return _invoke_function_over_columns("to_unix_timestamp", timestamp) +@try_remote_functions def to_timestamp_ltz( timestamp: "ColumnOrName", format: Optional["ColumnOrName"] = None, @@ -7803,6 +7808,9 @@ def to_timestamp_ltz( .. versionadded:: 3.5.0 + .. versionchanged:: 3.5.0 + Supports Spark Connect. + Parameters ---------- timestamp : :class:`~pyspark.sql.Column` or str @@ -7830,6 +7838,7 @@ def to_timestamp_ltz( return _invoke_function_over_columns("to_timestamp_ltz", timestamp) +@try_remote_functions def to_timestamp_ntz( timestamp: "ColumnOrName", format: Optional["ColumnOrName"] = None, @@ -7840,6 +7849,9 @@ def to_timestamp_ntz( .. versionadded:: 3.5.0 + .. versionchanged:: 3.5.0 + Supports Spark Connect. 
+ Parameters ---------- timestamp : :class:`~pyspark.sql.Column` or str diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index b72d8d9a7c8..61486d6a8c6 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -140,8 +140,17 @@ def is_timestamp_ntz_preferred() -> bool: """ Return a bool if TimestampNTZType is preferred according to the SQL configuration set. """ - jvm = SparkContext._jvm - return jvm is not None and jvm.PythonSQLUtils.isTimestampNTZPreferred() + if is_remote(): + from pyspark.sql.connect.session import SparkSession as ConnectSparkSession + + session = ConnectSparkSession.getActiveSession() + if session is None: + return False + else: + return session.conf.get("spark.sql.timestampType", None) == "TIMESTAMP_NTZ" + else: + jvm = SparkContext._jvm + return jvm is not None and jvm.PythonSQLUtils.isTimestampNTZPreferred() def is_remote() -> bool: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org