This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new cd653cff3cbe [SPARK-44664][PYTHON][CONNECT] Release the execute when closing the iterator in Python client
cd653cff3cbe is described below

commit cd653cff3cbe37a1ab06d74a58d29b4264335506
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Fri Aug 4 12:03:54 2023 +0900

    [SPARK-44664][PYTHON][CONNECT] Release the execute when closing the iterator in Python client
    
    This PR implements the counterpart of https://github.com/apache/spark/pull/42331 and https://github.com/apache/spark/pull/42304:
    
    1. It releases the execute when an error is raised during iteration.
    2. When you explicitly close the generator (e.g., via `generator.close()` or an explicit `GeneratorExit`), it releases the execution.
    
    For feature parity, see also https://github.com/apache/spark/pull/42331 and https://github.com/apache/spark/pull/42304.
    
    Tests will be added separately.
    
    Closes #42330 from HyukjinKwon/python-error-release.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 492f6fac02a00b9ad545d84fa3f10a021a8e71b9)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/sql/connect/client/reattach.py | 110 +++++++++++++++++---------
 1 file changed, 72 insertions(+), 38 deletions(-)
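
As an illustration of the two behaviors described in the commit message, a minimal,
hypothetical consumer of the reattachable iterator might look like the sketch below; the
constructor arguments and the response handling are placeholders, not part of this commit:

    from pyspark.sql.connect.client.reattach import ExecutePlanResponseReattachableIterator

    # Hypothetical setup: `request`, `stub`, `retry_policy`, and `metadata`
    # stand in for the values the Spark Connect client normally supplies.
    responses = ExecutePlanResponseReattachableIterator(request, stub, retry_policy, metadata)
    try:
        for response in responses:
            ...  # consume each ExecutePlanResponse
    except Exception:
        # (1) The iterator now calls _release_all() internally before the error
        # propagates, so the server can free the execution.
        raise
    finally:
        # (2) Closing the generator (via close(), or through GeneratorExit) also
        # triggers _release_all(); a second release is a no-op because
        # _result_complete is already set.
        responses.close()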

diff --git a/python/pyspark/sql/connect/client/reattach.py b/python/pyspark/sql/connect/client/reattach.py
index 4d4cce0ca441..702107d97f54 100644
--- a/python/pyspark/sql/connect/client/reattach.py
+++ b/python/pyspark/sql/connect/client/reattach.py
@@ -111,9 +111,9 @@ class ExecutePlanResponseReattachableIterator(Generator):
         self._last_returned_response_id = ret.response_id
         if ret.HasField("result_complete"):
             self._result_complete = True
-            self._release_execute(None)  # release all
+            self._release_all()
         else:
-            self._release_execute(self._last_returned_response_id)
+            self._release_until(self._last_returned_response_id)
         self._current = None
         return ret
 
@@ -125,61 +125,93 @@ class ExecutePlanResponseReattachableIterator(Generator):
             # After response complete response
             return False
         else:
-            for attempt in Retrying(
-                can_retry=SparkConnectClient.retry_exception, **self._retry_policy
-            ):
-                with attempt:
-                    # on first try, we use the existing iterator.
-                    if not attempt.is_first_try():
-                        # on retry, the iterator is borked, so we need a new one
-                        self._iterator = iter(
-                            self._stub.ReattachExecute(self._create_reattach_execute_request())
-                        )
-
-                    if self._current is None:
-                        try:
-                            self._current = next(self._iterator)
-                        except StopIteration:
-                            pass
-
-                    has_next = self._current is not None
-
-                    # Graceful reattach:
-                    # If iterator ended, but there was no ResponseComplete, it means that
-                    # there is more, and we need to reattach. While ResponseComplete didn't
-                    # arrive, we keep reattaching.
-                    if not self._result_complete and not has_next:
-                        while not has_next:
+            try:
+                for attempt in Retrying(
+                    can_retry=SparkConnectClient.retry_exception, **self._retry_policy
+                ):
+                    with attempt:
+                        # on first try, we use the existing iterator.
+                        if not attempt.is_first_try():
+                            # on retry, the iterator is borked, so we need a new one
                             self._iterator = iter(
                                 self._stub.ReattachExecute(self._create_reattach_execute_request())
                             )
-                            # shouldn't change
-                            assert not self._result_complete
+
+                        if self._current is None:
                             try:
                                 self._current = next(self._iterator)
                             except StopIteration:
                                 pass
-                            has_next = self._current is not None
-                    return has_next
+
+                        has_next = self._current is not None
+
+                        # Graceful reattach:
+                        # If iterator ended, but there was no ResponseComplete, it means that
+                        # there is more, and we need to reattach. While ResponseComplete didn't
+                        # arrive, we keep reattaching.
+                        if not self._result_complete and not has_next:
+                            while not has_next:
+                                self._iterator = iter(
+                                    self._stub.ReattachExecute(
+                                        self._create_reattach_execute_request()
+                                    )
+                                )
+                                # shouldn't change
+                                assert not self._result_complete
+                                try:
+                                    self._current = next(self._iterator)
+                                except StopIteration:
+                                    pass
+                                has_next = self._current is not None
+                        return has_next
+            except Exception as e:
+                self._release_all()
+                raise e
             return False
 
-    def _release_execute(self, until_response_id: Optional[str]) -> None:
+    def _release_until(self, until_response_id: str) -> None:
         """
-        Inform the server to release the execution.
+        Inform the server to release the buffered execution results until and including the
+        given result.
 
         This will send an asynchronous RPC which will not block this iterator, the iterator can
         continue to be consumed.
+        """
+        if self._result_complete:
+            return
+
+        from pyspark.sql.connect.client.core import SparkConnectClient
+        from pyspark.sql.connect.client.core import Retrying
 
-        Release with untilResponseId informs the server that the iterator has been consumed until
-        and including response with that responseId, and these responses can be freed.
+        request = self._create_release_execute_request(until_response_id)
 
-        Release with None means that the responses have been completely consumed and informs the
-        server that the completed execution can be completely freed.
+        def target() -> None:
+            try:
+                for attempt in Retrying(
+                    can_retry=SparkConnectClient.retry_exception, **self._retry_policy
+                ):
+                    with attempt:
+                        self._stub.ReleaseExecute(request)
+            except Exception as e:
+                warnings.warn(f"ReleaseExecute failed with exception: {e}.")
+
+        ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
+
+    def _release_all(self) -> None:
+        """
+        Inform the server to release the execution, either because all results were consumed,
+        or the execution finished with an error and the error was received.
+
+        This will send an asynchronous RPC which will not block the caller. The client continues
+        executing, and if the release fails, the server is equipped to deal with abandoned
+        executions.
         """
+        if self._result_complete:
+            return
+
         from pyspark.sql.connect.client.core import SparkConnectClient
         from pyspark.sql.connect.client.core import Retrying
 
-        request = self._create_release_execute_request(until_response_id)
+        request = self._create_release_execute_request(None)
 
         def target() -> None:
             try:
@@ -192,6 +224,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
                 warnings.warn(f"ReleaseExecute failed with exception: {e}.")
 
         
ExecutePlanResponseReattachableIterator._release_thread_pool.apply_async(target)
+        self._result_complete = True
 
     def _create_reattach_execute_request(self) -> pb2.ReattachExecuteRequest:
         reattach = pb2.ReattachExecuteRequest(
@@ -231,6 +264,7 @@ class ExecutePlanResponseReattachableIterator(Generator):
         super().throw(type, value, traceback)
 
     def close(self) -> None:
+        self._release_all()
         return super().close()
 
     def __del__(self) -> None:
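
For reference, the fire-and-forget release pattern shared by _release_until and _release_all
above reduces to the standalone sketch below; the pool size, the `send_release` callable, and
the `retrying` factory are illustrative assumptions, not code from this commit:

    import os
    import warnings
    from multiprocessing.pool import ThreadPool

    # Illustrative stand-in for the class-level pool the iterator keeps.
    _release_thread_pool = ThreadPool(os.cpu_count() or 8)

    def release_async(send_release, retrying):
        # Run the RPC inside the client's retry loop on a background thread and
        # downgrade any terminal failure to a warning, so the consuming iterator
        # is never blocked or interrupted by the release.
        def target():
            try:
                for attempt in retrying():
                    with attempt:
                        send_release()  # e.g. stub.ReleaseExecute(request)
            except Exception as e:
                warnings.warn(f"ReleaseExecute failed with exception: {e}.")

        # apply_async returns immediately; the release proceeds asynchronously.
        _release_thread_pool.apply_async(target)

If the release ultimately fails, the server is expected to clean up abandoned executions, as
the _release_all docstring above notes.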


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
