This is an automated email from the ASF dual-hosted git repository.

ruifengz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 92511b26c68b [SPARK-51591][PYTHON][CONNECT] Fix ThreadPoolExecutor failure in python 3.13 daily test
92511b26c68b is described below

commit 92511b26c68b596e3fe196fa422cb5446820d90c
Author: Ruifeng Zheng <ruife...@apache.org>
AuthorDate: Mon Mar 24 13:33:57 2025 +0800

[SPARK-51591][PYTHON][CONNECT] Fix ThreadPoolExecutor failure in python 3.13 daily test

### What changes were proposed in this pull request?
Fix the ThreadPoolExecutor failure in the Python 3.13 daily build.

### Why are the changes needed?
For the last 3 weeks, the Python 3.13 daily build has kept failing with:

```
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1823, in _handle_error
    raise error
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1534, in _execute_and_fetch_as_iterator
    for b in generator:
             ^^^^^^^^^
  File "<frozen _collections_abc>", line 360, in __next__
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 146, in send
    self._release_all()
    ~~~~~~~~~~~~~~~~~^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/reattach.py", line 241, in _release_all
    self._release_thread_pool.submit(target)
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^^^
  File "/usr/lib/python3.13/concurrent/futures/thread.py", line 171, in submit
    raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
```

See https://github.com/apache/spark/actions/runs/13955602888/job/39065942222.

In `ThreadPoolExecutor.submit`, this error is raised only when the pool has already been shut down:

```
def submit(self, fn, /, *args, **kwargs):
    with self._shutdown_lock, _global_shutdown_lock:
        if self._broken:
            raise BrokenThreadPool(self._broken)

        if self._shutdown:
            raise RuntimeError('cannot schedule new futures after shutdown')
        if _shutdown:
            raise RuntimeError('cannot schedule new futures after '
                               'interpreter shutdown')

        f = _base.Future()
        w = _WorkItem(f, fn, args, kwargs)

        self._work_queue.put(w)
        self._adjust_thread_count()
        return f
```

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
PR builder with

```
default: '{"PYSPARK_IMAGE_TO_TEST": "python-313", "PYTHON_TO_TEST": "python3.13"}'
```

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #50332 from zhengruifeng/py_pool_313.
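To make the failure mode concrete, here is a minimal standalone sketch (not part of the PR; the pool and the `target` callable are illustrative) that provokes the same RuntimeError by submitting to an executor that was already shut down:

```python
from concurrent.futures import ThreadPoolExecutor


def target() -> None:
    # Stand-in for the release callback that reattach.py submits.
    pass


pool = ThreadPoolExecutor(max_workers=1)
pool.shutdown(wait=True)  # sets the executor's internal _shutdown flag

try:
    pool.submit(target)
except RuntimeError as e:
    print(e)  # cannot schedule new futures after shutdown
```

In the Connect client the shutdown and the late `submit` can come from different threads, so the submitting side cannot assume the pool is still accepting work.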
Authored-by: Ruifeng Zheng <ruife...@apache.org>
Signed-off-by: Ruifeng Zheng <ruife...@apache.org>
---
 python/pyspark/sql/connect/client/reattach.py            | 8 ++++++--
 python/pyspark/sql/tests/connect/test_connect_session.py | 3 ++-
 2 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index 57d529cef3b0..a827150cfb76 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -213,7 +213,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
         with self._lock:
             if self._release_thread_pool_instance is not None:
-                self._release_thread_pool.submit(target)
+                thread_pool = self._release_thread_pool
+                if not thread_pool._shutdown:
+                    thread_pool.submit(target)
 
     def _release_all(self) -> None:
         """
@@ -238,7 +240,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
 
         with self._lock:
             if self._release_thread_pool_instance is not None:
-                self._release_thread_pool.submit(target)
+                thread_pool = self._release_thread_pool
+                if not thread_pool._shutdown:
+                    thread_pool.submit(target)
         self._result_complete = True
 
     def _call_iter(self, iter_fun: Callable) -> Any:
diff --git a/python/pyspark/sql/tests/connect/test_connect_session.py b/python/pyspark/sql/tests/connect/test_connect_session.py
index 1fd59609d450..1e44f519c9ec 100644
--- a/python/pyspark/sql/tests/connect/test_connect_session.py
+++ b/python/pyspark/sql/tests/connect/test_connect_session.py
@@ -27,12 +27,12 @@ from pyspark.errors import (
     RetriesExceeded,
 )
 from pyspark.sql import SparkSession as PySparkSession
-
 from pyspark.testing.connectutils import (
     should_test_connect,
     ReusedConnectTestCase,
     connect_requirement_message,
 )
+from pyspark.testing.utils import timeout
 
 if should_test_connect:
     import grpc
@@ -85,6 +85,7 @@ class SparkConnectSessionTests(ReusedConnectTestCase):
     def _check_no_active_session_error(self, e: PySparkException):
         self.check_error(exception=e, errorClass="NO_ACTIVE_SESSION", messageParameters=dict())
 
+    @timeout(3)
     def test_stop_session(self):
         df = self.spark.sql("select 1 as a, 2 as b")
         catalog = self.spark.catalog
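For reference, the guard the patch applies can be distilled into a small standalone pattern. This is a sketch under assumptions, not the PR's exact code: the `GuardedPool` wrapper is hypothetical, and `_shutdown` is a private attribute of CPython's `ThreadPoolExecutor`, so the check relies on an implementation detail:

```python
from concurrent.futures import ThreadPoolExecutor
from threading import Lock
from typing import Callable


class GuardedPool:
    # Hypothetical wrapper mirroring the check-before-submit pattern
    # that the patch applies in reattach.py.

    def __init__(self) -> None:
        self._lock = Lock()
        self._pool = ThreadPoolExecutor(max_workers=1)

    def submit_if_running(self, fn: Callable[[], None]) -> None:
        with self._lock:
            # Peeking at the private _shutdown flag skips the submit instead
            # of raising "cannot schedule new futures after shutdown".
            if not self._pool._shutdown:
                self._pool.submit(fn)

    def shutdown(self) -> None:
        # Taking the same lock keeps the flag check above race-free.
        with self._lock:
            self._pool.shutdown(wait=False)
```

An alternative that avoids the private attribute would be to wrap `submit` in `try/except RuntimeError`; the flag check is cheaper but is only race-free if shutdown and submit are serialized by the same lock, as sketched above. The accompanying test change marks `test_stop_session` with `@timeout(3)`, presumably so a regression shows up as a fast failure rather than a hung daily build.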