This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new aff9eab9039 [SPARK-45511][SS] Fix state reader suite flakiness by cleaning up resources after each test run
aff9eab9039 is described below

commit aff9eab90392f22c0037abdf50e6894615e4dbf9
Author: Chaoqin Li <chaoqin...@databricks.com>
AuthorDate: Fri Nov 17 07:27:28 2023 +0900

    [SPARK-45511][SS] Fix state reader suite flakiness by cleaning up resources after each test run
    
    ### What changes were proposed in this pull request?
    Fix state reader suite flakiness by cleaning up resources after each test.
    
    The reason we have to clean up the StateStore per test is the
    maintenance task. When we run a streaming query, the state store is
    initialized in the executor, and registration is performed against the
    coordinator in the driver. The lifecycle of the state store provider is
    not strictly tied to the lifecycle of the streaming query - the executor
    closes the state store provider when the coordinator indicates to the
    executor that the state store provider is no longer valid, which is
    not [...]
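
    A minimal sketch of this mismatch (hypothetical test code, assuming a
    StreamTest-style suite with testImplicits and a checkpointDir in scope;
    only StateStore.stop() is the internal API the fix below relies on):

        import org.apache.spark.sql.execution.streaming.MemoryStream
        import org.apache.spark.sql.execution.streaming.state.StateStore

        val input = MemoryStream[Int]
        val query = input.toDF()
          .groupBy("value").count()
          .writeStream
          .format("memory")
          .queryName("counts")
          .outputMode("complete")
          .option("checkpointLocation", checkpointDir.getAbsolutePath)
          .start()
        input.addData(1, 2, 3)
        query.processAllAvailable()
        query.stop()
        // Stopping the query does not unload the provider on the executor;
        // its background maintenance task can still run afterwards. Only
        // StateStore.stop() unloads providers and halts maintenance.
        StateStore.stop()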
    
    This means a maintenance task against the provider can run after test A
    has finished. We clear the temp directory for test A once the test has
    completed, which can break an operation still being performed against
    the state store provider used in test A, e.g. the directory no longer
    exists while the maintenance task is running.
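
    The fix below follows the standard ScalaTest before/after pattern; a
    minimal sketch of the same idea (the trait name is illustrative, while
    StateStore.stop() is the actual call used in the patch):

        import org.scalatest.{BeforeAndAfterEach, Suite}
        import org.apache.spark.sql.execution.streaming.state.StateStore

        trait StateStoreCleanup extends BeforeAndAfterEach { self: Suite =>
          override def afterEach(): Unit = {
            // Unload all providers and stop the maintenance thread pool
            // before the per-test temp checkpoint directory is removed.
            StateStore.stop()
            super.afterEach()
          }
        }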
    
    This won't be an issue in practice because we do not expect the
    checkpoint location to be temporary, but it is indeed an issue for how
    we set up and clean up the environment for tests.
    
    ### Why are the changes needed?
    
    To deflake the test.
    
    Closes #43831 from chaoqin-li1123/fix_state_reader_suite.
    
    Authored-by: Chaoqin Li <chaoqin...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../datasources/v2/state/StateDataSourceTestBase.scala       | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
index 890a716bbef..f5392cc823f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceTestBase.scala
@@ -20,6 +20,7 @@ import java.sql.Timestamp
 
 import org.apache.spark.sql.{DataFrame, Dataset}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.StateStore
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.streaming._
@@ -28,6 +29,17 @@ import org.apache.spark.sql.streaming.util.StreamManualClock
 trait StateDataSourceTestBase extends StreamTest with StateStoreMetricsTest {
   import testImplicits._
 
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
+  }
+
+  override def afterEach(): Unit = {
+    // Stop maintenance tasks because they may access already deleted checkpoint.
+    StateStore.stop()
+    super.afterEach()
+  }
+
  protected def runCompositeKeyStreamingAggregationQuery(checkpointRoot: String): Unit = {
     val inputData = MemoryStream[Int]
     val aggregated = getCompositeKeyStreamingAggregationQuery(inputData)

