LuciferYang commented on code in PR #55473:
URL: https://github.com/apache/spark/pull/55473#discussion_r3161094190
##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -228,13 +229,65 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
}
}
- test("python foreachBatch process: process terminates after query is stopped") {
- // scalastyle:off assume
- assume(IntegratedUDFTestUtils.shouldTestPandasUDFs)
- assume(PythonTestDepsChecker.isConnectDepsAvailable)
- // scalastyle:on assume
+ // Same semantics as SparkFunSuite.retry, but prints the retry events to stdout so they
+ // appear in the GitHub Actions job log. SparkFunSuite.retry uses log4j, which in our test
+ // setup only writes to target/unit-tests.log (surfaced as an artifact, not in the live log).
+ private def retryWithVisibleLog(maxAttempts: Int)(body: => Unit): Unit = {
Review Comment:
Can we add a console appender for retry warnings to the test logger config
and then call `SparkFunSuite.retry(n = 2)` directly?
##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -288,8 +341,9 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
assert(runner2.isWorkerStopped().get)
}
- assert(spark.streams.active.isEmpty) // no running query
- assert(spark.streams.listListeners().length == 1) // only process termination listener
+ // Only check this attempt's queries stopped (a previous timed-out attempt may have
Review Comment:
Two assertions were collapsed into one:
- `assert(spark.streams.active.isEmpty)` → `assert(!spark.streams.active.exists(q => q.name == q1Name || q.name == q2Name))`
- `assert(spark.streams.listListeners().length == 1)` → **removed entirely**
The first relaxation is justified for the cross-attempt leak case, but the
second assertion (only the process-termination listener should remain) was a
real correctness check on this code path and the PR removes it.
##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
@@ -288,8 +341,9 @@ class SparkConnectSessionHolderSuite extends SharedSparkSession {
assert(runner2.isWorkerStopped().get)
}
- assert(spark.streams.active.isEmpty) // no running query
- assert(spark.streams.listListeners().length == 1) // only process termination listener
+ // Only check this attempt's queries stopped (a previous timed-out attempt may have
+ // leaked queries into spark.streams.active that we cannot synchronously clean up).
+ assert(!spark.streams.active.exists(q => q.name == q1Name || q.name == q2Name))
} finally {
SparkConnectService.stop()
Review Comment:
When `onTimeout` fails to unblock the worker within the 30s grace period, the
worker thread is leaked. If its `dataIn.readInt()` later returns and the body
proceeds, the leaked worker's `finally` block runs
`SparkConnectService.stop()`, sleeps 4s, and then calls
`spark.streams.listListeners().foreach(spark.streams.removeListener)`. All of
this happens on the **same** `SparkContext` and the **same**
`SparkConnectService` singleton (`object SparkConnectService` with
`synchronized` start/stop) that the next retry attempt is using by then. The
leaked cleanup can therefore tear down the active gRPC server and remove every
listener belonging to the in-flight retry, producing a new failure mode that
the retry mechanism cannot detect.
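
One possible mitigation, offered only as a sketch and not as a tested change: tag
each attempt with a generation number and let the `finally` cleanup run only if
its attempt is still the latest one. `AttemptGuard`, `begin`, and
`cleanupIfCurrent` are hypothetical names that do not exist in Spark or in this PR.

```scala
// Hypothetical helper, not part of Spark or this PR.
// Each retry attempt takes a generation number; cleanup from a superseded
// attempt is skipped instead of touching shared state.
object AttemptGuard {
  private var current: Long = 0L

  // Call at the start of each attempt; returns that attempt's generation.
  def begin(): Long = synchronized {
    current += 1
    current
  }

  // Runs `cleanup` only if `generation` is still the latest attempt.
  // Holding the lock while cleaning up keeps a new attempt from starting mid-cleanup.
  // Returns true if the cleanup ran.
  def cleanupIfCurrent(generation: Long)(cleanup: => Unit): Boolean = synchronized {
    if (generation == current) {
      cleanup
      true
    } else {
      false
    }
  }
}
```

In the test body this would mean calling `AttemptGuard.begin()` before the `try`
and wrapping the `SparkConnectService.stop()` and listener removal in
`cleanupIfCurrent`. A leaked worker that wakes up after the next attempt has
begun would then skip its cleanup.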
##########
sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectSessionHolderSuite.scala:
##########
Review Comment:
Worst-case green-path budget = 30s + 30s + 4s + service start/stop + query
startup overhead. The fixed waits alone sum to 64s, so the worst case can
exceed the outer 60s `awaitTestBodyInNewThread` budget on a slow CI runner
without the test actually hanging. If that happens, the outer timeout fires and
`onTimeout` closes the cleaner cache mid-test, burning one of the three
attempts on a false positive. The PR description notes a 5.57s reference run,
so headroom looks fine in steady state, but the budget arithmetic does not
cover the worst case.
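
To make the arithmetic concrete, here is a minimal sketch that uses only the
figures quoted in this comment. The object and value names are illustrative, and
the service start/stop and query startup overheads are treated as zero because
no numbers are given for them, so the result is a lower bound.

```scala
import scala.concurrent.duration._

object BudgetCheck {
  // Fixed waits quoted in this comment: 30s + 30s + 4s.
  // Unquantified overheads (service start/stop, query startup) are left out.
  val worstCaseLowerBound: FiniteDuration = 30.seconds + 30.seconds + 4.seconds

  // Outer awaitTestBodyInNewThread budget quoted in this comment.
  val outerBudget: FiniteDuration = 60.seconds

  // Positive when the fixed waits alone already exceed the outer budget.
  val shortfall: FiniteDuration = worstCaseLowerBound - outerBudget

  def main(args: Array[String]): Unit = {
    println(s"worst case >= ${worstCaseLowerBound.toSeconds}s, " +
      s"outer budget = ${outerBudget.toSeconds}s, " +
      s"shortfall >= ${shortfall.toSeconds}s")
  }
}
```

Either raising the outer budget or shortening one of the inner waits would close
the gap; which is preferable depends on how the 30s values were chosen.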
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]