This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new b2aead9f98d [SPARK-44872][CONNECT][FOLLOWUP] Deflake ReattachableExecuteSuite and increase retry buffer
b2aead9f98d is described below

commit b2aead9f98d900d139cff41d53f79a37e1e09e81
Author: Juliusz Sompolski <ju...@databricks.com>
AuthorDate: Fri Sep 15 19:06:58 2023 -0700

    [SPARK-44872][CONNECT][FOLLOWUP] Deflake ReattachableExecuteSuite and increase retry buffer
    
    ### What changes were proposed in this pull request?
    
    Deflake tests in ReattachableExecuteSuite and increase CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE.
    
    ### Why are the changes needed?
    
    Two tests could be flaky, failing with `INVALID_CURSOR.POSITION_NOT_AVAILABLE` errors.
    This happens when the server releases a response once it falls more than CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE behind the latest response it sent. However, because of HTTP2 flow control, released responses could still be in transit to the client. In the test suite, we were explicitly disconnecting the iterators and reconnecting later; in some cases they could not reconnect, because the response they had last seen had fallen too far behind.
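
    For illustration, here is a minimal, hypothetical sketch of the server-side mechanism that produces this error (illustrative names only, not Spark's actual code; the real logic lives in the server's response observer):

    ```scala
    // Hypothetical sketch: the server keeps already-sent responses in a retry
    // buffer and releases the oldest ones once the buffer grows past the
    // configured size. Because of HTTP2 flow control, a released response may
    // still be in transit, so a client that tries to reattach at it fails
    // with INVALID_CURSOR.POSITION_NOT_AVAILABLE.
    final case class SentResponse(responseId: String, sizeBytes: Long)

    class RetryBuffer(maxBytes: Long) {
      private var buffered = Vector.empty[SentResponse]
      private var bufferedBytes = 0L

      // Record a response after sending it, trimming the oldest entries once
      // they have fallen more than maxBytes behind the latest response.
      def onResponseSent(r: SentResponse): Unit = {
        buffered :+= r
        bufferedBytes += r.sizeBytes
        while (bufferedBytes > maxBytes && buffered.size > 1) {
          bufferedBytes -= buffered.head.sizeBytes
          buffered = buffered.tail
        }
      }

      // A reattach must name a response that is still buffered; otherwise the
      // client cannot resume from its last seen position.
      def canReattachAfter(responseId: String): Boolean =
        buffered.exists(_.responseId == responseId)
    }
    ```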
    
    This not only changes the suite, but also adjusts the default config, which potentially makes reconnecting more robust. In normal situations, it should not lead to increased memory pressure, because clients also release the responses using ReleaseExecute as soon as they are received. Normally, buffered responses should be freed by ReleaseExecute, and this retry buffer is only a fallback mechanism. Therefore, it is safe to increase the default.
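
    As a sketch of this normal (non-fallback) path, assuming hypothetical names rather than the real SparkConnectClient API:

    ```scala
    // Hypothetical sketch: the client acknowledges each response with a
    // ReleaseExecute call as soon as it is consumed, so the server can free it
    // eagerly; the retry buffer above only covers responses that were sent but
    // not yet acknowledged.
    final case class Response(responseId: String, payload: Array[Byte])

    def consumeAndRelease(
        responses: Iterator[Response],
        releaseExecute: String => Unit): Unit = {
      responses.foreach { r =>
        // ... process r.payload ...
        releaseExecute(r.responseId) // lets the server drop r from its buffer
      }
    }
    ```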
    
    In practice, this only has an effect when there are actual network errors, and the increased buffer size should make the reconnects more robust in those cases.
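
    As a hypothetical usage example, a deployment could override the buffer size at startup (the key string is inferred from the CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE constant in Connect.scala below and should be verified against your Spark version):

    ```scala
    import org.apache.spark.sql.SparkSession

    // Assumed config key; set before the Spark Connect service starts.
    val spark = SparkSession.builder()
      .config("spark.connect.execute.reattachable.observerRetryBufferSize", "32m")
      .getOrCreate()
    ```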
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    ReattachableExecuteSuite.
    Did more manual experiments to see how far the last response seen by the client can fall behind the latest response sent by the server (because of the HTTP2 flow control window).
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No.
    
    Closes #42908 from juliuszsompolski/SPARK-44872-followup.
    
    Authored-by: Juliusz Sompolski <ju...@databricks.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/connect/config/Connect.scala  |  2 +-
 .../spark/sql/connect/SparkConnectServerTest.scala |  2 +-
 .../execution/ReattachableExecuteSuite.scala       | 26 +++++++++++++---------
 3 files changed, 18 insertions(+), 12 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
index 7b8b05ce11a..253ac38f9cf 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/config/Connect.scala
@@ -133,7 +133,7 @@ object Connect {
           "With any value greater than 0, the last sent response will always 
be buffered.")
       .version("3.5.0")
       .bytesConf(ByteUnit.BYTE)
-      .createWithDefaultString("1m")
+      .createWithDefaultString("10m")
 
   val CONNECT_EXTENSIONS_RELATION_CLASSES =
     buildStaticConf("spark.connect.extensions.relation.classes")
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
index 488858d33ea..eddd1c6be72 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/SparkConnectServerTest.scala
@@ -35,7 +35,7 @@ import org.apache.spark.sql.test.SharedSparkSession
  * Base class and utilities for a test suite that starts and tests the real SparkConnectService
  * with a real SparkConnectClient, communicating over RPC, but both in-process.
  */
-class SparkConnectServerTest extends SharedSparkSession {
+trait SparkConnectServerTest extends SharedSparkSession {
 
   // Server port
   val serverPort: Int =
diff --git a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
index 169b15582b6..0e29a07b719 100644
--- a/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
+++ b/connector/connect/server/src/test/scala/org/apache/spark/sql/connect/execution/ReattachableExecuteSuite.scala
@@ -22,7 +22,7 @@ import io.grpc.StatusRuntimeException
 import org.scalatest.concurrent.Eventually
 import org.scalatest.time.SpanSugar._
 
-import org.apache.spark.SparkException
+import org.apache.spark.{SparkEnv, SparkException}
 import org.apache.spark.sql.connect.SparkConnectServerTest
 import org.apache.spark.sql.connect.config.Connect
 import org.apache.spark.sql.connect.service.SparkConnectService
@@ -32,7 +32,7 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
   // Tests assume that this query will result in at least a couple ExecutePlanResponses on the
   // stream. If this is no longer the case because of changes in how much is returned in a single
   // ExecutePlanResponse, it may need to be adjusted.
-  val MEDIUM_RESULTS_QUERY = "select * from range(1000000)"
+  val MEDIUM_RESULTS_QUERY = "select * from range(10000000)"
 
   test("reattach after initial RPC ends") {
     withClient { client =>
@@ -138,13 +138,12 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
         val reattachIter = stub.reattachExecute(
           buildReattachExecuteRequest(operationId, Some(response.getResponseId)))
         assert(reattachIter.hasNext)
-        reattachIter.next()
-
-        // Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error
-        iter.next()
-        // iterator changed because it had to reconnect
-        assert(reattachableIter.innerIterator ne initialInnerIter)
       }
+
+      // Nevertheless, the original iterator will handle the INVALID_CURSOR.DISCONNECTED error
+      iter.next()
+      // iterator changed because it had to reconnect
+      assert(reattachableIter.innerIterator ne initialInnerIter)
     }
   }
 
@@ -246,19 +245,26 @@ class ReattachableExecuteSuite extends SparkConnectServerTest {
       val iter = stub.executePlan(
         buildExecutePlanRequest(buildPlan(MEDIUM_RESULTS_QUERY), operationId = operationId))
       var lastSeenResponse: String = null
+      val serverRetryBuffer = SparkEnv.get.conf
+        .get(Connect.CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
+        .toLong
 
       iter.hasNext // open iterator
       val execution = getExecutionHolder
 
       // after consuming enough from the iterator, server should automatically start releasing
       var lastSeenIndex = 0
-      while (iter.hasNext && execution.responseObserver.releasedUntilIndex == 0) {
+      var totalSizeSeen = 0
+      while (iter.hasNext && totalSizeSeen <= 1.1 * serverRetryBuffer) {
         val r = iter.next()
         lastSeenResponse = r.getResponseId()
+        totalSizeSeen += r.getSerializedSize
         lastSeenIndex += 1
       }
       assert(iter.hasNext)
-      assert(execution.responseObserver.releasedUntilIndex > 0)
+      Eventually.eventually(timeout(eventuallyTimeout)) {
+        assert(execution.responseObserver.releasedUntilIndex > 0)
+      }
 
       // Reattach from the beginning is not available.
       val reattach = stub.reattachExecute(buildReattachExecuteRequest(operationId, None))

