This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 62824a8f0236 [SPARK-55368][PYTHON][TESTS] Make sure `worker_util.py` can only be imported in Python workers
62824a8f0236 is described below
commit 62824a8f02363f56a3e3a6a9deb519d5802eff28
Author: Ruifeng Zheng <[email protected]>
AuthorDate: Fri Feb 6 07:35:43 2026 +0900
[SPARK-55368][PYTHON][TESTS] Make sure `worker_util.py` can only be imported in Python workers
### What changes were proposed in this pull request?
Make sure `worker_util.py` can only be imported in Python workers.
### Why are the changes needed?
`worker_util.py` is expected to be available only in Python workers.
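The mechanism is two-sided: each JVM-side runner exports `SPARK_PYTHON_RUNTIME=PYTHON_WORKER` into the worker's environment before launching it, and `worker_util.py` asserts on that marker at import time when `SPARK_TESTING` is set. A minimal sketch of the Python-side guard, mirroring the diff below:

```python
import os

# Under SPARK_TESTING, fail fast at import time unless the JVM-side
# runner has marked this process as a Python worker.
if "SPARK_TESTING" in os.environ:
    assert (
        os.environ.get("SPARK_PYTHON_RUNTIME") == "PYTHON_WORKER"
    ), "This module can only be imported in Python workers"
```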
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
CI.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #54139 from zhengruifeng/where_am_i.
Lead-authored-by: Ruifeng Zheng <[email protected]>
Co-authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 1 +
.../scala/org/apache/spark/api/python/StreamingPythonRunner.scala | 1 +
python/pyspark/worker_util.py | 5 +++++
.../org/apache/spark/sql/execution/python/PythonPlannerRunner.scala | 1 +
.../sql/execution/python/streaming/PythonStreamingSourceRunner.scala | 1 +
5 files changed, 9 insertions(+)
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 6bc64f54d3a2..4e6089b50a46 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -324,6 +324,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
envVars.put("PYTHON_UDF_BATCH_SIZE", batchSizeForPythonUDF.toString)
envVars.put("SPARK_JOB_ARTIFACT_UUID",
jobArtifactUUID.getOrElse("default"))
+ envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
val (worker: PythonWorker, handle: Option[ProcessHandle]) =
env.createPythonWorker(
pythonExec, workerModule, daemonModule, envVars.asScala.toMap, useDaemon)
diff --git a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
index e61fa01db113..7b659941c5ef 100644
--- a/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/StreamingPythonRunner.scala
@@ -73,6 +73,7 @@ private[spark] class StreamingPythonRunner(
if (!connectUrl.isEmpty) {
envVars.put("SPARK_CONNECT_LOCAL_URL", connectUrl)
}
+ envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
val workerFactory =
new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap, false)
diff --git a/python/pyspark/worker_util.py b/python/pyspark/worker_util.py
index 6c6db7e2a53d..ac090bc955bd 100644
--- a/python/pyspark/worker_util.py
+++ b/python/pyspark/worker_util.py
@@ -25,6 +25,11 @@ import sys
from typing import Any, IO, Optional
import warnings
+if "SPARK_TESTING" in os.environ:
+ assert (
+ os.environ.get("SPARK_PYTHON_RUNTIME") == "PYTHON_WORKER"
+ ), "This module can only be imported in python woker"
+
# 'resource' is a Unix specific module.
has_resource_module = True
try:
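As an illustration (not part of the commit), the guard above should trip when the module is imported outside a worker while `SPARK_TESTING` is set; a hypothetical check in a plain Python process:

```python
import os

# Simulate a test environment that is not a Python worker.
os.environ["SPARK_TESTING"] = "1"
os.environ.pop("SPARK_PYTHON_RUNTIME", None)

try:
    import pyspark.worker_util  # the module-level assert should fire
except AssertionError as exc:
    print("guard tripped:", exc)
```

Inside a real worker, the runners patched in this commit set the variable before launch, so the import proceeds normally.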
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
index dd1a869ddf7d..5311f34f41c0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonPlannerRunner.scala
@@ -109,6 +109,7 @@ abstract class PythonPlannerRunner[T](func: PythonFunction) extends Logging {
sessionUUID.foreach { uuid =>
envVars.put("PYSPARK_SPARK_SESSION_UUID", uuid)
}
+ envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
EvaluatePython.registerPicklers()
val pickler = new Pickler(/* useMemo = */ true,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
index 270d816e9bd9..638a2e9d2062 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/PythonStreamingSourceRunner.scala
@@ -92,6 +92,7 @@ class PythonStreamingSourceRunner(
envVars.put("SPARK_AUTH_SOCKET_TIMEOUT", authSocketTimeout.toString)
envVars.put("SPARK_BUFFER_SIZE", bufferSize.toString)
+ envVars.put("SPARK_PYTHON_RUNTIME", "PYTHON_WORKER")
val workerFactory =
new PythonWorkerFactory(pythonExec, workerModule, envVars.asScala.toMap, false)