This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c1c4a79ff17 [SPARK-44421][CONNECT][FOLLOWUP] Minor comment improvements
c1c4a79ff17 is described below

commit c1c4a79ff1728dca0c1536b944c10d282eb13f9f
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Wed Aug 2 12:49:11 2023 +0900

    [SPARK-44421][CONNECT][FOLLOWUP] Minor comment improvements
    
    ### What changes were proposed in this pull request?
    
    Improve some comments about iterator retries.
    
    ### Why are the changes needed?
    
    Improve comments based on followup questions.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    No code changes, only comment changes.
    
    Closes #42281 from juliuszsompolski/SPARK-44624-comment-only.
    
    Lead-authored-by: Juliusz Sompolski <ju...@databricks.com>
    Co-authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../client/ExecutePlanResponseReattachableIterator.scala      | 11 +++++++++--
 .../apache/spark/sql/connect/client/GrpcRetryHandler.scala    |  3 +++
 2 files changed, 12 insertions(+), 2 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index c6f75928a3a..00787b8f94d 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -40,8 +40,13 @@ import org.apache.spark.internal.Logging
  * ExecutePlanResponse on the iterator to return a new iterator from server that continues after
  * that.
  *
- * Since in reattachable execute the server does buffer some responses in case the client needs to
- * backtrack
+ * In reattachable execute the server does buffer some responses in case the client needs to
+ * backtrack. To let server release this buffer sooner, this iterator asynchronously sends
+ * ReleaseExecute RPCs that instruct the server to release responses that it already processed.
+ *
+ * Note: If the initial ExecutePlan did not even reach the server and execution didn't start, the
+ * ReattachExecute can still fail with INVALID_HANDLE.OPERATION_NOT_FOUND, failing the whole
+ * operation.
  */
 class ExecutePlanResponseReattachableIterator(
     request: proto.ExecutePlanRequest,
@@ -86,6 +91,8 @@ class ExecutePlanResponseReattachableIterator(
   private var responseComplete: Boolean = false
 
   // Initial iterator comes from ExecutePlan request.
+  // Note: This is not retried, because no error would ever be thrown here, and GRPC will only
+  // throw error on first iterator.hasNext() or iterator.next()
   private var iterator: java.util.Iterator[proto.ExecutePlanResponse] =
     rawBlockingStub.executePlan(initialRequest)
 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
index 16352bb90b5..ef446399f16 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/connect/client/GrpcRetryHandler.scala
@@ -99,6 +99,9 @@ private[client] class GrpcRetryHandler(private val retryPolicy: GrpcRetryHandler
       extends StreamObserver[U] {
 
     private var opened = false // only retries on first call
+
+    // Note: This is not retried, because no error would ever be thrown here, and GRPC will only
+    // throw error on first iterator.hasNext() or iterator.next()
     private var streamObserver = call(request)
 
     override def onNext(v: U): Unit = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to