This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 87a6e70ec96b [SPARK-48889][SS] testStream to unload state stores before finishing

87a6e70ec96b is described below

commit 87a6e70ec96bc3952fa4bc7f04b38be94a302fef
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Wed Jul 17 12:26:59 2024 +0900

[SPARK-48889][SS] testStream to unload state stores before finishing

### What changes were proposed in this pull request?
At the end of each testStream() call, unload all state stores from the executor.

### Why are the changes needed?
Currently, after a test, we don't unload the state stores or disable the maintenance task. The maintenance task can therefore still run after the test and fail because the checkpoint directory has already been deleted. This might cause the next test to fail.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Existing tests should pass.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #47339 from siying/SPARK-48889.
Authored-by: Siying Dong <siying.d...@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
(cherry picked from commit 3a245558be882ae94f507976e4e4fb8c1d9bf344)
Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 579b017944a5..14d4985a407d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -808,6 +808,13 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
           case (key, None) => sparkSession.conf.unset(key)
         }
         sparkSession.streams.removeListener(listener)
+        // The state store is stopped here to unload all state stores and terminate all maintenance
+        // threads. It is necessary because the temp directory used by the checkpoint directory
+        // may be deleted soon after, and the maintenance thread may see unexpected error and
+        // cause unexpected behavior. Doing it after a test finishes might be too late because
+        // sometimes the checkpoint directory is under `withTempDir`, and in this case the temp
+        // directory is deleted before the test finishes.
+        StateStore.stop()
       }
     }
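To make the ordering problem concrete, here is a minimal, self-contained Scala sketch that does not use Spark. `ToyStateStore`, `Harness.withTempDir` and `Harness.runTest` are hypothetical stand-ins for `StateStore`, `withTempDir` and `testStream`, and the "maintenance" work (writing a marker file into each checkpoint directory every 5 ms) is an assumption chosen only to model a background thread that touches the checkpoint directory. Because `runTest` calls `stop()` in its `finally` block, the maintenance thread is shut down before the enclosing `withTempDir` deletes the directory, which is the ordering the comment added to `StreamTest.scala` relies on.

```scala
import java.io.File
import java.nio.file.{Files, Path}
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.concurrent.TrieMap
import scala.util.control.NonFatal

// Hypothetical stand-in for a state store registry (not Spark's API).
// Loaded stores are keyed by checkpoint directory, and one background thread
// runs "maintenance" against every loaded store on a fixed schedule.
object ToyStateStore {
  private val loaded = TrieMap.empty[Path, Unit]
  private var maintenance: Option[ScheduledExecutorService] = None
  val maintenanceErrors = new AtomicInteger(0)

  def load(checkpointDir: Path): Unit = synchronized {
    loaded.put(checkpointDir, ())
    if (maintenance.isEmpty) {
      val pool = Executors.newSingleThreadScheduledExecutor()
      pool.scheduleAtFixedRate(() => doMaintenance(), 0L, 5L, TimeUnit.MILLISECONDS)
      maintenance = Some(pool)
    }
  }

  // Assumed maintenance work: write a marker file into each checkpoint
  // directory. This fails if the directory was deleted underneath it.
  private def doMaintenance(): Unit =
    loaded.keys.foreach { dir =>
      try {
        Files.write(dir.resolve("maintenance.marker"), Array[Byte](1))
        ()
      } catch {
        case NonFatal(_) =>
          maintenanceErrors.incrementAndGet()
          ()
      }
    }

  def loadedCount: Int = loaded.size

  // Stop the maintenance thread first (waiting for any in-flight run), then
  // unload every store, so nothing touches a checkpoint directory afterwards.
  def stop(): Unit = synchronized {
    maintenance.foreach { pool =>
      pool.shutdown() // with the default policy this also cancels the periodic task
      pool.awaitTermination(10L, TimeUnit.SECONDS)
    }
    maintenance = None
    loaded.clear()
  }
}

object Harness {
  private def deleteRecursively(f: File): Unit = {
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
    ()
  }

  // Hypothetical analogue of `withTempDir`: the directory is deleted as soon as `body` returns.
  def withTempDir[T](body: Path => T): T = {
    val dir = Files.createTempDirectory("toy-checkpoint")
    try body(dir) finally deleteRecursively(dir.toFile)
  }

  // Hypothetical analogue of `testStream`: stores are stopped in `finally`, i.e.
  // before control returns to an enclosing `withTempDir` that deletes the directory.
  def runTest(checkpointDir: Path)(body: => Unit): Unit =
    try {
      ToyStateStore.load(checkpointDir)
      body
    } finally ToyStateStore.stop()
}
```

If `runTest` instead left the stores loaded and the caller only called `ToyStateStore.stop()` after `withTempDir` had returned, the scheduled task would keep writing into a directory that no longer exists and `maintenanceErrors` would typically become non-zero, which mirrors the failure described in the commit message.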