This is an automated email from the ASF dual-hosted git repository.
gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f8760286a679 [SPARK-54922][PYTHON] Unify how args are passed to python
workers
f8760286a679 is described below
commit f8760286a67973857ec9f1e433b31baf612aa1c9
Author: Tian Gao <[email protected]>
AuthorDate: Wed Jan 7 10:03:28 2026 +0900
[SPARK-54922][PYTHON] Unify how args are passed to python workers
### What changes were proposed in this pull request?
Unify two ways of passing arguments to Python workers.
### Why are the changes needed?
We are cleaning up the executor <-> worker protocols so we can put all the
protocol-related code together, which is helpful for future improvements such
as protocol checks. Having two ways to do one thing is not ideal.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
CI
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #53699 from gaogaotiantian/unify-arg-reading.
Authored-by: Tian Gao <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
---
python/pyspark/worker.py | 36 ++++++----------------
.../sql/execution/python/PythonUDFRunner.scala | 21 +------------
2 files changed, 10 insertions(+), 47 deletions(-)
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index 386efadce58e..4b2be1bc8587 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -1430,33 +1430,15 @@ def wrap_memory_profiler(f, eval_type, result_id):
def read_single_udf(pickleSer, infile, eval_type, runner_conf, udf_index,
profiler):
num_arg = read_int(infile)
- if eval_type in (
- PythonEvalType.SQL_BATCHED_UDF,
- PythonEvalType.SQL_ARROW_BATCHED_UDF,
- PythonEvalType.SQL_SCALAR_PANDAS_UDF,
- PythonEvalType.SQL_SCALAR_ARROW_UDF,
- PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF,
- PythonEvalType.SQL_WINDOW_AGG_PANDAS_UDF,
- PythonEvalType.SQL_WINDOW_AGG_ARROW_UDF,
- # The below doesn't support named argument, but shares the same
protocol.
- PythonEvalType.SQL_SCALAR_PANDAS_ITER_UDF,
- PythonEvalType.SQL_SCALAR_ARROW_ITER_UDF,
- PythonEvalType.SQL_GROUPED_AGG_ARROW_UDF,
- PythonEvalType.SQL_GROUPED_AGG_ARROW_ITER_UDF,
- PythonEvalType.SQL_GROUPED_AGG_PANDAS_ITER_UDF,
- ):
- args_offsets = []
- kwargs_offsets = {}
- for _ in range(num_arg):
- offset = read_int(infile)
- if read_bool(infile):
- name = utf8_deserializer.loads(infile)
- kwargs_offsets[name] = offset
- else:
- args_offsets.append(offset)
- else:
- args_offsets = [read_int(infile) for i in range(num_arg)]
- kwargs_offsets = {}
+ args_offsets = []
+ kwargs_offsets = {}
+ for _ in range(num_arg):
+ offset = read_int(infile)
+ if read_bool(infile):
+ name = utf8_deserializer.loads(infile)
+ kwargs_offsets[name] = offset
+ else:
+ args_offsets.append(offset)
chained_func = None
for i in range(read_int(infile)):
diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
index aae87bd94834..c728102962cb 100644
---
a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
+++
b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonUDFRunner.scala
@@ -173,26 +173,7 @@ object PythonUDFRunner {
funcs: Seq[(ChainedPythonFunctions, Long)],
argOffsets: Array[Array[Int]],
profiler: Option[String]): Unit = {
- profiler match {
- case Some(p) =>
- dataOut.writeBoolean(true)
- PythonWorkerUtils.writeUTF(p, dataOut)
- case _ => dataOut.writeBoolean(false)
- }
- dataOut.writeInt(funcs.length)
- funcs.zip(argOffsets).foreach { case ((chained, resultId), offsets) =>
- dataOut.writeInt(offsets.length)
- offsets.foreach { offset =>
- dataOut.writeInt(offset)
- }
- dataOut.writeInt(chained.funcs.length)
- chained.funcs.foreach { f =>
- PythonWorkerUtils.writePythonFunction(f, dataOut)
- }
- if (profiler.isDefined) {
- dataOut.writeLong(resultId)
- }
- }
+ writeUDFs(dataOut, funcs, argOffsets.map(_.map(ArgumentMetadata(_, None,
false))), profiler)
}
def writeUDFs(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]