This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 2a9dd2b3968  [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading RLock, and use the existing eventually util function

2a9dd2b3968 is described below

commit 2a9dd2b3968da7c2e96c502aaf4c158ee782e5f4
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Mon Sep 18 13:46:34 2023 +0900

    [SPARK-45167][CONNECT][PYTHON][FOLLOW-UP] Use lighter threading RLock, and use the existing eventually util function

    ### What changes were proposed in this pull request?

    This PR is a follow-up of https://github.com/apache/spark/pull/42929 that:
    - Uses the lighter `threading.RLock` instead of `multiprocessing.RLock`. Multiprocessing does not work with PySpark because of serialization/deserialization problems with socket connections, among other issues.
    - Uses the existing eventually util function `pyspark.testing.eventually` instead of the hand-rolled `assertEventually`, to deduplicate code.

    ### Why are the changes needed?

    Mainly for code clean-up.

    ### Does this PR introduce _any_ user-facing change?

    No.

    ### How was this patch tested?

    Existing tests should cover this.

    ### Was this patch authored or co-authored using generative AI tooling?

    No.

    Closes #42965 from HyukjinKwon/SPARK-45167-followup.

    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit d5ff04da217df483d27011f6e38417df2eaa42bd)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/client/reattach.py |  5 ++---
 .../sql/tests/connect/client/test_client.py   | 23 +++++-----------------
 2 files changed, 7 insertions(+), 21 deletions(-)

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index e58864b965b..6addb5bd2c6 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -18,12 +18,11 @@ from pyspark.sql.connect.utils import check_dependencies
 
 check_dependencies(__name__)
 
+from threading import RLock
 import warnings
 import uuid
 from collections.abc import Generator
 from typing import Optional, Dict, Any, Iterator, Iterable, Tuple, Callable, cast, Type, ClassVar
-from multiprocessing import RLock
-from multiprocessing.synchronize import RLock as RLockBase
 from multiprocessing.pool import ThreadPool
 import os
 
@@ -56,7 +55,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
     """
 
     # Lock to manage the pool
-    _lock: ClassVar[RLockBase] = RLock()
+    _lock: ClassVar[RLock] = RLock()
     _release_thread_pool: Optional[ThreadPool] = ThreadPool(os.cpu_count() if os.cpu_count() else 8)
 
     @classmethod
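[Editor's note] For context on the first change: `threading.RLock` provides the same reentrant-lock semantics as `multiprocessing.RLock` but coordinates only threads within one Python process, whereas the multiprocessing variant is backed by an OS-level semaphore and cannot be freely pickled, which is what clashed with PySpark's socket-based ser/de. A minimal sketch follows; the `_lock` and `_release_thread_pool` declarations mirror the patch above, but the two helper functions are hypothetical, invented purely to demonstrate reentrancy, and are not part of the commit.

# Illustrative sketch (not part of the commit): why a threading.RLock is
# sufficient for the class-level pool lock. Being reentrant, it can be
# re-acquired by the thread that already holds it. The two functions
# below are hypothetical and exist only to demonstrate that behavior.
import os
from multiprocessing.pool import ThreadPool  # thread-based despite the module name
from threading import RLock

_lock = RLock()
_release_thread_pool = ThreadPool(os.cpu_count() if os.cpu_count() else 8)


def _shutdown_pool() -> None:
    # Hypothetical caller: holds the lock, then calls another function
    # that acquires it again. With a plain threading.Lock this nested
    # acquire would deadlock; with an RLock it succeeds.
    with _lock:
        _recreate_pool()


def _recreate_pool() -> None:
    with _lock:  # safe re-entry on the same thread
        pass


_shutdown_pool()
print("nested RLock acquisition succeeded")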
diff --git a/python/pyspark/sql/tests/connect/client/test_client.py b/python/pyspark/sql/tests/connect/client/test_client.py
index cf43fb16df7..93b7006799b 100644
--- a/python/pyspark/sql/tests/connect/client/test_client.py
+++ b/python/pyspark/sql/tests/connect/client/test_client.py
@@ -25,6 +25,7 @@ import grpc
 from pyspark.sql.connect.client import SparkConnectClient, ChannelBuilder
 import pyspark.sql.connect.proto as proto
 from pyspark.testing.connectutils import should_test_connect, connect_requirement_message
+from pyspark.testing.utils import eventually
 
 from pyspark.sql.connect.client.core import Retrying
 from pyspark.sql.connect.client.reattach import (
@@ -152,20 +153,6 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             attach_ops=ResponseGenerator(attach) if attach is not None else None,
         )
 
-    def assertEventually(self, callable, timeout_ms=1000):
-        """Helper method that will continuously evaluate the callable to not raise an
-        exception."""
-        import time
-
-        limit = time.monotonic_ns() + timeout_ms * 1000 * 1000
-        while time.monotonic_ns() < limit:
-            try:
-                callable()
-                break
-            except Exception:
-                time.sleep(0.1)
-        callable()
-
     def test_basic_flow(self):
         stub = self._stub_with([self.response, self.finished])
         ite = ExecutePlanResponseReattachableIterator(self.request, stub, self.policy, [])
@@ -178,7 +165,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(1, stub.release_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check_all, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check_all)()
 
     def test_fail_during_execute(self):
         def fatal():
@@ -196,7 +183,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(1, stub.release_until_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check)()
 
     def test_fail_and_retry_during_execute(self):
         def non_fatal():
@@ -215,7 +202,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(3, stub.release_until_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check)()
 
     def test_fail_and_retry_during_reattach(self):
         count = 0
@@ -241,7 +228,7 @@ class SparkConnectClientReattachTestCase(unittest.TestCase):
             self.assertEqual(1, stub.release_calls)
             self.assertEqual(1, stub.execute_calls)
 
-        self.assertEventually(check, timeout_ms=1000)
+        eventually(timeout=1, catch_assertions=True)(check)()
 
 
 class TestException(grpc.RpcError, grpc.Call):
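[Editor's note] For readers unfamiliar with the helper adopted above: `eventually(timeout=1, catch_assertions=True)(check)()` applies a decorator factory that retries the wrapped check until it stops raising or the timeout expires. Below is a minimal reimplementation matching that call shape; it is an illustrative sketch, not PySpark's actual `pyspark.testing.utils.eventually` code.

# Illustrative sketch, not PySpark's implementation: a retry decorator
# factory matching the eventually(timeout=1, catch_assertions=True)(check)()
# call shape used in the tests above.
import time
from typing import Any, Callable


def eventually(timeout: float = 30.0, catch_assertions: bool = False) -> Callable:
    def decorator(condition: Callable[[], Any]) -> Callable[[], Any]:
        def wrapper() -> Any:
            deadline = time.monotonic() + timeout
            last_exc: BaseException = AssertionError("condition never evaluated")
            while time.monotonic() < deadline:
                try:
                    return condition()
                except AssertionError as exc:
                    # Swallow assertion failures only when asked to;
                    # otherwise let them propagate immediately.
                    if not catch_assertions:
                        raise
                    last_exc = exc
                time.sleep(0.01)
            raise last_exc

        return wrapper

    return decorator


# Usage mirroring the diff: a check that fails twice before passing.
attempts = 0


def check() -> None:
    global attempts
    attempts += 1
    assert attempts >= 3, "not yet"


eventually(timeout=1, catch_assertions=True)(check)()
print(f"passed after {attempts} attempts")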