LuciferYang commented on code in PR #55473:
URL: https://github.com/apache/spark/pull/55473#discussion_r3161094190


##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -228,13 +229,65 @@ class SparkConnectSessionHolderSuite extends 
SharedSparkSession {
     }
   }
 
-  test("python foreachBatch process: process terminates after query is 
stopped") {
-    // scalastyle:off assume
-    assume(IntegratedUDFTestUtils.shouldTestPandasUDFs)
-    assume(PythonTestDepsChecker.isConnectDepsAvailable)
-    // scalastyle:on assume
+  // Same semantics as SparkFunSuite.retry, but prints the retry events to 
stdout so they
+  // appear in the GitHub Actions job log. SparkFunSuite.retry uses log4j, 
which in our test
+  // setup only writes to target/unit-tests.log (surfaced as an artifact, not 
in the live log).
+  private def retryWithVisibleLog(maxAttempts: Int)(body: => Unit): Unit = {

Review Comment:
   Can we add a console appender for retry warnings in the test logger config, 
then call `SparkFunSuite.retry(n = 2)` directly.
   



##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -288,8 +341,9 @@ class SparkConnectSessionHolderSuite extends 
SharedSparkSession {
         assert(runner2.isWorkerStopped().get)
       }
 
-      assert(spark.streams.active.isEmpty) // no running query
-      assert(spark.streams.listListeners().length == 1) // only process 
termination listener
+      // Only check this attempt's queries stopped (a previous timed-out 
attempt may have

Review Comment:
   Two assertions were collapsed into one:
     - `assert(spark.streams.active.isEmpty)` → 
`assert(!spark.streams.active.exists(q => q.name == q1Name || q.name == 
q2Name))`
     - `assert(spark.streams.listListeners().length == 1)` → **removed 
entirely**
     
   The first relaxation is justified for the cross-attempt leak case, but the 
second assertion (only the process-termination listener should remain) was a 
real correctness check on this code path and the PR removes it. 



##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -288,8 +341,9 @@ class SparkConnectSessionHolderSuite extends 
SharedSparkSession {
         assert(runner2.isWorkerStopped().get)
       }
 
-      assert(spark.streams.active.isEmpty) // no running query
-      assert(spark.streams.listListeners().length == 1) // only process 
termination listener
+      // Only check this attempt's queries stopped (a previous timed-out 
attempt may have
+      // leaked queries into spark.streams.active that we cannot synchronously 
clean up).
+      assert(!spark.streams.active.exists(q => q.name == q1Name || q.name == 
q2Name))
     } finally {
       SparkConnectService.stop()

Review Comment:
   When `onTimeout` fails to unblock the worker within the 30s grace, the 
worker thread is leaked. If its `dataIn.readInt()` later returns and the body 
proceeds, the leaked worker's `finally` runs `SparkConnectService.stop()`, 
sleeps 4s, then 
`spark.streams.listListeners().foreach(spark.streams.removeListener)` — on the 
**same** `SparkContext` and the **same** `SparkConnectService` singleton 
(`object SparkConnectService` with `synchronized` start/stop) that the next 
retry attempt is by then using. The leaked cleanup can therefore tear down the 
active gRPC server and delete every listener belonging to the in-flight retry, 
producing a fresh failure mode that the retry mechanism cannot detect.



##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########


Review Comment:
   Worst-case green-path budget = 30s + 30s + 4s + service start/stop + query 
startup overhead, which can plausibly exceed the outer 60s 
`awaitTestBodyInNewThread` budget on a slow CI runner without the test actually 
hanging. If that happens, the outer timeout fires and `onTimeout` closes the 
cleaner cache mid-test, burning one of the three attempts on a false positive. 
The PR description notes a 5.57s reference run, so headroom looks fine in 
steady state, but the budget arithmetic does not formally cover the worst case.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to