This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0eeb6042859 [SPARK-44461][SS][PYTHON][CONNECT] Verify Python version 
for Spark Connect streaming workers
0eeb6042859 is described below

commit 0eeb6042859e82cedaa0807abe5d6be0229ecd09
Author: Wei Liu <wei....@databricks.com>
AuthorDate: Thu Aug 10 12:08:13 2023 +0900

    [SPARK-44461][SS][PYTHON][CONNECT] Verify Python version for Spark Connect 
streaming workers
    
    ### What changes were proposed in this pull request?
    
    Add a Python version check to the Spark Connect streaming `foreachBatch_worker` 
and `listener_worker`.
    
    ### Why are the changes needed?
    
    This is a necessary sanity check: it ensures the Python version used by the 
client matches the version running the server-side streaming worker, so version 
mismatches fail fast with a clear error instead of causing obscure 
serialization failures later.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Dedicated tests are not needed here: the change reuses the existing, 
already-tested `check_python_version` / `writePythonVersion` utilities, and any 
regression would surface immediately on worker startup.
    
    Closes #42421 from WweiL/SPARK-44461-verify-python-ver.
    
    Authored-by: Wei Liu <wei....@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 2 +-
 python/pyspark/sql/connect/streaming/query.py                          | 3 ++-
 python/pyspark/sql/connect/streaming/readwriter.py                     | 3 ++-
 python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py     | 3 +++
 python/pyspark/sql/connect/streaming/worker/listener_worker.py         | 3 +++
 5 files changed, 11 insertions(+), 3 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index a079743c847..fdfe388db2d 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -81,7 +81,7 @@ private[spark] class StreamingPythonRunner(
     val stream = new BufferedOutputStream(pythonWorker.get.getOutputStream, 
bufferSize)
     val dataOut = new DataOutputStream(stream)
 
-    // TODO(SPARK-44461): verify python version
+    PythonWorkerUtils.writePythonVersion(pythonVer, dataOut)
 
     // Send sessionId
     PythonRDD.writeUTF(sessionId, dataOut)
diff --git a/python/pyspark/sql/connect/streaming/query.py 
b/python/pyspark/sql/connect/streaming/query.py
index 59e98e7bc30..021d27e939d 100644
--- a/python/pyspark/sql/connect/streaming/query.py
+++ b/python/pyspark/sql/connect/streaming/query.py
@@ -23,6 +23,7 @@ from pyspark.errors import StreamingQueryException, 
PySparkValueError
 import pyspark.sql.connect.proto as pb2
 from pyspark.serializers import CloudPickleSerializer
 from pyspark.sql.connect import proto
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.streaming import StreamingQueryListener
 from pyspark.sql.streaming.query import (
     StreamingQuery as PySparkStreamingQuery,
@@ -237,7 +238,7 @@ class StreamingQueryManager:
         cmd = pb2.StreamingQueryManagerCommand()
         expr = proto.PythonUDF()
         expr.command = CloudPickleSerializer().dumps(listener)
-        expr.python_ver = "%d.%d" % sys.version_info[:2]
+        expr.python_ver = get_python_ver()
         cmd.add_listener.python_listener_payload.CopyFrom(expr)
         cmd.add_listener.id = listener._id
         self._execute_streaming_query_manager_cmd(cmd)
diff --git a/python/pyspark/sql/connect/streaming/readwriter.py 
b/python/pyspark/sql/connect/streaming/readwriter.py
index c8cd408404f..89097fcf43a 100644
--- a/python/pyspark/sql/connect/streaming/readwriter.py
+++ b/python/pyspark/sql/connect/streaming/readwriter.py
@@ -31,6 +31,7 @@ from pyspark.sql.streaming.readwriter import (
     DataStreamReader as PySparkDataStreamReader,
     DataStreamWriter as PySparkDataStreamWriter,
 )
+from pyspark.sql.connect.utils import get_python_ver
 from pyspark.sql.types import Row, StructType
 from pyspark.errors import PySparkTypeError, PySparkValueError
 
@@ -499,7 +500,7 @@ class DataStreamWriter:
         self._write_proto.foreach_batch.python_function.command = 
CloudPickleSerializer().dumps(
             func
         )
-        self._write_proto.foreach_batch.python_function.python_ver = "%d.%d" % 
sys.version_info[:2]
+        self._write_proto.foreach_batch.python_function.python_ver = 
get_python_ver()
         return self
 
     foreachBatch.__doc__ = PySparkDataStreamWriter.foreachBatch.__doc__
diff --git a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py 
b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
index 48a9848de40..cf61463cd68 100644
--- a/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/foreachBatch_worker.py
@@ -31,12 +31,15 @@ from pyspark.serializers import (
 from pyspark import worker
 from pyspark.sql import SparkSession
 from typing import IO
+from pyspark.worker_util import check_python_version
 
 pickle_ser = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()
 
 
 def main(infile: IO, outfile: IO) -> None:
+    check_python_version(infile)
+
     connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
     session_id = utf8_deserializer.loads(infile)
 
diff --git a/python/pyspark/sql/connect/streaming/worker/listener_worker.py 
b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
index 7aef911426d..e1f4678e42f 100644
--- a/python/pyspark/sql/connect/streaming/worker/listener_worker.py
+++ b/python/pyspark/sql/connect/streaming/worker/listener_worker.py
@@ -39,12 +39,15 @@ from pyspark.sql.streaming.listener import (
     QueryTerminatedEvent,
     QueryIdleEvent,
 )
+from pyspark.worker_util import check_python_version
 
 pickle_ser = CPickleSerializer()
 utf8_deserializer = UTF8Deserializer()
 
 
 def main(infile: IO, outfile: IO) -> None:
+    check_python_version(infile)
+
     connect_url = os.environ["SPARK_CONNECT_LOCAL_URL"]
     session_id = utf8_deserializer.loads(infile)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to