viirya commented on code in PR #55552:
URL: https://github.com/apache/spark/pull/55552#discussion_r3172454902


##########
python/pyspark/worker.py:
##########
@@ -3609,12 +3588,93 @@ def process():
                 if hasattr(out_iter, "close"):
                     out_iter.close()
 
+        def pipelined_process():
+            """
+            Pipelined variant of process() that pre-fetches input batches in a background
+            reader thread while the main thread computes the UDF and writes output.
+            This allows input deserialization to overlap with UDF computation.
+            """
+            # Mark that pipelined mode is active so UDFs can verify the code path.
+            os.environ["SPARK_PIPELINED_UDF_ACTIVE"] = "1"
+            import queue
+            import threading
+
+            queue_depth = int(os.environ.get("SPARK_PIPELINED_UDF_QUEUE_DEPTH", "2"))
+            _SENTINEL = object()
+            input_queue = queue.Queue(maxsize=queue_depth)
+            reader_error = [None]
+            stop_event = threading.Event()
+
+            def _reader_thread():
+                try:
+                    for batch in deserializer.load_stream(infile):
+                        # Some serializers (e.g., ArrowStreamGroupSerializer,
+                        # ArrowStreamAggPandasUDFSerializer) yield lazy iterators
+                        # that still read from infile. Materialize them here so the
+                        # main thread can consume them without touching infile.
+                        if hasattr(batch, "__next__"):
+                            batch = list(batch)
+                        # Use timeout put so we can check stop_event periodically.
+                        # This prevents the reader from blocking forever if the main
+                        # thread stops consuming (e.g., due to UDF exception).
+                        while not stop_event.is_set():
+                            try:
+                                input_queue.put(batch, timeout=1)
+                                break
+                            except queue.Full:
+                                continue
+                        if stop_event.is_set():
+                            return

Review Comment:
   Python's queue.Queue has no single call that waits on either free queue 
space or an external event; the closest primitive is put(timeout=...). To 
avoid a busy loop while still responding to the stop event, the reader calls 
put(timeout=0.1) and checks stop_event between attempts. On the normal path 
(queue not full), put succeeds immediately with no polling. The 0.1s timeout 
only comes into play when the queue is full: if the consumer is merely slow, 
the reader retries, and if the consumer has stopped, stop_event is set and the 
reader exits within about 0.1s. An alternative would be a custom bounded 
buffer built on threading.Condition, but that adds complexity for the same 
result.
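
   For illustration only, the polling pattern described above can be sketched 
in isolation. The helper name put_until_stopped, its poll_interval parameter, 
and the demo function are hypothetical and are not part of worker.py; only 
queue.Queue.put(timeout=...), queue.Full, and threading.Event come from the 
standard library.

```python
import queue
import threading


def put_until_stopped(q, item, stop_event, poll_interval=0.1):
    """Try to enqueue item, giving up once stop_event is set.

    Hypothetical helper: returns True if the item was enqueued,
    False if a stop was requested before space became available.
    """
    while not stop_event.is_set():
        try:
            # Blocks for at most poll_interval seconds when the queue is full.
            q.put(item, timeout=poll_interval)
            return True
        except queue.Full:
            # Queue still full: loop around and re-check stop_event.
            continue
    return False


def demo():
    """Show the fast path and the stop path on a queue of depth 1."""
    q = queue.Queue(maxsize=1)
    stop_event = threading.Event()
    results = []

    # Fast path: the queue has space, so put returns immediately.
    results.append(put_until_stopped(q, "a", stop_event))

    # Stop path: the queue is full and nothing consumes from it,
    # so the producer can only finish once stop_event is set.
    producer = threading.Thread(
        target=lambda: results.append(put_until_stopped(q, "b", stop_event))
    )
    producer.start()
    stop_event.set()
    producer.join(timeout=5)
    return results, producer.is_alive()
```

   In this sketch the producer never blocks longer than poll_interval past the 
moment stop_event is set, which is the property the comment relies on.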



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

