chaoqin-li1123 commented on code in PR #37935:
URL: https://github.com/apache/spark/pull/37935#discussion_r978283545


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -357,6 +357,75 @@ class StateStoreSuite extends 
StateStoreSuiteBase[HDFSBackedStateStoreProvider]
     }
   }
 
+  test("SPARK-40492: maintenance before unload") {
+    val conf = new SparkConf()
+      .setMaster("local")
+      .setAppName("SPARK-40492")
+    val opId = 0
+    val dir1 = newDir()
+    val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0), 
UUID.randomUUID)
+    val sqlConf = 
getDefaultSQLConf(SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
+      SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get)
+    sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
+    val storeConf = StateStoreConf(sqlConf)
+    val hadoopConf = new Configuration()
+
+    var latestStoreVersion = 0
+
+    def generateStoreVersions(): Unit = {
+      for (i <- 1 to 20) {
+        val store = StateStore.get(storeProviderId1, keySchema, valueSchema, 
numColsPrefixKey = 0,
+          latestStoreVersion, storeConf, hadoopConf)
+        put(store, "a", 0, i)
+        store.commit()
+        latestStoreVersion += 1
+      }
+    }
+
+    val timeoutDuration = 1.minute
+
+    quietly {
+      withSpark(new SparkContext(conf)) { sc =>
+        withCoordinatorRef(sc) { coordinatorRef =>
+          require(!StateStore.isMaintenanceRunning, "StateStore is 
unexpectedly running")
+
+          // Generate sufficient versions of store for snapshots
+          generateStoreVersions()
+          eventually(timeout(timeoutDuration)) {
+            // Store should have been reported to the coordinator
+            assert(coordinatorRef.getLocation(storeProviderId1).nonEmpty,
+              "active instance was not reported")
+            // Background maintenance should clean up and generate snapshots
+            assert(StateStore.isMaintenanceRunning, "Maintenance task is not 
running")
+            // Some snapshots should have been generated
+            
tryWithProviderResource(newStoreProvider(storeProviderId1.storeId)) { provider 
=>
+              val snapshotVersions = (1 to latestStoreVersion).filter { 
version =>
+                fileExists(provider, version, isSnapshot = true)
+              }
+              assert(snapshotVersions.nonEmpty, "no snapshot file found")
+            }
+          }
+          // Generate more versions such that there is another snapshot.
+          generateStoreVersions()
+
+          // If driver decides to deactivate all stores related to a query run,
+          // then this instance should be unloaded.
+          coordinatorRef.deactivateInstances(storeProviderId1.queryRunId)
+          eventually(timeout(timeoutDuration)) {
+            assert(!StateStore.isLoaded(storeProviderId1))
+          }
+
+          // Earliest delta file should be scheduled a cleanup during unload.

Review Comment:
   Sure, I will run the test 100 times. As far as I understand, this test is 
deterministic: once we have verified that unload was called, maintenance must 
also have been called. A shorter maintenance interval is not suitable for this 
test because we would then be unable to distinguish the final maintenance of 
unloaded instances from the normal maintenance of active instances.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to