This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 70becf29070  [SPARK-39155][PYTHON] Access to JVM through passed-in GatewayClient during type conversion
70becf29070 is described below

commit 70becf290700e88c7be248e4277421dd17f3af4b
Author: Xinrong Meng <xinrong.m...@databricks.com>
AuthorDate: Thu May 12 12:22:11 2022 +0900

    [SPARK-39155][PYTHON] Access to JVM through passed-in GatewayClient during type conversion

    ### What changes were proposed in this pull request?
    Access the JVM through the passed-in GatewayClient during type conversion.

    ### Why are the changes needed?
    In customized type converters, we may utilize the passed-in GatewayClient to access the JVM, rather than rely on `SparkContext._jvm`. That's [how](https://github.com/py4j/py4j/blob/master/py4j-python/src/py4j/java_collections.py#L508) Py4J explicit converters access the JVM.

    ### Does this PR introduce _any_ user-facing change?
    No.

    ### How was this patch tested?
    Existing tests.

    Closes #36504 from xinrong-databricks/gateway_client_jvm.

    Authored-by: Xinrong Meng <xinrong.m...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 92fcf214c107358c1a70566b644cec2d35c096c0)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
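For illustration, here is a minimal sketch of a custom Py4J input converter written against the same pattern the patch below adopts: the converter resolves JVM classes through the gateway_client argument that Py4J passes to convert(), instead of reaching for `SparkContext._jvm`. The DecimalConverter name and the java.math.BigDecimal target are illustrative assumptions for this sketch, not part of the patch.

    # Illustrative sketch only; DecimalConverter / java.math.BigDecimal are example
    # choices and are not part of this commit.
    import decimal
    from typing import Any

    from py4j.java_gateway import GatewayClient, JavaClass, JavaObject
    from py4j.protocol import register_input_converter


    class DecimalConverter:
        def can_convert(self, obj: Any) -> bool:
            return isinstance(obj, decimal.Decimal)

        def convert(self, obj: decimal.Decimal, gateway_client: GatewayClient) -> JavaObject:
            # Resolve the Java class through the gateway client handed in by Py4J,
            # not through SparkContext._jvm, so no initialized SparkContext is required.
            BigDecimal = JavaClass("java.math.BigDecimal", gateway_client)
            return BigDecimal(str(obj))


    # Register the converter; Py4J will try it when Python arguments are converted for Java calls.
    register_input_converter(DecimalConverter())

Accessing the JVM through the passed-in client mirrors Py4J's own explicit converters and drops the need to assert that `SparkContext._jvm` is initialized, which is what the diff below does for the built-in date, timestamp, and interval converters.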
---
 python/pyspark/sql/types.py | 30 ++++++++++++++----------------
 1 file changed, 14 insertions(+), 16 deletions(-)

diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index 2a41508d634..123fd628980 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -44,7 +44,7 @@ from typing import (
 )
 
 from py4j.protocol import register_input_converter
-from py4j.java_gateway import JavaClass, JavaGateway, JavaObject
+from py4j.java_gateway import GatewayClient, JavaClass, JavaObject
 
 from pyspark.serializers import CloudPickleSerializer
 
@@ -1929,7 +1929,7 @@ class DateConverter:
     def can_convert(self, obj: Any) -> bool:
         return isinstance(obj, datetime.date)
 
-    def convert(self, obj: datetime.date, gateway_client: JavaGateway) -> JavaObject:
+    def convert(self, obj: datetime.date, gateway_client: GatewayClient) -> JavaObject:
         Date = JavaClass("java.sql.Date", gateway_client)
         return Date.valueOf(obj.strftime("%Y-%m-%d"))
 
@@ -1938,7 +1938,7 @@ class DatetimeConverter:
     def can_convert(self, obj: Any) -> bool:
         return isinstance(obj, datetime.datetime)
 
-    def convert(self, obj: datetime.datetime, gateway_client: JavaGateway) -> JavaObject:
+    def convert(self, obj: datetime.datetime, gateway_client: GatewayClient) -> JavaObject:
         Timestamp = JavaClass("java.sql.Timestamp", gateway_client)
         seconds = (
             calendar.timegm(obj.utctimetuple()) if obj.tzinfo else time.mktime(obj.timetuple())
@@ -1958,27 +1958,25 @@ class DatetimeNTZConverter:
             and is_timestamp_ntz_preferred()
         )
 
-    def convert(self, obj: datetime.datetime, gateway_client: JavaGateway) -> JavaObject:
-        from pyspark import SparkContext
-
+    def convert(self, obj: datetime.datetime, gateway_client: GatewayClient) -> JavaObject:
         seconds = calendar.timegm(obj.utctimetuple())
-        jvm = SparkContext._jvm
-        assert jvm is not None
-        return jvm.org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToLocalDateTime(
-            int(seconds) * 1000000 + obj.microsecond
+        DateTimeUtils = JavaClass(
+            "org.apache.spark.sql.catalyst.util.DateTimeUtils",
+            gateway_client,
         )
+        return DateTimeUtils.microsToLocalDateTime(int(seconds) * 1000000 + obj.microsecond)
 
 
 class DayTimeIntervalTypeConverter:
     def can_convert(self, obj: Any) -> bool:
         return isinstance(obj, datetime.timedelta)
 
-    def convert(self, obj: datetime.timedelta, gateway_client: JavaGateway) -> JavaObject:
-        from pyspark import SparkContext
-
-        jvm = SparkContext._jvm
-        assert jvm is not None
-        return jvm.org.apache.spark.sql.catalyst.util.IntervalUtils.microsToDuration(
+    def convert(self, obj: datetime.timedelta, gateway_client: GatewayClient) -> JavaObject:
+        IntervalUtils = JavaClass(
+            "org.apache.spark.sql.catalyst.util.IntervalUtils",
+            gateway_client,
+        )
+        return IntervalUtils.microsToDuration(
             (math.floor(obj.total_seconds()) * 1000000) + obj.microseconds
         )
 

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org