grundprinzip commented on code in PR #39695: URL: https://github.com/apache/spark/pull/39695#discussion_r1083641679
########## python/pyspark/sql/connect/client.py: ########## @@ -640,6 +669,136 @@ def _handle_error(self, rpc_error: grpc.RpcError) -> NoReturn: raise SparkConnectException(str(rpc_error)) from None +class RetryState: + """ + Simple state helper that captures the state between retries of the exceptions. It + keeps track of the last exception thrown and how many in total. When the task + finishes successfully, done() returns True. + """ + + def __init__(self) -> None: + self._exception: Optional[BaseException] = None + self._done = False + self._count = 0 + + def set_exception(self, exc: Optional[BaseException]) -> None: + self._exception = exc + self._count += 1 + + def exception(self) -> Optional[BaseException]: + return self._exception + + def set_done(self) -> None: + self._done = True + + def count(self) -> int: + return self._count + + def done(self) -> bool: + return self._done + + +class AttemptManager: + """ + Simple ContextManager that is used to capture the exception thrown inside the context. + """ + + def __init__(self, check: Callable[..., bool], retry_state: RetryState) -> None: + self._retry_state = retry_state + self._can_retry = check + + def __enter__(self) -> None: + pass + + def __exit__( + self, + exc_type: Optional[Type[BaseException]], + exc_val: Optional[BaseException], + exc_tb: Optional[TracebackType], + ) -> Optional[bool]: + if isinstance(exc_val, BaseException): + # Swallow the exception. + if self._can_retry(exc_val): + self._retry_state.set_exception(exc_val) + return True + # Bubble up the exception. + return False + else: + self._retry_state.set_done() + return None + + +class Retrying: + """ + This helper class is used as a generator together with a context manager to + allow retrying exceptions in particular code blocks. The Retrying can be configured + with a lambda function that filters what kind of exceptions should be + retried. 
+ + In addition, there are several parameters that are used to configure the exponential + backoff behavior. + + An example to use this class looks like this: + + for attempt in Retrying(lambda x: isinstance(x, TransientError)): + with attempt: + # do the work. + Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org