This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b2966d762721 [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server
b2966d762721 is described below

commit b2966d7627216845d6a1c3854077a02c6d4e84c5
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Aug 4 12:05:19 2023 +0900

    [SPARK-44624][CONNECT] Retry ExecutePlan in case initial request didn't reach server
    
    ### What changes were proposed in this pull request?
    
    If the ExecutePlan request never reached the server, a subsequent ReattachExecute will fail with INVALID_HANDLE.OPERATION_NOT_FOUND. In that case, the client can safely send ExecutePlan again.
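    
    A simplified sketch of the idea (illustrative only; `stub`, `initialRequest`, `reattachRequest` and
    `lastReturnedResponseId` stand in for the corresponding state of ExecutePlanResponseReattachableIterator
    shown in the diff below):
    
    ```scala
    import io.grpc.protobuf.StatusProto
    import org.apache.spark.connect.proto
    
    // Open (or reopen) the response stream. If the server reports the operation as unknown
    // and no response was ever received, the initial ExecutePlan never reached the server,
    // so it is safe to simply send ExecutePlan again.
    def openStream(
        stub: proto.SparkConnectServiceGrpc.SparkConnectServiceBlockingStub,
        initialRequest: proto.ExecutePlanRequest,
        reattachRequest: proto.ReattachExecuteRequest,
        lastReturnedResponseId: Option[String]): java.util.Iterator[proto.ExecutePlanResponse] = {
      try {
        val it = stub.reattachExecute(reattachRequest)
        it.hasNext() // the gRPC iterator is lazy; this forces the first server round trip
        it
      } catch {
        case e: io.grpc.StatusRuntimeException
            if StatusProto
              .fromThrowable(e)
              .getMessage
              .contains("INVALID_HANDLE.OPERATION_NOT_FOUND") && lastReturnedResponseId.isEmpty =>
          // Nothing was ever received, so re-sending ExecutePlan cannot duplicate results.
          stub.executePlan(initialRequest)
      }
    }
    ```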
    
    ### Why are the changes needed?
    
    This solves an edge case of reattachable execution where the initial execution never reached the server.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Testing these failures is difficult; it will require a special testing setup.
    
    Closes #42282 from juliuszsompolski/SPARK-44624-fix.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 52437bc73695e392bee60fbb340b6de4324b25d8)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../ExecutePlanResponseReattachableIterator.scala  | 43 +++++++++++++++++-----
 .../sql/connect/client/GrpcRetryHandler.scala      | 10 ++++-
 2 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index fc07deaa081f..41648c3c1004 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -20,7 +20,8 @@ import java.util.UUID
 
 import scala.util.control.NonFatal
 
-import io.grpc.ManagedChannel
+import io.grpc.{ManagedChannel, StatusRuntimeException}
+import io.grpc.protobuf.StatusProto
 import io.grpc.stub.StreamObserver
 
 import org.apache.spark.connect.proto
@@ -38,15 +39,12 @@ import org.apache.spark.internal.Logging
  * Initial iterator is the result of an ExecutePlan on the request, but it can be reattached with
  * ReattachExecute request. ReattachExecute request is provided the responseId of last returned
  * ExecutePlanResponse on the iterator to return a new iterator from server that continues after
- * that.
+ * that. If the initial ExecutePlan did not even reach the server, and hence reattach fails with
+ * INVALID_HANDLE.OPERATION_NOT_FOUND, we attempt to retry ExecutePlan.
  *
  * In reattachable execute the server does buffer some responses in case the client needs to
  * backtrack. To let server release this buffer sooner, this iterator asynchronously sends
  * ReleaseExecute RPCs that instruct the server to release responses that it already processed.
- *
- * Note: If the initial ExecutePlan did not even reach the server and execution didn't start, the
- * ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, failing the whole
- * operation.
  */
 class ExecutePlanResponseReattachableIterator(
     request: proto.ExecutePlanRequest,
@@ -113,7 +111,7 @@ class ExecutePlanResponseReattachableIterator(
           // on retry, the iterator is borked, so we need a new one
           iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
         }
-        iterator.next()
+        callIter(_.next())
       }
 
       // Record last returned response, to know where to restart in case of reattach.
@@ -146,7 +144,7 @@ class ExecutePlanResponseReattachableIterator(
           // on retry, the iterator is borked, so we need a new one
           iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
         }
-        var hasNext = iterator.hasNext()
+        var hasNext = callIter(_.hasNext())
         // Graceful reattach:
         // If iterator ended, but there was no ResultComplete, it means that there is more,
         // and we need to reattach.
@@ -154,7 +152,7 @@ class ExecutePlanResponseReattachableIterator(
           do {
             iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest())
             assert(!resultComplete) // shouldn't change...
-            hasNext = iterator.hasNext()
+            hasNext = callIter(_.hasNext())
             // It's possible that the new iterator will be empty, so we need to loop to get another.
             // Eventually, there will be a non empty iterator, because there is always a
             // ResultComplete inserted by the server at the end of the stream.
@@ -197,6 +195,33 @@ class ExecutePlanResponseReattachableIterator(
     }
   }
 
+  /**
+   * Call next() or hasNext() on the iterator. If this fails with this operationId not existing on
+   * the server, this means that the initial ExecutePlan request didn't even reach the server. In
+   * that case, attempt to start again with ExecutePlan.
+   *
+   * Called inside retry block, so retryable failure will get handled upstream.
+   */
+  private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = {
+    try {
+      iterFun(iterator)
+    } catch {
+      case ex: StatusRuntimeException
+          if StatusProto
+            .fromThrowable(ex)
+            .getMessage
+            .contains("INVALID_HANDLE.OPERATION_NOT_FOUND") =>
+        if (lastReturnedResponseId.isDefined) {
+          throw new IllegalStateException(
+            "OPERATION_NOT_FOUND on the server but responses were already received from it.",
+            ex)
+        }
+        // Try a new ExecutePlan, and throw upstream for retry.
+        iterator = rawBlockingStub.executePlan(initialRequest)
+        throw new GrpcRetryHandler.RetryException
+    }
+  }
+
   /**
    * Create result callback to the asynchronouse ReleaseExecute. The client does not block on
    * ReleaseExecute and continues with iteration, but if it fails with a retryable error, the
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index ef446399f167..47ff975b2675 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -164,7 +164,9 @@ private[client] object GrpcRetryHandler extends Logging {
     try {
       return fn
     } catch {
-      case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries =>
+      case NonFatal(e)
+          if (retryPolicy.canRetry(e) || e.isInstanceOf[RetryException])
+            && currentRetryNum < retryPolicy.maxRetries =>
         logWarning(
           s"Non fatal error during RPC execution: $e, " +
             s"retrying (currentRetryNum=$currentRetryNum)")
@@ -209,4 +211,10 @@ private[client] object GrpcRetryHandler extends Logging {
       maxBackoff: FiniteDuration = FiniteDuration(1, "min"),
       backoffMultiplier: Double = 4.0,
       canRetry: Throwable => Boolean = retryException) {}
+
+  /**
+   * An exception that can be thrown upstream when inside retry and which will be retryable
+   * regardless of policy.
+   */
+  class RetryException extends Throwable
 }
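
The effect of the GrpcRetryHandler change, in isolation: a RetryException thrown from inside a retry block is retried regardless of what the configured retry policy says about the error. A minimal, self-contained sketch of that control flow (simplified: the real GrpcRetryHandler.retry is recursive and also applies exponential backoff from GrpcRetryPolicy):

```scala
import scala.util.control.NonFatal

// Marker exception: always retryable, bypassing the policy's canRetry check.
class RetryException extends Throwable

// Simplified model of the retry loop: a plain loop instead of recursion, no backoff.
def retry[T](canRetry: Throwable => Boolean, maxRetries: Int)(fn: => T): T = {
  var currentRetryNum = 0
  while (true) {
    try {
      return fn
    } catch {
      case NonFatal(e)
          if (canRetry(e) || e.isInstanceOf[RetryException]) && currentRetryNum < maxRetries =>
        currentRetryNum += 1 // the real handler also sleeps with exponential backoff here
    }
  }
  throw new IllegalStateException("unreachable")
}
```

callIter above relies on exactly this: after re-sending ExecutePlan it throws RetryException, so the surrounding retry block simply runs the iteration again against the new iterator.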

