This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.4 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push: new 9e0e4182f62 [SPARK-40532][CONNECT] Add Python Version into Python UDF message 9e0e4182f62 is described below commit 9e0e4182f62e3dd30aa9d6f57a746a258830e351 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Tue Feb 7 00:31:16 2023 -0800 [SPARK-40532][CONNECT] Add Python Version into Python UDF message ### What changes were proposed in this pull request? This PR adds the Python version of the remote client to the Python UDF message. See also: https://github.com/apache/spark/blob/56c7cf33929d7d42b7d299c0bb7e895963241214/python/pyspark/context.py#L312 https://github.com/apache/spark/blob/603dc5098217d9580f611873165d25392f41cdfe/python/pyspark/worker.py#L682-L691 ### Why are the changes needed? To make sure the UDF runs with the same Python version as the executors. ### Does this PR introduce _any_ user-facing change? No for end users. Yes for developers: other Python versions can now be used for Spark Connect. ### How was this patch tested? Manually tested. Closes #39914 from HyukjinKwon/SPARK-40532. 
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Dongjoon Hyun <dongj...@apache.org> (cherry picked from commit e49466a8be617da86b614d0fab49f00b0da0d952) Signed-off-by: Dongjoon Hyun <dongj...@apache.org> --- .../src/main/protobuf/spark/connect/expressions.proto | 2 ++ .../spark/sql/connect/planner/SparkConnectPlanner.scala | 2 +- .../sql/connect/messages/ConnectProtoMessagesSuite.scala | 2 ++ python/pyspark/sql/connect/expressions.py | 5 ++++- python/pyspark/sql/connect/proto/expressions_pb2.py | 10 +++++----- python/pyspark/sql/connect/proto/expressions_pb2.pyi | 13 ++++++++++++- python/pyspark/sql/connect/udf.py | 2 ++ 7 files changed, 28 insertions(+), 8 deletions(-) diff --git a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto index 66361883321..8682e1ee27b 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/expressions.proto @@ -318,6 +318,8 @@ message PythonUDF { int32 eval_type = 2; // (Required) The encoded commands of the Python UDF bytes command = 3; + // (Required) Python version being used in the client. 
+ string python_ver = 4; } message ScalarScalaUDF { diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 07a5e5bc156..c8a0860b871 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -894,7 +894,7 @@ class SparkConnectPlanner(val session: SparkSession) { // No imported Python libraries pythonIncludes = Lists.newArrayList(), pythonExec = pythonExec, - pythonVer = "3.9", // TODO(SPARK-40532) This needs to be an actual Python version. + pythonVer = fun.getPythonVer, // Empty broadcast variables broadcastVars = Lists.newArrayList(), // Null accumulator diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/messages/ConnectProtoMessagesSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/messages/ConnectProtoMessagesSuite.scala index 240f6573c7d..09462ce18c2 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/messages/ConnectProtoMessagesSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/messages/ConnectProtoMessagesSuite.scala @@ -63,6 +63,7 @@ class ConnectProtoMessagesSuite extends SparkFunSuite { .setEvalType(100) .setOutputType("\"integer\"") .setCommand(ByteString.copyFrom("command".getBytes())) + .setPythonVer("3.10") .build() val commonInlineUserDefinedFunctionExpr = proto.Expression @@ -81,5 +82,6 @@ class ConnectProtoMessagesSuite extends SparkFunSuite { assert(fun.getDeterministic == true) assert(fun.getArgumentsCount == 1) assert(fun.hasPythonUdf == true) + assert(pythonUdf.getPythonVer == "3.10") } } diff --git a/python/pyspark/sql/connect/expressions.py 
b/python/pyspark/sql/connect/expressions.py index 300a1a006d5..dcd7c5ebba6 100644 --- a/python/pyspark/sql/connect/expressions.py +++ b/python/pyspark/sql/connect/expressions.py @@ -491,22 +491,25 @@ class PythonUDF: output_type: str, eval_type: int, command: bytes, + python_ver: str, ) -> None: self._output_type = output_type self._eval_type = eval_type self._command = command + self._python_ver = python_ver def to_plan(self, session: "SparkConnectClient") -> proto.PythonUDF: expr = proto.PythonUDF() expr.output_type = self._output_type expr.eval_type = self._eval_type expr.command = self._command + expr.python_ver = self._python_ver return expr def __repr__(self) -> str: return ( f"{self._output_type}, {self._eval_type}, " - f"{self._command}" # type: ignore[str-bytes-safe] + f"{self._command}, f{self._python_ver}" # type: ignore[str-bytes-safe] ) diff --git a/python/pyspark/sql/connect/proto/expressions_pb2.py b/python/pyspark/sql/connect/proto/expressions_pb2.py index 3a06e80c21e..92d9e6a610a 100644 --- a/python/pyspark/sql/connect/proto/expressions_pb2.py +++ b/python/pyspark/sql/connect/proto/expressions_pb2.py @@ -34,7 +34,7 @@ from pyspark.sql.connect.proto import types_pb2 as spark_dot_connect_dot_types__ DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( - b'\n\x1fspark/connect/expressions.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x19spark/connect/types.proto"\x92%\n\nExpression\x12=\n\x07literal\x18\x01 \x01(\x0b\x32!.spark.connect.Expression.LiteralH\x00R\x07literal\x12\x62\n\x14unresolved_attribute\x18\x02 \x01(\x0b\x32-.spark.connect.Expression.UnresolvedAttributeH\x00R\x13unresolvedAttribute\x12_\n\x13unresolved_function\x18\x03 \x01(\x0b\x32,.spark.connect.Expression.UnresolvedFunctionH\x00R\x12unresolvedFunct [...] 
+ b'\n\x1fspark/connect/expressions.proto\x12\rspark.connect\x1a\x19google/protobuf/any.proto\x1a\x19spark/connect/types.proto"\x92%\n\nExpression\x12=\n\x07literal\x18\x01 \x01(\x0b\x32!.spark.connect.Expression.LiteralH\x00R\x07literal\x12\x62\n\x14unresolved_attribute\x18\x02 \x01(\x0b\x32-.spark.connect.Expression.UnresolvedAttributeH\x00R\x13unresolvedAttribute\x12_\n\x13unresolved_function\x18\x03 \x01(\x0b\x32,.spark.connect.Expression.UnresolvedFunctionH\x00R\x12unresolvedFunct [...] ) @@ -345,8 +345,8 @@ if _descriptor._USE_C_DESCRIPTORS == False: _EXPRESSION_UNRESOLVEDNAMEDLAMBDAVARIABLE._serialized_end = 4846 _COMMONINLINEUSERDEFINEDFUNCTION._serialized_start = 4862 _COMMONINLINEUSERDEFINEDFUNCTION._serialized_end = 5173 - _PYTHONUDF._serialized_start = 5175 - _PYTHONUDF._serialized_end = 5274 - _SCALARSCALAUDF._serialized_start = 5277 - _SCALARSCALAUDF._serialized_end = 5461 + _PYTHONUDF._serialized_start = 5176 + _PYTHONUDF._serialized_end = 5306 + _SCALARSCALAUDF._serialized_start = 5309 + _SCALARSCALAUDF._serialized_end = 5493 # @@protoc_insertion_point(module_scope) diff --git a/python/pyspark/sql/connect/proto/expressions_pb2.pyi b/python/pyspark/sql/connect/proto/expressions_pb2.pyi index 604672a9ad7..934e0016c90 100644 --- a/python/pyspark/sql/connect/proto/expressions_pb2.pyi +++ b/python/pyspark/sql/connect/proto/expressions_pb2.pyi @@ -1163,23 +1163,34 @@ class PythonUDF(google.protobuf.message.Message): OUTPUT_TYPE_FIELD_NUMBER: builtins.int EVAL_TYPE_FIELD_NUMBER: builtins.int COMMAND_FIELD_NUMBER: builtins.int + PYTHON_VER_FIELD_NUMBER: builtins.int output_type: builtins.str """(Required) Output type of the Python UDF""" eval_type: builtins.int """(Required) EvalType of the Python UDF""" command: builtins.bytes """(Required) The encoded commands of the Python UDF""" + python_ver: builtins.str + """(Required) Python version being used in the client.""" def __init__( self, *, output_type: builtins.str = ..., eval_type: builtins.int = ..., 
command: builtins.bytes = ..., + python_ver: builtins.str = ..., ) -> None: ... def ClearField( self, field_name: typing_extensions.Literal[ - "command", b"command", "eval_type", b"eval_type", "output_type", b"output_type" + "command", + b"command", + "eval_type", + b"eval_type", + "output_type", + b"output_type", + "python_ver", + b"python_ver", ], ) -> None: ... diff --git a/python/pyspark/sql/connect/udf.py b/python/pyspark/sql/connect/udf.py index 46d45d8bc70..6571cf76929 100644 --- a/python/pyspark/sql/connect/udf.py +++ b/python/pyspark/sql/connect/udf.py @@ -21,6 +21,7 @@ from pyspark.sql.connect import check_dependencies check_dependencies(__name__, __file__) +import sys import functools from typing import Callable, Any, TYPE_CHECKING, Optional @@ -131,6 +132,7 @@ class UserDefinedFunction: output_type=data_type_str, eval_type=self.evalType, command=CloudPickleSerializer().dumps((self.func, self._returnType)), + python_ver="%d.%d" % sys.version_info[:2], ) return Column( CommonInlineUserDefinedFunction( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org