This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e42b95efa1e [SPARK-53673][CONNECT][TESTS] Fix a flaky test failure in 
`SparkSessionE2ESuite - interrupt tag` caused by the usage of `ForkJoinPool`
0e42b95efa1e is described below

commit 0e42b95efa1eda06bd1710c8271e4a951068a9ce
Author: Kousuke Saruta <[email protected]>
AuthorDate: Tue Sep 23 23:08:55 2025 +0900

    [SPARK-53673][CONNECT][TESTS] Fix a flaky test failure in 
`SparkSessionE2ESuite - interrupt tag` caused by the usage of `ForkJoinPool`
    
    ### What changes were proposed in this pull request?
    This PR aims to fix one of the issues which block SPARK-48139.
    In the problematic test `interrupt tag` in `SparkSessionE2ESuite`, four futures run on threads in a `ForkJoinPool` and interrupt one another through tags.
    When a thread in a `ForkJoinPool` performs a blocking operation, the pool can create a spare thread and make it available for other tasks, so any two of the pool's threads can end up in a parent-child relationship. One such blocking operation is `ArrayBlockingQueue.take`, which is called in a method provided by [gRPC](https://github.com/grpc/grpc-java/blob/3e993a9f44ff52bd3d5ac59dfa978d8e7d30e28b/stub/src/main/java/io/grpc/stub/ClientCalls.java#L607).
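    How a blocking worker leads the pool to activate a spare thread can be sketched in plain Java (hypothetical class name `CompensationDemo`; the Spark test does not call `managedBlock` itself, but `ForkJoinPool.managedBlock` is the JDK mechanism by which a blocking call signals the pool to compensate):

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.ForkJoinPool;
    import java.util.concurrent.ForkJoinTask;
    import java.util.concurrent.TimeUnit;

    public class CompensationDemo {
        // Returns true once both tasks have completed.
        static boolean runDemo() throws Exception {
            // Parallelism 1: without a spare thread, a task that blocks until
            // another queued task runs would deadlock the pool.
            ForkJoinPool pool = new ForkJoinPool(1);
            CountDownLatch latch = new CountDownLatch(1);

            ForkJoinTask<?> blocker = pool.submit(() -> {
                try {
                    // managedBlock tells the pool this worker is about to
                    // block, so the pool may activate a spare thread.
                    ForkJoinPool.managedBlock(new ForkJoinPool.ManagedBlocker() {
                        public boolean block() throws InterruptedException {
                            latch.await();
                            return true;
                        }
                        public boolean isReleasable() {
                            return latch.getCount() == 0;
                        }
                    });
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // The spare thread lets this task run while the first one blocks.
            ForkJoinTask<?> releaser = pool.submit(latch::countDown);

            releaser.get(5, TimeUnit.SECONDS);
            blocker.get(5, TimeUnit.SECONDS);
            pool.shutdown();
            return true;
        }

        public static void main(String[] args) throws Exception {
            System.out.println("both tasks completed: " + runDemo());
        }
    }
    ```

    A plain `latch.await()` in the first task, without `managedBlock`, would leave the single worker stuck and the second task never scheduled.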
    
    On the other hand, tags are implemented as an
    [InheritableThreadLocal](https://github.com/apache/spark/blob/f3ac67ee9b3b0ce63b30426f8bec825b20d91dde/sql/connect/common/src/main/scala/org/apache/spark/sql/connect/client/SparkConnectClient.scala#L303).
    So, if the futures q1 and q4, or q2 and q3, run on threads that are parent and child, the tags are inherited, which causes the flaky test failure.
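    The inheritance itself is easy to see in isolation with a minimal Java sketch (hypothetical class name `InheritDemo`; Spark Connect stores a set of tags, reduced here to a single string):

    ```java
    public class InheritDemo {
        // Starts a child thread while the parent holds a value and returns
        // what the child observed.
        static String childView() throws InterruptedException {
            InheritableThreadLocal<String> tag = new InheritableThreadLocal<>();
            tag.set("one"); // set on the parent (current) thread

            String[] seen = new String[1];
            // A thread constructed while the parent holds a value inherits
            // a copy of that value at construction time.
            Thread child = new Thread(() -> seen[0] = tag.get());
            child.start();
            child.join();
            return seen[0];
        }

        public static void main(String[] args) throws Exception {
            System.out.println("child saw: " + childView()); // prints "child saw: one"
        }
    }
    ```

    A pool worker spawned by another worker inherits in exactly the same way, which is how one future's tags leak into another.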
    
    You can easily reproduce the issue by inserting a sleep into the problematic test as follows (don't forget to replace `ignore` with `test`).
    
    ```
       // TODO(SPARK-48139): Re-enable `SparkSessionE2ESuite.interrupt tag`
    -  ignore("interrupt tag") {
    +  test("interrupt tag") {
         val session = spark
         import session.implicits._
    
    @@ -204,6 +204,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
             spark.clearTags() // clear for the case of thread reuse by another 
Future
           }
         }(executionContext)
    +    Thread.sleep(1000)
         val q4 = Future {
           assert(spark.getTags() == Set())
           spark.addTag("one")
    ```
    
    And then, run the test.
    ```
    $ build/sbt 'connect-client-jvm/testOnly 
org.apache.spark.sql.connect.SparkSessionE2ESuite -- -z "interrupt tag"'
    ```
    
    ### Why are the changes needed?
    For test stability.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Ran the problematic test with the sleep inserted as described above, and it passed.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #52417 from sarutak/fix-thread-pool-issue.
    
    Authored-by: Kousuke Saruta <[email protected]>
    Signed-off-by: Kousuke Saruta <[email protected]>
---
 .../scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala     | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
index 1d022489b701..4c0073cad567 100644
--- 
a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ 
b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.spark.sql.connect
 
-import java.util.concurrent.ForkJoinPool
+import java.util.concurrent.Executors
 
 import scala.collection.mutable
 import scala.concurrent.{ExecutionContext, ExecutionContextExecutor, Future}
@@ -146,7 +146,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with 
RemoteSparkSession {
     // global ExecutionContext has only 2 threads in Apache Spark CI
     // create own thread pool for four Futures used in this test
     val numThreads = 4
-    val fpool = new ForkJoinPool(numThreads)
+    val fpool = Executors.newFixedThreadPool(numThreads)
     val executionContext = ExecutionContext.fromExecutorService(fpool)
 
     val q1 = Future {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
