This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 92511b26c68b [SPARK-51591][PYTHON][CONNECT] Fix ThreadPoolExecutor failure in python 3.13 daily test
92511b26c68b is described below

commit 92511b26c68b596e3fe196fa422cb5446820d90c
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Mar 24 13:33:57 2025 +0800

    [SPARK-51591][PYTHON][CONNECT] Fix ThreadPoolExecutor failure in python 3.13 daily test
    
    ### What changes were proposed in this pull request?
    Fix a ThreadPoolExecutor failure in the Python 3.13 daily build.
    
    ### Why are the changes needed?
    For the last 3 weeks, the Python 3.13 daily build has been failing with:
    ```
      File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
1823, in _handle_error
        raise error
      File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 
1534, in _execute_and_fetch_as_iterator
        for b in generator:
                 ^^^^^^^^^
      File "<frozen _collections_abc>", line 360, in __next__
      File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", 
line 146, in send
        self._release_all()
        ~~~~~~~~~~~~~~~~~^^
      File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", 
line 241, in _release_all
        self._release_thread_pool.submit(target)
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^
      File "/usr/lib/python3.13/concurrent/futures/thread.py", line 171, in 
submit
        raise RuntimeError('cannot schedule new futures after shutdown')
    RuntimeError: cannot schedule new futures after shutdown
    ```
    
    see https://github.com/apache/spark/actions/runs/13955602888/job/39065942222
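    
    The same error reproduces outside Spark with stock `concurrent.futures` (a minimal standalone sketch, not Spark code):
    ```
    from concurrent.futures import ThreadPoolExecutor

    pool = ThreadPoolExecutor(max_workers=1)
    pool.shutdown()

    # Any submit() after shutdown() raises the same RuntimeError as in the traceback above.
    try:
        pool.submit(print, "too late")
    except RuntimeError as e:
        print(e)  # cannot schedule new futures after shutdown
    ```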
    
    In `ThreadPoolExecutor.submit`, this error is raised only when the pool has already been shut down:
    ```
        def submit(self, fn, /, *args, **kwargs):
            with self._shutdown_lock, _global_shutdown_lock:
                if self._broken:
                    raise BrokenThreadPool(self._broken)
    
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')
                if _shutdown:
                    raise RuntimeError('cannot schedule new futures after '
                                       'interpreter shutdown')
    
                f = _base.Future()
                w = _WorkItem(f, fn, args, kwargs)
    
                self._work_queue.put(w)
                self._adjust_thread_count()
                return f
    ```
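    
    The patch therefore guards on the executor's private `_shutdown` flag before submitting, so a release aimed at an already-closed pool is skipped instead of raising; in the diff below this check runs while holding the iterator's `self._lock`. A standalone sketch of the same guard (the `safe_submit` helper is illustrative, not part of the patch):
    ```
    from concurrent.futures import ThreadPoolExecutor
    from typing import Any, Callable

    def safe_submit(pool: ThreadPoolExecutor, fn: Callable[..., Any], *args: Any) -> None:
        # Mirror the guard added in reattach.py: skip the submit entirely
        # when the pool has already been shut down, instead of raising.
        if not pool._shutdown:
            pool.submit(fn, *args)

    pool = ThreadPoolExecutor(max_workers=1)
    pool.shutdown()
    safe_submit(pool, print, "skipped")  # no RuntimeError raised
    ```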
    
    ### Does this PR introduce _any_ user-facing change?
    no
    
    ### How was this patch tested?
    PR builder with
    ```
    default: '{"PYSPARK_IMAGE_TO_TEST": "python-313", "PYTHON_TO_TEST": 
"python3.13"}'
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    no
    
    Closes #50332 from zhengruifeng/py_pool_313.
    
    Authored-by: Ruifeng Zheng <ruife...@apache.org>
    Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/sql/connect/client/reattach.py            | 8 ++++++--
 python/pyspark/sql/tests/connect/test_connect_session.py | 3 ++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index 57d529cef3b0..a827150cfb76 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -213,7 +213,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
         with self._lock:
             if self._release_thread_pool_instance is not None:
-                self._release_thread_pool.submit(target)
+                thread_pool = self._release_thread_pool
+                if not thread_pool._shutdown:
+                    thread_pool.submit(target)
 
     def _release_all(self) -> None:
         """
@@ -238,7 +240,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
         with self._lock:
             if self._release_thread_pool_instance is not None:
-                self._release_thread_pool.submit(target)
+                thread_pool = self._release_thread_pool
+                if not thread_pool._shutdown:
+                    thread_pool.submit(target)
         self._result_complete = True
 
     def _call_iter(self, iter_fun: Callable) -> Any:
diff --git a/python/pyspark/sql/tests/connect/test_connect_session.py b/python/pyspark/sql/tests/connect/test_connect_session.py
index 1fd59609d450..1e44f519c9ec 100644
--- a/python/pyspark/sql/tests/connect/test_connect_session.py
+++ b/python/pyspark/sql/tests/connect/test_connect_session.py
@@ -27,12 +27,12 @@ from pyspark.errors import (
     RetriesExceeded,
 )
 from pyspark.sql import SparkSession as PySparkSession
-
 from pyspark.testing.connectutils import (
     should_test_connect,
     ReusedConnectTestCase,
     connect_requirement_message,
 )
+from pyspark.testing.utils import timeout
 
 if should_test_connect:
     import grpc
@@ -85,6 +85,7 @@ class SparkConnectSessionTests(ReusedConnectTestCase):
     def _check_no_active_session_error(self, e: PySparkException):
         self.check_error(exception=e, errorClass="NO_ACTIVE_SESSION", messageParameters=dict())
 
+    @timeout(3)
     def test_stop_session(self):
         df = self.spark.sql("select 1 as a, 2 as b")
         catalog = self.spark.catalog


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
