This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch branch-3.5 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.5 by this push: new 98cd980f57e [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute 98cd980f57e is described below commit 98cd980f57e9e78e5a18288cc28f3f4653a02ebe Author: Juliusz Sompolski <ju...@databricks.com> AuthorDate: Fri Aug 11 10:28:25 2023 +0900 [SPARK-44765][CONNECT] Simplify retries of ReleaseExecute ### What changes were proposed in this pull request? Simplify retries of ReleaseExecute in ExecutePlanResponseReattachableIterator. Instead of chaining asynchronous calls, use the common retry loop logic from the asynchronous onError after the first error. This allows reusing the common `retry` function instead of having to duplicate ``` case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries => Thread.sleep( (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis) ``` logic. This also makes these retries more similar to how they are done in the Python client. ### Why are the changes needed? Code simplification. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? 
Checked retries by printing `new Exception().printStackTrace` from the handler: ``` java.lang.Exception at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242) at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169) at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306) at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) java.lang.Exception at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.$anonfun$onError$1(ExecutePlanResponseReattachableIterator.scala:242) at org.apache.spark.sql.connect.client.GrpcRetryHandler$.retry(GrpcRetryHandler.scala:169) at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator.org$apache$spark$sql$connect$client$ExecutePlanResponseReattachableIterator$$retry(ExecutePlanResponseReattachableIterator.scala:306) 
at org.apache.spark.sql.connect.client.ExecutePlanResponseReattachableIterator$$anon$1.onError(ExecutePlanResponseReattachableIterator.scala:241) at io.grpc.stub.ClientCalls$StreamObserverToCallListenerAdapter.onClose(ClientCalls.java:491) at io.grpc.internal.ClientCallImpl.closeObserver(ClientCallImpl.java:567) at io.grpc.internal.ClientCallImpl.access$300(ClientCallImpl.java:71) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInternal(ClientCallImpl.java:735) at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl$1StreamClosed.runInContext(ClientCallImpl.java:716) at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ... ``` Closes #42438 from juliuszsompolski/SPARK-44765. 
Authored-by: Juliusz Sompolski <ju...@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> (cherry picked from commit 9bde882fcb39e9fedced0df9702df2a36c1a84e6) Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- .../ExecutePlanResponseReattachableIterator.scala | 36 +++++++++++++--------- 1 file changed, 21 insertions(+), 15 deletions(-) diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index 7a50801d8a6..5ef1151682b 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -180,7 +180,7 @@ class ExecutePlanResponseReattachableIterator( private def releaseUntil(untilResponseId: String): Unit = { if (!resultComplete) { val request = createReleaseExecuteRequest(Some(untilResponseId)) - rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request)) + rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request)) } } @@ -194,7 +194,7 @@ class ExecutePlanResponseReattachableIterator( private def releaseAll(): Unit = { if (!resultComplete) { val request = createReleaseExecuteRequest(None) - rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserer(request)) + rawAsyncStub.releaseExecute(request, createRetryingReleaseExecuteResponseObserver(request)) resultComplete = true } } @@ -229,22 +229,28 @@ class ExecutePlanResponseReattachableIterator( * ReleaseExecute and continues with iteration, but if it fails with a retryable error, the * callback will retrigger the asynchronous ReleaseExecute. 
*/ - private def createRetryingReleaseExecuteResponseObserer( - requestForRetry: proto.ReleaseExecuteRequest, - currentRetryNum: Int = 0): StreamObserver[proto.ReleaseExecuteResponse] = { + private def createRetryingReleaseExecuteResponseObserver( + requestForRetry: proto.ReleaseExecuteRequest) + : StreamObserver[proto.ReleaseExecuteResponse] = { new StreamObserver[proto.ReleaseExecuteResponse] { override def onNext(v: proto.ReleaseExecuteResponse): Unit = {} override def onCompleted(): Unit = {} - override def onError(t: Throwable): Unit = t match { - case NonFatal(e) if retryPolicy.canRetry(e) && currentRetryNum < retryPolicy.maxRetries => - Thread.sleep( - (retryPolicy.maxBackoff min retryPolicy.initialBackoff * Math - .pow(retryPolicy.backoffMultiplier, currentRetryNum)).toMillis) - rawAsyncStub.releaseExecute( - requestForRetry, - createRetryingReleaseExecuteResponseObserer(requestForRetry, currentRetryNum + 1)) - case _ => - logWarning(s"ReleaseExecute failed with exception: $t.") + override def onError(t: Throwable): Unit = { + var firstTry = true + try { + retry { + if (firstTry) { + firstTry = false + throw t // we already failed once, handle first retry + } else { + // we already are in async execution thread, can execute further retries sync + rawBlockingStub.releaseExecute(requestForRetry) + } + } + } catch { + case NonFatal(e) => + logWarning(s"ReleaseExecute failed with exception: $e.") + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org