anishshri-db commented on code in PR #47475:
URL: https://github.com/apache/spark/pull/47475#discussion_r1690828539


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala:
##########
@@ -1562,42 +1594,55 @@ abstract class StateStoreSuiteBase[ProviderClass <: 
StateStoreProvider]
     }
   }
 
-  test("SPARK-44438: maintenance task should be shutdown on error") {
-    val conf = new SparkConf()
-      .setMaster("local")
-      .setAppName("test")
+  test("SPARK-48997: maintenance threads with exceptions unload only 
themselves") {
     val sqlConf = getDefaultSQLConf(
       SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get,
       SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get
     )
-    // Make maintenance interval small so that maintenance task is called 
right after scheduling.
+    // Make maintenance interval small so that maintenance task is called 
right after scheduling.
     sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 100L)
-    // Use the `FakeStateStoreProviderWithMaintenanceError` to run the test
-    sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS,
-      classOf[FakeStateStoreProviderWithMaintenanceError].getName)
+    // Use the `MaintenanceErrorOnCertainPartitionsProvider` to run the test
+    sqlConf.setConf(
+      SQLConf.STATE_STORE_PROVIDER_CLASS,
+      classOf[MaintenanceErrorOnCertainPartitionsProvider].getName
+    )
 
-    quietly {
-      withSpark(new SparkContext(conf)) { sc =>
-        withCoordinatorRef(sc) { _ =>
-          
FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.set(false)
-          val storeId = StateStoreProviderId(StateStoreId("firstDir", 0, 1), 
UUID.randomUUID)
-          val storeConf = StateStoreConf(sqlConf)
-
-          // get the state store and kick off the maintenance task
-          StateStore.get(storeId, null, null,
-            NoPrefixKeyStateEncoderSpec(keySchema), 0, useColumnFamilies = 
false,
-            storeConf, sc.hadoopConfiguration)
-
-          eventually(timeout(30.seconds)) {
-            assert(!StateStore.isMaintenanceRunning)
-          }
+    val conf = new SparkConf().setMaster("local").setAppName("test")
+
+    withSpark(new SparkContext(conf)) { sc =>
+      withCoordinatorRef(sc) { _ =>
+        // 0 and 1's maintenance will fail
+        val provider0Id =
+          StateStoreProviderId(StateStoreId("spark-48997", 0, 0), 
UUID.randomUUID)
+        val provider1Id =
+          StateStoreProviderId(StateStoreId("spark-48997", 0, 1), 
UUID.randomUUID)
+        val provider2Id =
+          StateStoreProviderId(StateStoreId("spark-48997", 0, 2), 
UUID.randomUUID)
+
+        // Create provider 2 first to start maintenance for it
+        StateStore.get(
+          provider2Id,
+          keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
+          0, useColumnFamilies = false, new StateStoreConf(sqlConf), new 
Configuration()
+        )
 
-          // SPARK-45002: The maintenance task thread failure should not 
invoke the
-          // SparkUncaughtExceptionHandler which could lead to the executor 
process
-          // getting killed.
-          
assert(!FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.get)
+        // The following 2 calls to `get` will cause the associated 
maintenance to fail
+        StateStore.get(
+          provider0Id,
+          keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
+          0, useColumnFamilies = false, new StateStoreConf(sqlConf), new 
Configuration()
+        )
+
+        StateStore.get(
+          provider1Id,
+          keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema),
+          0, useColumnFamilies = false, new StateStoreConf(sqlConf), new 
Configuration()
+        )
 
-          StateStore.stop()
+        eventually(timeout(5.seconds)) {
+          assert(!StateStore.isLoaded(provider0Id))
+          assert(!StateStore.isLoaded(provider1Id))
+          assert(StateStore.isLoaded(provider2Id))

Review Comment:
   Is there also a way to check that all the maintenance thread pool threads 
are still running? Should we add a 4th provider and ensure that it is picked 
up and remains loaded?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to