This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 43a853660b0 [SPARK-44642][CONNECT] ReleaseExecute in ExecutePlanResponseReattachableIterator after it gets error from server 43a853660b0 is described below commit 43a853660b08aa176bb8eb194ec74043006f219f Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Fri Aug 4 08:45:44 2023 +0900 [SPARK-44642][CONNECT] ReleaseExecute in ExecutePlanResponseReattachableIterator after it gets error from server ### What changes were proposed in this pull request? Client: When the server returns an error on the response stream via onError, the ExecutePlanResponseReattachableIterator will not see the stream finish with a ResultComplete. Instead, a StatusRuntimeException will be thrown from next() or hasNext(). Catch that exception, tell the server to ReleaseExecute when we receive it, and rethrow it to the user. Server: We also have to tweak the behaviour of ReleaseAll so that it also interrupts the query. The previous behaviour, in which for a running query one had to first send an interrupt and then a release, was meant to prevent race conditions of an interrupt arriving after ResultComplete. Now, this has been resolved with proper synchronization at the final moments of execution in ExecuteThreadRunner, and as we want the release to be async, having one ReleaseExecute vs. needing a combination of Interrupt+R [...] ### Why are the changes needed? If ReleaseExecute is not called by the client to acknowledge that the error was received, the execution will be left dangling on the server until it is cleaned up by timeout. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Automated tests will come with https://issues.apache.org/jira/browse/SPARK-44625. Closes #42304 from juliuszsompolski/SPARK-44642. 
Authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit d2d43b888aebbb5d4099faec26b076ef390890ce) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../ExecutePlanResponseReattachableIterator.scala | 120 ++++++++++++--------- .../src/main/protobuf/spark/connect/base.proto | 4 +- .../connect/execution/ExecuteThreadRunner.scala | 31 ++++-- .../spark/sql/connect/service/ExecuteHolder.scala | 6 +- .../connect/planner/SparkConnectServiceSuite.scala | 25 +++-- python/pyspark/sql/connect/proto/base_pb2.pyi | 4 +- 6 files changed, 116 insertions(+), 74 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index 008b3c3dd5c..fc07deaa081 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -102,28 +102,33 @@ class ExecutePlanResponseReattachableIterator( throw new java.util.NoSuchElementException() } - // Get next response, possibly triggering reattach in case of stream error. - var firstTry = true - val ret = retry { - if (firstTry) { - // on first try, we use the existing iterator. - firstTry = false - } else { - // on retry, the iterator is borked, so we need a new one - iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) + try { + // Get next response, possibly triggering reattach in case of stream error. + var firstTry = true + val ret = retry { + if (firstTry) { + // on first try, we use the existing iterator. 
+ firstTry = false + } else { + // on retry, the iterator is borked, so we need a new one + iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) + } + iterator.next() } - iterator.next() - } - // Record last returned response, to know where to restart in case of reattach. - lastReturnedResponseId = Some(ret.getResponseId) - if (ret.hasResultComplete) { - resultComplete = true - releaseExecute(None) // release all - } else { - releaseExecute(lastReturnedResponseId) // release until this response + // Record last returned response, to know where to restart in case of reattach. + lastReturnedResponseId = Some(ret.getResponseId) + if (ret.hasResultComplete) { + releaseAll() + } else { + releaseUntil(lastReturnedResponseId.get) + } + ret + } catch { + case NonFatal(ex) => + releaseAll() // ReleaseExecute on server after error. + throw ex } - ret } override def hasNext(): Boolean = synchronized { @@ -132,47 +137,64 @@ class ExecutePlanResponseReattachableIterator( return false } var firstTry = true - retry { - if (firstTry) { - // on first try, we use the existing iterator. - firstTry = false - } else { - // on retry, the iterator is borked, so we need a new one - iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) - } - var hasNext = iterator.hasNext() - // Graceful reattach: - // If iterator ended, but there was no ResultComplete, it means that there is more, - // and we need to reattach. - if (!hasNext && !resultComplete) { - do { + try { + retry { + if (firstTry) { + // on first try, we use the existing iterator. + firstTry = false + } else { + // on retry, the iterator is borked, so we need a new one iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) - assert(!resultComplete) // shouldn't change... - hasNext = iterator.hasNext() - // It's possible that the new iterator will be empty, so we need to loop to get another. 
- // Eventually, there will be a non empty iterator, because there's always a ResultComplete - // at the end of the stream. - } while (!hasNext) + } + var hasNext = iterator.hasNext() + // Graceful reattach: + // If iterator ended, but there was no ResultComplete, it means that there is more, + // and we need to reattach. + if (!hasNext && !resultComplete) { + do { + iterator = rawBlockingStub.reattachExecute(createReattachExecuteRequest()) + assert(!resultComplete) // shouldn't change... + hasNext = iterator.hasNext() + // It's possible that the new iterator will be empty, so we need to loop to get another. + // Eventually, there will be a non empty iterator, because there is always a + // ResultComplete inserted by the server at the end of the stream. + } while (!hasNext) + } + hasNext } - hasNext + } catch { + case NonFatal(ex) => + releaseAll() // ReleaseExecute on server after error. + throw ex } } /** - * Inform the server to release the execution. + * Inform the server to release the buffered execution results until and including given result. * * This will send an asynchronous RPC which will not block this iterator, the iterator can * continue to be consumed. + */ + private def releaseUntil(untilResponseId: String): Unit = { + if (!resultComplete) { + val request = createReleaseExecuteRequest(Some(untilResponseId)) + rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request)) + } + } + + /** + * Inform the server to release the execution, either because all results were consumed, or the + * execution finished with error and the error was received. * - * Release with untilResponseId informs the server that the iterator has been consumed until and - * including response with that responseId, and these responses can be freed. - * - * Release with None means that the responses have been completely consumed and informs the - * server that the completed execution can be completely freed. 
+ * This will send an asynchronous RPC which will not block this. The client continues executing, + * and if the release fails, server is equipped to deal with abandoned executions. */ - private def releaseExecute(untilResponseId: Option[String]): Unit = { - val request = createReleaseExecuteRequest(untilResponseId) - rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request)) + private def releaseAll(): Unit = { + if (!resultComplete) { + val request = createReleaseExecuteRequest(None) + rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request)) + resultComplete = true + } } /** diff --git a/connector/connect/common/src/main/protobuf/spark/connect/base.proto b/connector/connect/common/src/main/protobuf/spark/connect/base.proto index 151e828b3e9..79dbadba5bb 100644 --- a/connector/connect/common/src/main/protobuf/spark/connect/base.proto +++ b/connector/connect/common/src/main/protobuf/spark/connect/base.proto @@ -750,9 +750,7 @@ message ReleaseExecuteRequest { optional string client_type = 4; // Release and close operation completely. - // Note: This should be called when the server side operation is finished, and ExecutePlan or - // ReattachExecute are finished processing the result stream, or inside onComplete / onError. - // This will not interrupt a running execution, but block until it's finished. + // This will also interrupt the query if it is running execution, and wait for it to be torn down. 
message ReleaseAll {} // Release all responses from the operation response stream up to and including diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 662288177dc..930ccae5d4c 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -46,6 +46,8 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends private var completed: Boolean = false + private val lock = new Object + /** Launches the execution in a background thread, returns immediately. */ def start(): Unit = { executionThread.start() @@ -62,7 +64,7 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends * true if it was not interrupted before, false if it was already interrupted or completed. */ def interrupt(): Boolean = { - synchronized { + lock.synchronized { if (!interrupted && !completed) { // checking completed prevents sending interrupt onError after onCompleted interrupted = true @@ -119,7 +121,7 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends // Inner executeInternal is wrapped by execute() for error handling. private def executeInternal() = { // synchronized - check if already got interrupted while starting. - synchronized { + lock.synchronized { if (interrupted) { throw new InterruptedException() } @@ -160,14 +162,23 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends s"${executeHolder.request.getPlan.getOpTypeCase} not supported.") } - if (executeHolder.reattachable) { - // Reattachable execution sends a ResultComplete at the end of the stream - // to signal that there isn't more coming. 
- executeHolder.responseObserver.onNext(createResultComplete()) - } - synchronized { - // Prevent interrupt after onCompleted, and throwing error to an already closed stream. - completed = true + lock.synchronized { + // Synchronized before sending ResultComplete, and up until completing the result stream + // to prevent a situation in which a client of reattachable execution receives + // ResultComplete, and proceeds to send ReleaseExecute, and that triggers an interrupt + // before it finishes. + + if (interrupted) { + // check if it got interrupted at the very last moment + throw new InterruptedException() + } + completed = true // no longer interruptible + + if (executeHolder.reattachable) { + // Reattachable execution sends a ResultComplete at the end of the stream + // to signal that there isn't more coming. + executeHolder.responseObserver.onNext(createResultComplete()) + } executeHolder.responseObserver.onCompleted() } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index a49c0a8bacf..4eb90f9f163 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -156,11 +156,11 @@ private[connect] class ExecuteHolder( } /** - * Close the execution and remove it from the session. Note: It blocks joining the - * ExecuteThreadRunner thread, so it assumes that it's called when the execution is ending or - * ended. If it is desired to kill the execution, interrupt() should be called first. + * Close the execution and remove it from the session. Note: it first interrupts the runner if + * it's still running, and it waits for it to finish. 
*/ def close(): Unit = { + runner.interrupt() runner.join() eventsManager.postClosed() sessionHolder.removeExecuteHolder(operationId) diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala index c29a9b9b629..e833d12c4f5 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/planner/SparkConnectServiceSuite.scala @@ -493,24 +493,37 @@ class SparkConnectServiceSuite extends SharedSparkSession with MockitoSugar with .setSessionId(sessionId) .build() - // The observer is executed inside this thread. So - // we can perform the checks inside the observer. + // Even though the observer is executed inside this thread, this thread is also executing + // the SparkConnectService. If we throw an exception inside it, it will be caught by + // the ErrorUtils.handleError wrapping instance.executePlan and turned into an onError + // call with StatusRuntimeException, which will be eaten here. + var failures: mutable.ArrayBuffer[String] = new mutable.ArrayBuffer[String]() instance.executePlan( request, new StreamObserver[proto.ExecutePlanResponse] { override def onNext(v: proto.ExecutePlanResponse): Unit = { - fail("this should not receive responses") + // The query receives some pre-execution responses such as schema, but should + // never proceed to execution and get query results. 
+ if (v.hasArrowBatch) { + failures += s"this should not receive query results but got $v" + } } override def onError(throwable: Throwable): Unit = { - assert(throwable.isInstanceOf[StatusRuntimeException]) - verifyEvents.onError(throwable) + try { + assert(throwable.isInstanceOf[StatusRuntimeException]) + verifyEvents.onError(throwable) + } catch { + case t: Throwable => + failures += s"assertion $t validating processing onError($throwable)." + } } override def onCompleted(): Unit = { - fail("this should not complete") + failures += "this should not complete" } }) + assert(failures.isEmpty, s"this should have no failures but got $failures") verifyEvents.onCompleted() } } diff --git a/python/pyspark/sql/connect/proto/base_pb2.pyi b/python/pyspark/sql/connect/proto/base_pb2.pyi index e870221594c..a886ecbd618 100644 --- a/python/pyspark/sql/connect/proto/base_pb2.pyi +++ b/python/pyspark/sql/connect/proto/base_pb2.pyi @@ -2554,9 +2554,7 @@ class ReleaseExecuteRequest(google.protobuf.message.Message): class ReleaseAll(google.protobuf.message.Message): """Release and close operation completely. - Note: This should be called when the server side operation is finished, and ExecutePlan or - ReattachExecute are finished processing the result stream, or inside onComplete / onError. - This will not interrupt a running execution, but block until it's finished. + This will also interrupt the query if it is running execution, and wait for it to be torn down. """ DESCRIPTOR: google.protobuf.descriptor.Descriptor --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org