This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 98cd980f57e [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute
98cd980f57e is described below

commit 98cd980f57e9e78e5a18288cc28f3f4653a02ebe
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Aug 11 10:28:25 2023 +0900

    [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute
    
    ### What changes were proposed in this pull request?
    
    Simplify retries of ReleaseExecute in ExecutePlanResponseReattachableIterator.
    Instead of chaining asynchronous calls, use the common retry loop logic from the asynchronous onError after the first error.
    This allows reusing the common `retry` function instead of having to duplicate the following backoff logic:
    ```
            case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
              Thread.sleep(
                (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
                  .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
    ```

    This also makes these retries more similar to how they are handled in the Python client.
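
    For illustration only, a minimal self-contained sketch of such a common retry helper with capped exponential backoff. The `RetrySketch` object, the `RetryPolicy` field names and the `retry` signature here are assumptions for this sketch, not the actual `GrpcRetryHandler.retry` API:
    ```
    import scala.util.control.NonFatal

    // Illustrative stand-in for the common retry helper; not the real GrpcRetryHandler.
    object RetrySketch {

      // Assumed, simplified policy; field names mirror the snippet above, backoffs in milliseconds.
      case class RetryPolicy(
          maxRetries: Int = 3,
          initialBackoffMs: Long = 50,
          maxBackoffMs: Long = 30000,
          backoffMultiplier: Double = 4.0,
          canRetry: Throwable => Boolean = (t: Throwable) => NonFatal(t))

      // Generic retry loop: run `fn`, and on a retryable failure sleep with capped
      // exponential backoff before trying again, up to maxRetries additional attempts.
      def retry[T](policy: RetryPolicy = RetryPolicy())(fn: => T): T = {
        var currentRetryNum = 0
        while (true) {
          try {
            return fn
          } catch {
            case NonFatal(e) if policy.canRetry(e) && currentRetryNum < policy.maxRetries =>
              val backoffMs = math.min(
                policy.maxBackoffMs.toDouble,
                policy.initialBackoffMs * math.pow(policy.backoffMultiplier, currentRetryNum)).toLong
              Thread.sleep(backoffMs)
              currentRetryNum += 1
          }
        }
        throw new IllegalStateException("unreachable")
      }
    }
    ```
    With a helper like this, the error callback only needs to rethrow the first failure into the retry block and can perform the remaining attempts synchronously, which is what the diff below does with the real `retry` and the blocking stub.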
    
    ### Why are the changes needed?
    
    Code simplification.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Checked retries by printing `new Exception().printStackTrace()` from the handler:
    ```
    java.lang.Exception
            at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242)
            at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169)
            at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306)
            at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241)
            at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
            at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
            at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
            at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
            at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
            at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
            at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    java.lang.Exception
            at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242)
            at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169)
            at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306)
            at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241)
            at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491)
            at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567)
            at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71)
            at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735)
            at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716)
            at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
            at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
            at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
            at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
            at java.lang.Thread.run(Thread.java:748)
    ...
    ```
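
    (Aside, not part of the patch: a tiny self-contained sketch of this verification trick, where printing a fresh exception's stack trace inside the retried block makes each pass through the retry loop visible, as in the output above. It reuses the hypothetical `RetrySketch.retry` helper sketched earlier; the actual check was done inside the client's onError handler as described.)
    ```
    // Hypothetical standalone check; assumes the RetrySketch helper from the sketch above.
    object RetryTraceCheck extends App {
      import RetrySketch.retry

      var attempts = 0
      retry() {
        attempts += 1
        // Temporary instrumentation: each printed trace marks one pass through the retry loop.
        new Exception().printStackTrace()
        if (attempts < 3) throw new RuntimeException(s"simulated transient failure #$attempts")
      }
      println(s"succeeded after $attempts attempts")
    }
    ```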
    
    Closes #42438 from juliuszsompolski/SPARK-44765.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 9bde882fcb39e9fedced0df9702df2a36c1a84e6)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../ExecutePlanResponseReattachableIterator.scala  | 36 +++++++++++++---------
 1 file changed, 21 insertions(+), 15 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 7a50801d8a6..5ef1151682b 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -180,7 +180,7 @@ class ExecutePlanResponseReattachableIterator(
   private def releaseUntil(untilResponseId: String): Unit = {
     if (!resultComplete) {
       val request = createReleaseExecuteRequest(Some(untilResponseId))
-      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request))
+      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request))
     }
   }
 
@@ -194,7 +194,7 @@ class ExecutePlanResponseReattachableIterator(
   private def releaseAll(): Unit = {
     if (!resultComplete) {
       val request = createReleaseExecuteRequest(None)
-      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request))
+      rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request))
       resultComplete = true
     }
   }
@@ -229,22 +229,28 @@ class ExecutePlanResponseReattachableIterator(
    * ReleaseExecute and continues with iteration, but if it fails with a retryable error, the
    * callback will retrigger the asynchronous ReleaseExecute.
    */
-  private def createRetryingReleaseExecuteResponseObserer(
-      requestForRetry: proto.ReleaseExecuteRequest,
-      currentRetryNum: Int = 0): StreamObserver[proto.ReleaseExecuteResponse] = {
+  private def createRetryingReleaseExecuteResponseObserver(
+      requestForRetry: proto.ReleaseExecuteRequest)
+      : StreamObserver[proto.ReleaseExecuteResponse] = {
     new StreamObserver[proto.ReleaseExecuteResponse] {
       override def onNext(v: proto.ReleaseExecuteResponse): Unit = {}
       override def onCompleted(): Unit = {}
-      override def onError(t: Throwable): Unit = t match {
-        case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
-          Thread.sleep(
-            (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math
-              .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis)
-          rawAsyncStub.releaseExecute(
-            requestForRetry,
-            createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1))
-        case _ =>
-          logWarning(s"ReleaseExecute failed with exception: $t.")
+      override def onError(t: Throwable): Unit = {
+        var firstTry = true
+        try {
+          retry {
+            if (firstTry) {
+              firstTry = false
+              throw t // we already failed once, handle first retry
+            } else {
+              // we already are in async execution thread, can execute further retries sync
+              rawBlockingStub.releaseExecute(requestForRetry)
+            }
+          }
+        } catch {
+          case NonFatal(e) =>
+            logWarning(s"ReleaseExecute failed with exception: $e.")
+        }
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
