neilramaswamy commented on code in PR #47475: URL: https://github.com/apache/spark/pull/47475#discussion_r1690830808
########## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala: ########## @@ -1562,42 +1594,55 @@ abstract class StateStoreSuiteBase[ProviderClass <: StateStoreProvider] } } - test("SPARK-44438: maintenance task should be shutdown on error") { - val conf = new SparkConf() - .setMaster("local") - .setAppName("test") + test("SPARK-48997: maintenance threads with exceptions unload only themselves") { val sqlConf = getDefaultSQLConf( SQLConf.STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT.defaultValue.get, SQLConf.MAX_BATCHES_TO_RETAIN_IN_MEMORY.defaultValue.get ) - // Make maintenance interval small so that maintenance task is called right after scheduling. + // Make maintenance interval small so that maintenance task is called right after scheduling. sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 100L) - // Use the `FakeStateStoreProviderWithMaintenanceError` to run the test - sqlConf.setConf(SQLConf.STATE_STORE_PROVIDER_CLASS, - classOf[FakeStateStoreProviderWithMaintenanceError].getName) + // Use the `MaintenanceErrorOnCertainPartitionsProvider` to run the test + sqlConf.setConf( + SQLConf.STATE_STORE_PROVIDER_CLASS, + classOf[MaintenanceErrorOnCertainPartitionsProvider].getName + ) - quietly { - withSpark(new SparkContext(conf)) { sc => - withCoordinatorRef(sc) { _ => - FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.set(false) - val storeId = StateStoreProviderId(StateStoreId("firstDir", 0, 1), UUID.randomUUID) - val storeConf = StateStoreConf(sqlConf) - - // get the state store and kick off the maintenance task - StateStore.get(storeId, null, null, - NoPrefixKeyStateEncoderSpec(keySchema), 0, useColumnFamilies = false, - storeConf, sc.hadoopConfiguration) - - eventually(timeout(30.seconds)) { - assert(!StateStore.isMaintenanceRunning) - } + val conf = new SparkConf().setMaster("local").setAppName("test") + + withSpark(new SparkContext(conf)) { sc => + withCoordinatorRef(sc) { _ => + // 0 and 1's maintenance 
will fail + val provider0Id = + StateStoreProviderId(StateStoreId("spark-48997", 0, 0), UUID.randomUUID) + val provider1Id = + StateStoreProviderId(StateStoreId("spark-48997", 0, 1), UUID.randomUUID) + val provider2Id = + StateStoreProviderId(StateStoreId("spark-48997", 0, 2), UUID.randomUUID) + + // Create provider 2 first to start maintenance for it + StateStore.get( + provider2Id, + keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), + 0, useColumnFamilies = false, new StateStoreConf(sqlConf), new Configuration() + ) - // SPARK-45002: The maintenance task thread failure should not invoke the - // SparkUncaughtExceptionHandler which could lead to the executor process - // getting killed. - assert(!FakeStateStoreProviderWithMaintenanceError.errorOnMaintenance.get) + // The following 2 calls go `get` will cause the associated maintenance to fail + StateStore.get( + provider0Id, + keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), + 0, useColumnFamilies = false, new StateStoreConf(sqlConf), new Configuration() + ) + + StateStore.get( + provider1Id, + keySchema, valueSchema, NoPrefixKeyStateEncoderSpec(keySchema), + 0, useColumnFamilies = false, new StateStoreConf(sqlConf), new Configuration() + ) - StateStore.stop() + eventually(timeout(5.seconds)) { + assert(!StateStore.isLoaded(provider0Id)) + assert(!StateStore.isLoaded(provider1Id)) + assert(StateStore.isLoaded(provider2Id)) Review Comment: That's tricky, right? Because even if we start with 3 threads and 2 die, then after provider 2's maintenance finishes successfully, it could pick up the 4th provider's maintenance. One solution I see: we can force the maintenance thread pool to only have 1 thread (it's a store conf I think)—then if it ever throws an exception, it can never possibly run maintenance again. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org