This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 6b55d618d36 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers
6b55d618d36 is described below

commit 6b55d618d36bdd296b3883916328d26863e94b8a
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Thu Aug 10 12:08:13 2023 +0900

    [SPARK-44461][SS][PYTHON][CONNECT] Verify Python Version for spark connect streaming workers

    ### What changes were proposed in this pull request?

    Add python version check for spark connect streaming `foreachBatch_worker` and `listener_worker`

    ### Why are the changes needed?

    Necessary check

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    I believe it can be skipped here

    Closes #42421 from WweiL/SPARK-44461-verify-python-ver.

    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 0eeb6042859e82cedaa0807abe5d6be0229ecd09)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +-
 python/pyspark/sql/connect/streaming/query.py                          | 3 ++-
 python/pyspark/sql/connect/streaming/readwriter.py                     | 3 ++-
 python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py     | 3 +++
 python/pyspark/sql/connect/streaming/worker/listener_worker.py         | 3 +++
 5 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index 1a75965eb92..cddda6fb7a7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -83,7 +83,7 @@ private[spark] class StreamingPythonRunner(
     val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, bufferSize)
     val dataOut = new DataOutputStream(stream)

-    // TODO(SPARK-44461): verify python version
+    PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)

     // Send sessionId
     PythonRDD.writeUTF(sessionId, dataOut)

diff --git a/python/pyspark/sql/connect/streaming/query.py b/python/pyspark/sql/connect/streaming/query.py
index 59e98e7bc30..021d27e939d 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -23,6 +23,7 @@ from pyspark.errors import StreamingQueryException, PySparkValueError
 import pyspark.sql.connect.proto as pb2
 from pyspark.serializers import CloudPickleSerializer
 from pyspark.sql.connect import proto
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.streaming import StreamingQueryListener
 from pyspark.sql.streaming.query import (
     StreamingQuery as PySparkStreamingQuery,
@@ -237,7 +238,7 @@ class StreamingQueryManager:
         cmd = pb2.StreamingQueryManagerCommand()
         expr = proto.PythonUDF()
         expr.command = CloudPickleSerializer().dumps(listener)
-        expr.python_ver = "%d.%d" % sys.version_info[:2]
+        expr.python_ver = get_python_ver()
         cmd.add_listener.python_listener_payload.CopyFrom(expr)
         cmd.add_listener.id = listener._id
         self._execute_streaming_query_manager_cmd(cmd)

diff --git a/python/pyspark/sql/connect/streaming/readwriter.py b/python/pyspark/sql/connect/streaming/readwriter.py
index c8cd408404f..89097fcf43a 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -31,6 +31,7 @@ from pyspark.sql.streaming.readwriter import (
     DataStreamReader as PySparkDataStreamReader,
     DataStreamWriter as PySparkDataStreamWriter,
 )
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.types import Row, StructType
 from pyspark.errors import PySparkTypeError, PySparkValueError
@@ -499,7 +500,7 @@ class DataStreamWriter:
         self._write_proto.foreach_batch.python_function.command = CloudPickleSerializer().dumps(
             func
         )
-        self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % sys.version_info[:2]
+        self._write_proto.foreach_batch.python_function.python_ver = get_python_ver()
         return self

     foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__

diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
index 48a9848de40..cf61463cd68 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
@@ -31,12 +31,15 @@ from pyspark.serializers import (
 from pyspark import worker
 from pyspark.sql import SparkSession
 from typing import IO
+from pyspark.worker_util import check_python_version

 pickle_ser = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()


 def main(infile: IO, outfile: IO) -> None:
+    check_python_version(infile)
+
     connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
     session_id = utf8_deserializer.loads(infile)

diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
index 7aef911426d..e1f4678e42f 100644
--- a/python/pyspark/sql/connect/streaming/worker/listener_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
@@ -39,12 +39,15 @@ from pyspark.sql.streaming.listener import (
     QueryTerminatedEvent,
     QueryIdleEvent,
 )
+from pyspark.worker_util import check_python_version

 pickle_ser = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()


 def main(infile: IO, outfile: IO) -> None:
+    check_python_version(infile)
+
     connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
     session_id = utf8_deserializer.loads(infile)

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org