viirya commented on code in PR #55552:
URL: https://github.com/apache/spark/pull/55552#discussion_r3158073246


##########
python/pyspark/worker.py:
##########
@@ -3588,12 +3588,93 @@ def process():
                 if hasattr(out_iter, "close"):
                     out_iter.close()
 
+        def pipelined_process():
+            """
+            Pipelined variant of process() that pre-fetches input batches in a background
+            reader thread while the main thread computes the UDF and writes output.
+            This allows input deserialization to overlap with UDF computation.
+            """
+            # Mark that pipelined mode is active so UDFs can verify the code path.
+            os.environ["SPARK_PIPELINED_UDF_ACTIVE"] = "1"
+            import queue
+            import threading
+
+            queue_depth = int(os.environ.get("SPARK_PIPELINED_UDF_QUEUE_DEPTH", "2"))
+            _SENTINEL = object()
+            input_queue = queue.Queue(maxsize=queue_depth)
+            reader_error = [None]
+            stop_event = threading.Event()
+
+            def _reader_thread():
+                try:
+                    for batch in deserializer.load_stream(infile):
+                        # Some serializers (e.g., ArrowStreamGroupSerializer,
+                        # ArrowStreamAggPandasUDFSerializer) yield lazy iterators
+                        # that still read from infile. Materialize them here so the
+                        # main thread can consume them without touching infile.
+                        if hasattr(batch, "__next__"):
+                            batch = list(batch)

Review Comment:
   The list(batch) materialization only happens for grouped/aggregate UDF serializers whose load_stream() yields lazy iterators. In sync mode, the same data is already materialized in mapper(batch_iter) via list(batch_iter), so per-group peak memory is unchanged.
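
   To illustrate why a lazy per-group iterator has to be materialized before the source is advanced, here is a minimal sketch that uses itertools.groupby as a stand-in. It is not PySpark's serializer code: load_groups and rows are hypothetical names, and groupby is only an analogy for a load_stream() whose yielded iterators share one underlying input stream.

```python
from itertools import groupby

# Hypothetical input: (key, value) rows already sorted by key.
rows = [("a", 1), ("a", 2), ("b", 3)]


def load_groups(stream):
    """Stand-in for a load_stream() that yields lazy per-group iterators.

    Each yielded group shares the underlying stream with the outer generator,
    the same way itertools.groupby shares its source with its sub-iterators.
    """
    for _key, group in groupby(stream, key=lambda row: row[0]):
        yield group


# Advancing the outer generator first (as a pre-fetching reader would)
# leaves earlier lazy groups with nothing left to read.
lazy_groups = list(load_groups(iter(rows)))
lazy_contents = [list(group) for group in lazy_groups]

# Materializing each lazy group before asking for the next one keeps the data.
materialized = []
for group in load_groups(iter(rows)):
    if hasattr(group, "__next__"):
        group = list(group)
    materialized.append(group)

print(lazy_contents[0])  # the first group was lost
print(materialized)
```

   The hasattr(group, "__next__") check mirrors the one in the diff: plain batches pass through untouched, and only iterator-like batches are copied into a list.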
   
   The difference is that with the default queueDepth=2, up to 2 additional materialized groups can sit in the queue while the main thread works on the current one. In the worst case (a skewed key with one very large group), this could increase peak memory by roughly 2x the group size. Users can set spark.python.udf.pipelined.queueDepth=1 to reduce this, or disable pipelined mode entirely for memory-sensitive workloads.
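
   The buffering behavior described above can be sketched with a bounded queue.Queue. This is a simplified illustration under stated assumptions, not the code in this PR: pipelined_map, batches, and fn are hypothetical names, and the stop_event handling from the diff is left out, so a failure in fn is not signalled back to the reader.

```python
import queue
import threading


def pipelined_map(batches, fn, queue_depth=2):
    """Apply fn to each batch while a reader thread pre-fetches ahead.

    Hypothetical helper for illustration only. At most queue_depth
    materialized batches wait in the queue at any time.
    """
    sentinel = object()
    input_queue = queue.Queue(maxsize=queue_depth)
    reader_error = [None]

    def reader():
        try:
            for batch in batches:
                # Materialize lazy iterators so the consumer never touches
                # the underlying source.
                if hasattr(batch, "__next__"):
                    batch = list(batch)
                # Blocks while queue_depth batches are already waiting.
                input_queue.put(batch)
        except BaseException as exc:
            reader_error[0] = exc
        finally:
            input_queue.put(sentinel)

    thread = threading.Thread(target=reader, daemon=True)
    thread.start()

    results = []
    while True:
        item = input_queue.get()
        if item is sentinel:
            break
        results.append(fn(item))

    thread.join()
    if reader_error[0] is not None:
        raise reader_error[0]
    return results


# Two lazy "groups"; queue_depth=1 keeps at most one group queued.
print(pipelined_map(iter([iter([1, 2]), iter([3, 4, 5])]), sum, queue_depth=1))
```

   In this sketch the reader can also hold one further materialized batch while it is blocked in put(), on top of the queue_depth batches already queued and the one the main thread is processing. Lowering queue_depth shrinks the number of queued batches but does not remove that in-hand batch.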



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

