This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 62824a8f0236 [SPARK-55368][PYTHON][TESTS] Make sure `worker_util.py` can only be imported in python workers
62824a8f0236 is described below

commit 62824a8f02363f56a3e3a6a9deb519d5802eff28
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Feb 6 07:35:43 2026 +0900

    [SPARK-55368][PYTHON][TESTS] Make sure `worker_util.py` can only be imported in python workers
    
    ### What changes were proposed in this pull request?
    Make sure `worker_util.py` can only be imported in python workers
    
    ### Why are the changes needed?
    `worker_util.py` is expected to be available only in python workers
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    ci
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #54139 from zhengruifeng/where_am_i.
    
    Lead-authored-by: Ruifeng Zheng <[email protected]>
    Co-authored-by: Ruifeng Zheng <[email protected]>
    Signed-off-by: Hyukjin Kwon <[email protected]>
---
 core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala   | 1 +
 .../scala/org/apache/spark/api/python/StreamingPythonRunner.scala    | 1 +
 python/pyspark/worker_util.py                                        | 5 +++++
 .../org/apache/spark/sql/execution/python/PythonPlannerRunner.scala  | 1 +
 .../sql/execution/python/streaming/PythonStreamingSourceRunner.scala | 1 +
 5 files changed, 9 insertions(+)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 6bc64f54d3a2..4e6089b50a46 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -324,6 +324,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
     envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
 
     envVars.put("SPARK_JOB_ARTIFACT_UUID", 
jobArtifactUUID.getOrElse("default"))
+    envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
 
     val (worker: PythonWorker, handle: Option[ProcessHandle]) = 
env.createPythonWorker(
       pythonExec, workerModule, daemonModule, envVars.asScala.toMap, useDaemon)
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index e61fa01db113..7b659941c5ef 100644
--- 
a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ 
b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -73,6 +73,7 @@ private[spark] class StreamingPythonRunner(
     if (!connectUrl.isEmpty) {
       envVars.put("SPARK_CONNECT_LOCAL_URL", connectUrl)
     }
+    envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
 
     val workerFactory =
       new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap, 
false)
diff --git a/python/pyspark/worker_util.py b/python/pyspark/worker_util.py
index 6c6db7e2a53d..ac090bc955bd 100644
--- a/python/pyspark/worker_util.py
+++ b/python/pyspark/worker_util.py
@@ -25,6 +25,11 @@ import sys
 from typing import Any, IO, Optional
 import warnings
 
+if "SPARK_TESTING" in os.environ:
+    assert (
+        os.environ.get("SPARK_PYTHON_RUNTIME") == "PYTHON_WORKER"
+    ), "This module can only be imported in python woker"
+
 # 'resource' is a Unix specific module.
 has_resource_module = True
 try:
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index dd1a869ddf7d..5311f34f41c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -109,6 +109,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) 
extends Logging {
     sessionUUID.foreach { uuid =>
       envVars.put("PYSPARK_SPARK_SESSION_UUID", uuid)
     }
+    envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
 
     EvaluatePython.registerPicklers()
     val pickler = new Pickler(/* useMemo = */ true,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
index 270d816e9bd9..638a2e9d2062 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
@@ -92,6 +92,7 @@ class PythonStreamingSourceRunner(
 
     envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
     envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+    envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
 
     val workerFactory =
       new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap, 
false)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to