This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new cd653cff3cbe [SPARK-44664][PYTHON][CONNECT] Release the execute when closing the iterator in Python client cd653cff3cbe is described below commit cd653cff3cbe37a1ab06d74a58d29b4264335506 Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Fri Aug 4 12:03:54 2023 +0900 [SPARK-44664][PYTHON][CONNECT] Release the execute when closing the iterator in Python client This PR implements the symmetry of https://github.com/apache/spark/pull/42331 and https://github.com/apache/spark/pull/42304 1. It releases the execute when an error is raised during the iteration 2. When you explicitly close the generator (e.g., via `generator.close()` or an explicit `GeneratorExit`), it releases the execution. For feature parity, see also https://github.com/apache/spark/pull/42331 and https://github.com/apache/spark/pull/42304 See also https://github.com/apache/spark/pull/42331 and https://github.com/apache/spark/pull/42304 Tests will be added separately. Closes #42330 from HyukjinKwon/python-error-release. 
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 492f6fac02a00b9ad545d84fa3f10a021a8e71b9) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/sql/connect/client/reattach.py | 110 +++++++++++++++++--------- 1 file changed, 72 insertions(+), 38 deletions(-) diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py index 4d4cce0ca441..702107d97f54 100644 --- a/python/pyspark/sql/connect/client/reattach.py +++ b/python/pyspark/sql/connect/client/reattach.py @@ -111,9 +111,9 @@ class ExecutePlanResponseReattachableIterator(Generator): self._last_returned_response_id = ret.response_id if ret.HasField("result_complete"): self._result_complete = True - self._release_execute(None) # release all + self._release_all() else: - self._release_execute(self._last_returned_response_id) + self._release_until(self._last_returned_response_id) self._current = None return ret @@ -125,61 +125,93 @@ class ExecutePlanResponseReattachableIterator(Generator): # After response complete response return False else: - for attempt in Retrying( - can_retry=SparkConnectClient.retry_exception, **self._retry_policy - ): - with attempt: - # on first try, we use the existing iterator. - if not attempt.is_first_try(): - # on retry, the iterator is borked, so we need a new one - self._iterator = iter( - self._stub.ReattachExecute(self._create_reattach_execute_request()) - ) - - if self._current is None: - try: - self._current = next(self._iterator) - except StopIteration: - pass - - has_next = self._current is not None - - # Graceful reattach: - # If iterator ended, but there was no ResponseComplete, it means that - # there is more, and we need to reattach. While ResponseComplete didn't - # arrive, we keep reattaching. 
- if not self._result_complete and not has_next: - while not has_next: + try: + for attempt in Retrying( + can_retry=SparkConnectClient.retry_exception, **self._retry_policy + ): + with attempt: + # on first try, we use the existing iterator. + if not attempt.is_first_try(): + # on retry, the iterator is borked, so we need a new one self._iterator = iter( self._stub.ReattachExecute(self._create_reattach_execute_request()) ) - # shouldn't change - assert not self._result_complete + + if self._current is None: try: self._current = next(self._iterator) except StopIteration: pass - has_next = self._current is not None - return has_next + + has_next = self._current is not None + + # Graceful reattach: + # If iterator ended, but there was no ResponseComplete, it means that + # there is more, and we need to reattach. While ResponseComplete didn't + # arrive, we keep reattaching. + if not self._result_complete and not has_next: + while not has_next: + self._iterator = iter( + self._stub.ReattachExecute( + self._create_reattach_execute_request() + ) + ) + # shouldn't change + assert not self._result_complete + try: + self._current = next(self._iterator) + except StopIteration: + pass + has_next = self._current is not None + return has_next + except Exception as e: + self._release_all() + raise e return False - def _release_execute(self, until_response_id: Optional[str]) -> None: + def _release_until(self, until_response_id: str) -> None: """ - Inform the server to release the execution. + Inform the server to release the buffered execution results until and including given + result. This will send an asynchronous RPC which will not block this iterator, the iterator can continue to be consumed. 
+ """ + if self._result_complete: + return + + from pyspark.sql.connect.client.core import SparkConnectClient + from pyspark.sql.connect.client.core import Retrying - Release with untilResponseId informs the server that the iterator has been consumed until - and including response with that responseId, and these responses can be freed. + request = self._create_release_execute_request(until_response_id) - Release with None means that the responses have been completely consumed and informs the - server that the completed execution can be completely freed. + def target() -> None: + try: + for attempt in Retrying( + can_retry=SparkConnectClient.retry_exception, **self._retry_policy + ): + with attempt: + self._stub.ReleaseExecute(request) + except Exception as e: + warnings.warn(f"ReleaseExecute failed with exception: {e}.") + + ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target) + + def _release_all(self) -> None: + """ + Inform the server to release the execution, either because all results were consumed, + or the execution finished with error and the error was received. + + This will send an asynchronous RPC which will not block this. The client continues + executing, and if the release fails, server is equipped to deal with abandoned executions. 
""" + if self._result_complete: + return + from pyspark.sql.connect.client.core import SparkConnectClient from pyspark.sql.connect.client.core import Retrying - request = self._create_release_execute_request(until_response_id) + request = self._create_release_execute_request(None) def target() -> None: try: @@ -192,6 +224,7 @@ class ExecutePlanResponseReattachableIterator(Generator): warnings.warn(f"ReleaseExecute failed with exception: {e}.") ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target) + self._result_complete = True def _create_reattach_execute_request(self) -> pb2.ReattachExecuteRequest: reattach = pb2.ReattachExecuteRequest( @@ -231,6 +264,7 @@ class ExecutePlanResponseReattachableIterator(Generator): super().throw(type, value, traceback) def close(self) -> None: + self._release_all() return super().close() def __del__(self) -> None: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org