This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new 43a853660b0 [SPARK-44642][CONNECT] ReleaseExecute in 
ExecutePlanResponseReattachableIterator after it gets error from server
43a853660b0 is described below

commit 43a853660b08aa176bb8eb194ec74043006f219f
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Aug 4 08:45:44 2023 +0900

    [SPARK-44642][CONNECT] ReleaseExecute in 
ExecutePlanResponseReattachableIterator after it gets error from server
    
    ### What changes were proposed in this pull request?
    
    Client:
    When server returns error on the response stream via onError, the 
ExecutePlanResponseReattachableIterator will not see the stream finish with a 
ResultsComplete. Instead, a StatusRuntimeException will be thrown from next() 
or hasNext(). Handle catching that exception, telling the server to 
ReleaseExecute when we receive it, and rethrow it to the user.
    
    Server:
    We also have to tweak the behaviour of ReleaseAll to also interrupt the 
query. The previous behaviour that in case of a running query one has to first 
send an interrupt, and then release was done to prevent race conditions of an 
interrupt coming after ResultComplete. Now, this has been resolved with proper 
synchronization at the final moments of execution in ExecuteThreadRunner, and 
as we want the release to be async, having one ReleaseExecutel vs. needing a 
combination of Interrupt+R [...]
    
    ### Why are the changes needed?
    
    If ReleaseExecute is not called by the client to acknowledge that the error 
was received, the execution will keep dangling on the server until cleaned up 
by timeout.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Automated tests will come with 
https://issues.apache.org/jira/browse/SPARK-44625.
    
    Closes #42304 from juliuszsompolski/SPARK-44642.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit d2d43b888aebbb5d4099faec26b076ef390890ce)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../ExecutePlanResponseReattachableIterator.scala  | 120 ++++++++++++---------
 .../src/main/protobuf/spark/connect/base.proto     |   4 +-
 .../connect/execution/ExecuteThreadRunner.scala    |  31 ++++--
 .../spark/sql/connect/service/ExecuteHolder.scala  |   6 +-
 .../connect/planner/SparkConnectServiceSuite.scala |  25 +++--
 python/pyspark/sql/connect/proto/base_pb2.pyi      |   4 +-
 6 files changed, 116 insertions(+), 74 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 008b3c3dd5c..fc07deaa081 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -102,28 +102,33 @@ class ExecutePlanResponseReattachableIterator(
       throw new java.util.NoSuchElementException()
     }
 
-    // Get next response, possibly triggering reattach in case of stream error.
-    var firstTry = true
-    val ret = retry {
-      if (firstTry) {
-        // on first try, we use the existing iterator.
-        firstTry = false
-      } else {
-        // on retry, the iterator is borked, so we need a new one
-        iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+    try {
+      // Get next response, possibly triggering reattach in case of stream 
error.
+      var firstTry = true
+      val ret = retry {
+        if (firstTry) {
+          // on first try, we use the existing iterator.
+          firstTry = false
+        } else {
+          // on retry, the iterator is borked, so we need a new one
+          iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+        }
+        iterator.next()
       }
-      iterator.next()
-    }
 
-    // Record last returned response, to know where to restart in case of 
reattach.
-    lastReturnedResponseId = Some(ret.getResponseId)
-    if (ret.hasResultComplete) {
-      resultComplete = true
-      releaseExecute(None) // release all
-    } else {
-      releaseExecute(lastReturnedResponseId) // release until this response
+      // Record last returned response, to know where to restart in case of 
reattach.
+      lastReturnedResponseId = Some(ret.getResponseId)
+      if (ret.hasResultComplete) {
+        releaseAll()
+      } else {
+        releaseUntil(lastReturnedResponseId.get)
+      }
+      ret
+    } catch {
+      case NonFatal(ex) =>
+        releaseAll() // ReleaseExecute on server after error.
+        throw ex
     }
-    ret
   }
 
   override def hasNext(): Boolean = synchronized {
@@ -132,47 +137,64 @@ class ExecutePlanResponseReattachableIterator(
       return false
     }
     var firstTry = true
-    retry {
-      if (firstTry) {
-        // on first try, we use the existing iterator.
-        firstTry = false
-      } else {
-        // on retry, the iterator is borked, so we need a new one
-        iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
-      }
-      var hasNext = iterator.hasNext()
-      // Graceful reattach:
-      // If iterator ended, but there was no ResultComplete, it means that 
there is more,
-      // and we need to reattach.
-      if (!hasNext && !resultComplete) {
-        do {
+    try {
+      retry {
+        if (firstTry) {
+          // on first try, we use the existing iterator.
+          firstTry = false
+        } else {
+          // on retry, the iterator is borked, so we need a new one
           iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
-          assert(!resultComplete) // shouldn't change...
-          hasNext = iterator.hasNext()
-          // It's possible that the new iterator will be empty, so we need to 
loop to get another.
-          // Eventually, there will be a non empty iterator, because there's 
always a ResultComplete
-          // at the end of the stream.
-        } while (!hasNext)
+        }
+        var hasNext = iterator.hasNext()
+        // Graceful reattach:
+        // If iterator ended, but there was no ResultComplete, it means that 
there is more,
+        // and we need to reattach.
+        if (!hasNext && !resultComplete) {
+          do {
+            iterator = 
rawBlockingStub.reattachExecute(createReattachExecuteRequest())
+            assert(!resultComplete) // shouldn't change...
+            hasNext = iterator.hasNext()
+            // It's possible that the new iterator will be empty, so we need 
to loop to get another.
+            // Eventually, there will be a non empty iterator, because there 
is always a
+            // ResultComplete inserted by the server at the end of the stream.
+          } while (!hasNext)
+        }
+        hasNext
       }
-      hasNext
+    } catch {
+      case NonFatal(ex) =>
+        releaseAll() // ReleaseExecute on server after error.
+        throw ex
     }
   }
 
   /**
-   * Inform the server to release the execution.
+   * Inform the server to release the buffered execution results until and 
including given result.
    *
    * This will send an asynchronous RPC which will not block this iterator, 
the iterator can
    * continue to be consumed.
+   */
+  private def releaseUntil(untilResponseId: String): Unit = {
+    if (!resultComplete) {
+      val request = createReleaseExecuteRequest(Some(untilResponseId))
+      rawAsyncStub.releaseExecute(request, 
createRetryingReleaseExecuteResponseObserer(request))
+    }
+  }
+
+  /**
+   * Inform the server to release the execution, either because all results 
were consumed, or the
+   * execution finished with error and the error was received.
    *
-   * Release with untilResponseId informs the server that the iterator has 
been consumed until and
-   * including response with that responseId, and these responses can be freed.
-   *
-   * Release with None means that the responses have been completely consumed 
and informs the
-   * server that the completed execution can be completely freed.
+   * This will send an asynchronous RPC which will not block this. The client 
continues executing,
+   * and if the release fails, server is equipped to deal with abandoned 
executions.
    */
-  private def releaseExecute(untilResponseId: Option[String]): Unit = {
-    val request = createReleaseExecuteRequest(untilResponseId)
-    rawAsyncStub.releaseExecute(request, 
createRetryingReleaseExecuteResponseObserer(request))
+  private def releaseAll(): Unit = {
+    if (!resultComplete) {
+      val request = createReleaseExecuteRequest(None)
+      rawAsyncStub.releaseExecute(request, 
createRetryingReleaseExecuteResponseObserer(request))
+      resultComplete = true
+    }
   }
 
   /**
diff --git 
a/connector/connect/common/src/main/protobuf/spark/connect/base.proto 
b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
index 151e828b3e9..79dbadba5bb 100644
--- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto
+++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto
@@ -750,9 +750,7 @@ message ReleaseExecuteRequest {
   optional string client_type = 4;
 
   // Release and close operation completely.
-  // Note: This should be called when the server side operation is finished, 
and ExecutePlan or
-  // ReattachExecute are finished processing the result stream, or inside 
onComplete / onError.
-  // This will not interrupt a running execution, but block until it's 
finished.
+  // This will also interrupt the query if it is running execution, and wait 
for it to be torn down.
   message ReleaseAll {}
 
   // Release all responses from the operation response stream up to and 
including
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
index 662288177dc..930ccae5d4c 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala
@@ -46,6 +46,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
 
   private var completed: Boolean = false
 
+  private val lock = new Object
+
   /** Launches the execution in a background thread, returns immediately. */
   def start(): Unit = {
     executionThread.start()
@@ -62,7 +64,7 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
    *   true if it was not interrupted before, false if it was already 
interrupted or completed.
    */
   def interrupt(): Boolean = {
-    synchronized {
+    lock.synchronized {
       if (!interrupted && !completed) {
         // checking completed prevents sending interrupt onError after 
onCompleted
         interrupted = true
@@ -119,7 +121,7 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
   // Inner executeInternal is wrapped by execute() for error handling.
   private def executeInternal() = {
     // synchronized - check if already got interrupted while starting.
-    synchronized {
+    lock.synchronized {
       if (interrupted) {
         throw new InterruptedException()
       }
@@ -160,14 +162,23 @@ private[connect] class ExecuteThreadRunner(executeHolder: 
ExecuteHolder) extends
             s"${executeHolder.request.getPlan.getOpTypeCase} not supported.")
       }
 
-      if (executeHolder.reattachable) {
-        // Reattachable execution sends a ResultComplete at the end of the 
stream
-        // to signal that there isn't more coming.
-        executeHolder.responseObserver.onNext(createResultComplete())
-      }
-      synchronized {
-        // Prevent interrupt after onCompleted, and throwing error to an 
already closed stream.
-        completed = true
+      lock.synchronized {
+        // Synchronized before sending ResultComplete, and up until completing 
the result stream
+        // to prevent a situation in which a client of reattachable execution 
receives
+        // ResultComplete, and proceeds to send ReleaseExecute, and that 
triggers an interrupt
+        // before it finishes.
+
+        if (interrupted) {
+          // check if it got interrupted at the very last moment
+          throw new InterruptedException()
+        }
+        completed = true // no longer interruptible
+
+        if (executeHolder.reattachable) {
+          // Reattachable execution sends a ResultComplete at the end of the 
stream
+          // to signal that there isn't more coming.
+          executeHolder.responseObserver.onNext(createResultComplete())
+        }
         executeHolder.responseObserver.onCompleted()
       }
     }
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
index a49c0a8bacf..4eb90f9f163 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala
@@ -156,11 +156,11 @@ private[connect] class ExecuteHolder(
   }
 
   /**
-   * Close the execution and remove it from the session. Note: It blocks 
joining the
-   * ExecuteThreadRunner thread, so it assumes that it's called when the 
execution is ending or
-   * ended. If it is desired to kill the execution, interrupt() should be 
called first.
+   * Close the execution and remove it from the session. Note: it first 
interrupts the runner if
+   * it's still running, and it waits for it to finish.
    */
   def close(): Unit = {
+    runner.interrupt()
     runner.join()
     eventsManager.postClosed()
     sessionHolder.removeExecuteHolder(operationId)
diff --git 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
index c29a9b9b629..e833d12c4f5 100644
--- 
a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
+++ 
b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala
@@ -493,24 +493,37 @@ class SparkConnectServiceSuite extends SharedSparkSession 
with MockitoSugar with
         .setSessionId(sessionId)
         .build()
 
-      // The observer is executed inside this thread. So
-      // we can perform the checks inside the observer.
+      // Even though the observer is executed inside this thread, this thread 
is also executing
+      // the SparkConnectService. If we throw an exception inside it, it will 
be caught by
+      // the ErrorUtils.handleError wrapping instance.executePlan and turned 
into an onError
+      // call with StatusRuntimeException, which will be eaten here.
+      var failures: mutable.ArrayBuffer[String] = new 
mutable.ArrayBuffer[String]()
       instance.executePlan(
         request,
         new StreamObserver[proto.ExecutePlanResponse] {
           override def onNext(v: proto.ExecutePlanResponse): Unit = {
-            fail("this should not receive responses")
+            // The query receives some pre-execution responses such as schema, 
but should
+            // never proceed to execution and get query results.
+            if (v.hasArrowBatch) {
+              failures += s"this should not receive query results but got $v"
+            }
           }
 
           override def onError(throwable: Throwable): Unit = {
-            assert(throwable.isInstanceOf[StatusRuntimeException])
-            verifyEvents.onError(throwable)
+            try {
+              assert(throwable.isInstanceOf[StatusRuntimeException])
+              verifyEvents.onError(throwable)
+            } catch {
+              case t: Throwable =>
+                failures += s"assertion $t validating processing 
onError($throwable)."
+            }
           }
 
           override def onCompleted(): Unit = {
-            fail("this should not complete")
+            failures += "this should not complete"
           }
         })
+      assert(failures.isEmpty, s"this should have no failures but got 
$failures")
       verifyEvents.onCompleted()
     }
   }
diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi 
b/python/pyspark/sql/connect/proto/base_pb2.pyi
index e870221594c..a886ecbd618 100644
--- a/python/pyspark/sql/connect/proto/base_pb2.pyi
+++ b/python/pyspark/sql/connect/proto/base_pb2.pyi
@@ -2554,9 +2554,7 @@ class 
ReleaseExecuteRequest(google.protobuf.message.Message):
 
     class ReleaseAll(google.protobuf.message.Message):
         """Release and close operation completely.
-        Note: This should be called when the server side operation is 
finished, and ExecutePlan or
-        ReattachExecute are finished processing the result stream, or inside 
onComplete / onError.
-        This will not interrupt a running execution, but block until it's 
finished.
+        This will also interrupt the query if it is running execution, and 
wait for it to be torn down.
         """
 
         DESCRIPTOR: google.protobuf.descriptor.Descriptor


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to