viirya commented on code in PR #55552:
URL: https://github.com/apache/spark/pull/55552#discussion_r3172437210
##########
python/pyspark/worker.py:
##########
@@ -3588,12 +3588,93 @@ def process():
if hasattr(out_iter, "close"):
out_iter.close()
+ def pipelined_process():
+ """
+ Pipelined variant of process() that pre-fetches input batches in a
background
+ reader thread while the main thread computes the UDF and writes
output.
+ This allows input deserialization to overlap with UDF computation.
+ """
+ # Mark that pipelined mode is active so UDFs can verify the code
path.
+ os.environ["SPARK_PIPELINED_UDF_ACTIVE"] = "1"
+ import queue
+ import threading
+
+ queue_depth =
int(os.environ.get("SPARK_PIPELINED_UDF_QUEUE_DEPTH", "2"))
+ _SENTINEL = object()
+ input_queue = queue.Queue(maxsize=queue_depth)
+ reader_error = [None]
+ stop_event = threading.Event()
+
+ def _reader_thread():
+ try:
+ for batch in deserializer.load_stream(infile):
+ # Some serializers (e.g., ArrowStreamGroupSerializer,
+ # ArrowStreamAggPandasUDFSerializer) yield lazy
iterators
+ # that still read from infile. Materialize them here
so the
+ # main thread can consume them without touching infile.
+ if hasattr(batch, "__next__"):
+ batch = list(batch)
Review Comment:
The eager materialization in the reader thread produces the same data volume
that sync mode already materializes — in sync mode, mapper(batch_iter) calls
list(batch_iter) on the same lazy iterator. The only additional memory comes
from the pre-fetch queue: with the default queueDepth=2, up to 2 extra groups
can be buffered ahead. For the ASV peakmem benchmarks, both modes show ~110M
because the test data is small enough that the queue overhead is negligible. A
workload with very large skewed groups could see higher peak memory; users can
set spark.python.udf.pipelined.queueDepth=1 to reduce this to at most 1 extra
group, or disable pipelined mode entirely.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]