This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 87a6e70ec96b [SPARK-48889][SS] testStream to unload state stores before finishing
87a6e70ec96b is described below

commit 87a6e70ec96bc3952fa4bc7f04b38be94a302fef
Author: Siying Dong <siying.d...@databricks.com>
AuthorDate: Wed Jul 17 12:26:59 2024 +0900

    [SPARK-48889][SS] testStream to unload state stores before finishing
    
    ### What changes were proposed in this pull request?
    At the end of each testStream() call, unload all state stores from the executor.
    
    ### Why are the changes needed?
    Currently, after a test, we don't unload the state stores or disable the maintenance task. So after a test, the maintenance task can still run and fail because the checkpoint directory has already been deleted. This might cause an issue and fail the next test.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Covered by existing tests, which are expected to pass.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No.
    
    Closes #47339 from siying/SPARK-48889.
    
    Authored-by: Siying Dong <siying.d...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit 3a245558be882ae94f507976e4e4fb8c1d9bf344)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 579b017944a5..14d4985a407d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -808,6 +808,13 @@ trait StreamTest extends QueryTest with SharedSparkSession with TimeLimits with
         case (key, None) => sparkSession.conf.unset(key)
       }
       sparkSession.streams.removeListener(listener)
+      // The state store is stopped here to unload all state stores and terminate all maintenance
+      // threads. It is necessary because the temp directory used by the checkpoint directory
+      // may be deleted soon after, and the maintenance thread may see unexpected error and
+      // cause unexpected behavior. Doing it after a test finishes might be too late because
+      // sometimes the checkpoint directory is under `withTempDir`, and in this case the temp
+      // directory is deleted before the test finishes.
+      StateStore.stop()
     }
   }
 


