This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 2966c791cb90 [SPARK-46042][CONNECT] Reenable a `releaseSession` test case in SparkConnectServiceE2ESuite 2966c791cb90 is described below commit 2966c791cb9048e86407b03053a16033a56f422e Author: Hyukjin Kwon <gurwls...@apache.org> AuthorDate: Wed Nov 22 02:15:13 2023 -0800 [SPARK-46042][CONNECT] Reenable a `releaseSession` test case in SparkConnectServiceE2ESuite ### What changes were proposed in this pull request? This PR makes the request stream iterator within `ExecutePlanResponseReattachableIterator` explicitly lazy. ### Why are the changes needed? In order to preserve the previous behaviour at gRPC 1.56.0. After upgrading gRPC from 1.56.0 to 1.59.3, it makes the first request when the stream is created, but our code here assumes that no request is made before that. ### Does this PR introduce _any_ user-facing change? No, the upgrade (SPARK-46039) has not been released yet. ### How was this patch tested? Fixed unit tests. I manually debugged via IDE. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #43955 from HyukjinKwon/SPARK-46042. 
Authored-by: Hyukjin Kwon <gurwls...@apache.org> Signed-off-by: Dongjoon Hyun <dh...@apple.com> --- .../ExecutePlanResponseReattachableIterator.scala | 19 ++++++++++++++++--- .../connect/service/SparkConnectServiceE2ESuite.scala | 8 ++++---- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala index 5854a9225dbe..dff9a53991f8 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/ExecutePlanResponseReattachableIterator.scala @@ -101,7 +101,20 @@ class ExecutePlanResponseReattachableIterator( // throw error on first iter.hasNext() or iter.next() // Visible for testing. private[connect] var iter: Option[java.util.Iterator[proto.ExecutePlanResponse]] = - Some(rawBlockingStub.executePlan(initialRequest)) + Some(makeLazyIter(rawBlockingStub.executePlan(initialRequest))) + + // Creates a request that contains the query and returns a stream of `ExecutePlanResponse`. + // After upgrading gRPC from 1.56.0 to 1.59.3, it makes the first request when + // the stream is created, but here the code here assumes that no request is made before + // that, see also SPARK-46042 + private def makeLazyIter(f: => java.util.Iterator[proto.ExecutePlanResponse]) + : java.util.Iterator[proto.ExecutePlanResponse] = { + new java.util.Iterator[proto.ExecutePlanResponse] { + private lazy val internalIter = f + override def hasNext: Boolean = internalIter.hasNext + override def next(): proto.ExecutePlanResponse = internalIter.next + } + } // Server side session ID, used to detect if the server side session changed. This is set upon // receiving the first response from the server. 
@@ -228,7 +241,7 @@ class ExecutePlanResponseReattachableIterator( private def callIter[V](iterFun: java.util.Iterator[proto.ExecutePlanResponse] => V) = { try { if (iter.isEmpty) { - iter = Some(rawBlockingStub.reattachExecute(createReattachExecuteRequest())) + iter = Some(makeLazyIter(rawBlockingStub.reattachExecute(createReattachExecuteRequest()))) } iterFun(iter.get) } catch { @@ -241,7 +254,7 @@ class ExecutePlanResponseReattachableIterator( ex) } // Try a new ExecutePlan, and throw upstream for retry. - iter = Some(rawBlockingStub.executePlan(initialRequest)) + iter = Some(makeLazyIter(rawBlockingStub.executePlan(initialRequest))) val error = new RetryException() error.addSuppressed(ex) throw error diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala index aee0ad6e7859..7bd4bd742c95 100644 --- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala +++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectServiceE2ESuite.scala @@ -77,10 +77,10 @@ class SparkConnectServiceE2ESuite extends SparkConnectServerTest { // query3 has not been submitted before, so it should now fail with SESSION_CLOSED // TODO(SPARK-46042) Reenable a `releaseSession` test case in SparkConnectServiceE2ESuite - // val query3Error = intercept[SparkException] { - // query3.hasNext - // } - // assert(query3Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED")) + val query3Error = intercept[SparkException] { + query3.hasNext + } + assert(query3Error.getMessage.contains("INVALID_HANDLE.SESSION_CLOSED")) // No other requests should be allowed in the session, failing with SESSION_CLOSED val requestError = intercept[SparkException] { --------------------------------------------------------------------- To 
unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org