This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2966c791cb90 [SPARK-46042][CONNECT] Reenable a `releaseSession` test case in SparkConnectServiceE2ESuite
2966c791cb90 is described below

commit 2966c791cb9048e86407b03053a16033a56f422e
Author: Hyukjin Kwon <gurwls...@apache.org>
AuthorDate: Wed Nov 22 02:15:13 2023 -0800

    [SPARK-46042][CONNECT] Reenable a `releaseSession` test case in SparkConnectServiceE2ESuite
    
    ### What changes were proposed in this pull request?
    
    This PR makes the request stream iterator within `ExecutePlanResponseReattachableIterator` explicitly lazy.
    
    ### Why are the changes needed?
    
    In order to preserve the previous behaviour of gRPC 1.56.0. After upgrading gRPC from 1.56.0 to 1.59.3, the stub makes the first request as soon as the stream is created, but our code assumes that no request is made until iteration begins.
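
    The gist of the fix, as a minimal standalone sketch (assuming nothing beyond the JDK; `makeLazyIter` mirrors the helper added in the diff below, while `LazyIterDemo` and its `main` are illustrative only): a by-name parameter captured in a `lazy val` defers opening the stream, and hence the first request, until the first `hasNext()`/`next()` call.

    ```scala
    import java.util.{Iterator => JIterator}

    object LazyIterDemo {
      // Wrap a by-name iterator factory so the underlying call (and thus the
      // first request) happens only on the first hasNext()/next().
      def makeLazyIter[T](f: => JIterator[T]): JIterator[T] =
        new JIterator[T] {
          private lazy val underlying = f // evaluated at most once, on first use
          override def hasNext: Boolean = underlying.hasNext
          override def next(): T = underlying.next()
        }

      def main(args: Array[String]): Unit = {
        val it = makeLazyIter {
          println("stream opened") // stands in for the RPC being sent
          java.util.Arrays.asList(1, 2, 3).iterator()
        }
        println("iterator created") // prints before "stream opened"
        while (it.hasNext) println(it.next())
      }
    }
    ```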
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the upgrade (SPARK-46039) has not been released yet.
    
    ### How was this patch tested?
    
    Fixed unit tests. I also debugged it manually via an IDE.
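
    For reference, the reenabled check follows the standard ScalaTest `intercept` pattern. A minimal self-contained sketch (the suite name and exception type are illustrative, not Spark's actual test):

    ```scala
    import org.scalatest.funsuite.AnyFunSuite

    class InterceptPatternSuite extends AnyFunSuite {
      test("intercept returns the thrown exception for further checks") {
        // intercept[T] fails the test unless the block throws a T,
        // and returns the caught exception so we can assert on its message.
        val e = intercept[IllegalStateException] {
          throw new IllegalStateException("INVALID_HANDLE.SESSION_CLOSED")
        }
        assert(e.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED"))
      }
    }
    ```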
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #43955 from HyukjinKwon/SPARK-46042.
    
    Authored-by: Hyukjin Kwon <gurwls...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../ExecutePlanResponseReattachableIterator.scala     | 19 ++++++++++++++++---
 .../connect/service/SparkConnectServiceE2ESuite.scala |  8 ++++----
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
index 5854a9225dbe..dff9a53991f8 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala
@@ -101,7 +101,20 @@ class ExecutePlanResponseReattachableIterator(
   // throw error on first iter.hasNext() or iter.next()
   // Visible for testing.
  private[connect] var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] =
-    Some(rawBlockingStub.executePlan(initialRequest))
+    Some(makeLazyIter(rawBlockingStub.executePlan(initialRequest)))
+
+  // Creates a request that contains the query and returns a stream of `ExecutePlanResponse`.
+  // After upgrading gRPC from 1.56.0 to 1.59.3, it makes the first request when
+  // the stream is created, but the code here assumes that no request is made before
+  // that, see also SPARK-46042
+  private def makeLazyIter(f: => java.util.Iterator[proto.ExecutePlanResponse])
+      : java.util.Iterator[proto.ExecutePlanResponse] = {
+    new java.util.Iterator[proto.ExecutePlanResponse] {
+      private lazy val internalIter = f
+      override def hasNext: Boolean = internalIter.hasNext
+      override def next(): proto.ExecutePlanResponse = internalIter.next
+    }
+  }
 
  // Server side session ID, used to detect if the server side session changed. This is set upon
   // receiving the first response from the server.
@@ -228,7 +241,7 @@ class ExecutePlanResponseReattachableIterator(
  private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = {
     try {
       if (iter.isEmpty) {
-        iter = Some(rawBlockingStub.reattachExecute(createReattachExecuteRequest()))
+        iter = Some(makeLazyIter(rawBlockingStub.reattachExecute(createReattachExecuteRequest())))
       }
       iterFun(iter.get)
     } catch {
@@ -241,7 +254,7 @@ class ExecutePlanResponseReattachableIterator(
             ex)
         }
         // Try a new ExecutePlan, and throw upstream for retry.
-        iter = Some(rawBlockingStub.executePlan(initialRequest))
+        iter = Some(makeLazyIter(rawBlockingStub.executePlan(initialRequest)))
         val error = new RetryException()
         error.addSuppressed(ex)
         throw error
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
index aee0ad6e7859..7bd4bd742c95 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala
@@ -77,10 +77,10 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest {
 
      // query3 has not been submitted before, so it should now fail with SESSION_CLOSED
      // TODO(SPARK-46042) Reenable a `releaseSession` test case in SparkConnectServiceE2ESuite
-      // val query3Error = intercept[SparkException] {
-      //   query3.hasNext
-      // }
-      // assert(query3Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED"))
+      val query3Error = intercept[SparkException] {
+        query3.hasNext
+      }
+      assert(query3Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED"))
 
      // No other requests should be allowed in the session, failing with SESSION_CLOSED
       val requestError = intercept[SparkException] {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
