This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new a4b184d8db28 [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator
a4b184d8db28 is described below

commit a4b184d8db284db1c279896fb50ef111bf4c91d2
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Wed Jan 10 23:18:04 2024 +0900

    [SPARK-46547][SS] Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator
    
    ### What changes were proposed in this pull request?
    Swallow non-fatal exception in maintenance task to avoid deadlock between maintenance thread and streaming aggregation operator.
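    
    As a rough illustration of the pattern (not the Spark code itself), the sketch below wraps a maintenance step so that non-fatal exceptions are logged and swallowed, while fatal ones still propagate. `MaintenanceSketch` and `runMaintenanceSafely` are hypothetical names, and `println` stands in for a real logger.
    
    ```scala
    import scala.util.control.NonFatal

    object MaintenanceSketch {
      // Hypothetical helper: runs one maintenance step and reports whether it succeeded.
      // NonFatal does not match fatal errors such as VirtualMachineError or
      // InterruptedException, so those still propagate to the caller.
      def runMaintenanceSafely(step: () => Unit): Boolean = {
        try {
          step()
          true
        } catch {
          case NonFatal(ex) =>
            // Log and continue instead of letting the error escape the maintenance task.
            println(s"Ignoring error while performing maintenance: ${ex.getMessage}")
            false
        }
      }
    }
    ```
    
    With this shape, a failing step such as one that throws `java.io.FileNotFoundException` returns `false` to the caller rather than surfacing as a maintenance-thread failure.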
    
    ### Why are the changes needed?
    This change fixes a race condition that causes a deadlock between the task thread and the maintenance thread. The deadlock arises mainly with the streaming aggregation operator, which uses two physical operators: `StateStoreRestoreExec` and `StateStoreSaveExec`. The first opens the store in read-only mode and the second performs the actual commit.
    
    However, the following sequence of events leads to a deadlock:
    1. The task thread runs `StateStoreRestoreExec` and gets the store instance, and thereby the DB instance lock.
    2. The maintenance thread fails with an error for some reason.
    3. The maintenance thread takes the `loadedProviders` lock and tries to call `close` on all the loaded providers, which requires the DB instance lock held by the task thread.
    4. The task thread tries to execute the StateStoreRDD for the `StateStoreSaveExec` operator and tries to acquire the `loadedProviders` lock, which is held by the maintenance thread.
    
    In short, if the maintenance thread's failure handling is interleaved between the `restore/save` operations, the two threads deadlock on the `loadedProviders` lock and the DB instance lock.
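    
    The interleaving above can be sketched with two plain JVM locks. This is a minimal illustration, not Spark code: `LockOrderSketch`, `simulate`, `dbInstanceLock` and `loadedProvidersLock` are hypothetical names, and a bounded `tryLock` stands in for the real lock acquisition so that the example terminates instead of hanging.
    
    ```scala
    import java.util.concurrent.{CountDownLatch, TimeUnit}
    import java.util.concurrent.atomic.AtomicBoolean
    import java.util.concurrent.locks.ReentrantLock

    object LockOrderSketch {
      // Returns (taskGotSecondLock, maintenanceGotSecondLock).
      def simulate(timeoutMs: Long): (Boolean, Boolean) = {
        val dbInstanceLock = new ReentrantLock()
        val loadedProvidersLock = new ReentrantLock()
        val bothHoldFirst = new CountDownLatch(2) // both threads hold their first lock
        val bothTried = new CountDownLatch(2)     // both threads finished their attempt
        val taskGot = new AtomicBoolean(false)
        val maintenanceGot = new AtomicBoolean(false)

        // Takes `first`, waits for the other thread, then tries `second` with a timeout.
        def contend(first: ReentrantLock, second: ReentrantLock, got: AtomicBoolean): Thread =
          new Thread(() => {
            first.lock()
            try {
              bothHoldFirst.countDown()
              bothHoldFirst.await()
              if (second.tryLock(timeoutMs, TimeUnit.MILLISECONDS)) {
                got.set(true)
                second.unlock()
              }
              // Keep holding `first` until the other thread has also finished trying.
              bothTried.countDown()
              bothTried.await()
            } finally {
              first.unlock()
            }
          })

        // Task thread: DB instance lock first, then the loadedProviders lock.
        val task = contend(dbInstanceLock, loadedProvidersLock, taskGot)
        // Maintenance thread: loadedProviders lock first, then the DB instance lock.
        val maintenance = contend(loadedProvidersLock, dbInstanceLock, maintenanceGot)
        task.start(); maintenance.start()
        task.join(); maintenance.join()
        (taskGot.get, maintenanceGot.get)
      }
    }
    ```
    
    Because each thread holds the lock the other one needs, neither `tryLock` call succeeds and `simulate` returns `(false, false)`. With unbounded waits the same ordering would block both threads indefinitely.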
    
    The fix proposes to simply release the resources at the end of the `StateStoreRestoreExec` operator (note that `abort` for `ReadStateStore` is likely a misnomer, but we choose to follow the already provided API in this case).
    
    Relevant Logs:
    Link - https://github.com/anishshri-db/spark/actions/runs/7356847259/job/20027577445?pr=4
    ```
    2023-12-27T09:59:02.6362466Z 09:59:02.635 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error in maintenanceThreadPool
    2023-12-27T09:59:02.6365616Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
    2023-12-27T09:59:02.6367861Z    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
    2023-12-27T09:59:02.6369383Z    at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
    2023-12-27T09:59:02.6370693Z    at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
    2023-12-27T09:59:02.6371781Z    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
    2023-12-27T09:59:02.6372876Z    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
    2023-12-27T09:59:02.6373967Z    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    2023-12-27T09:59:02.6375104Z    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
    2023-12-27T09:59:02.6376676Z 09:59:02.636 WARN org.apache.spark.sql.execution.streaming.state.StateStore: Error running maintenance thread
    2023-12-27T09:59:02.6379079Z java.io.FileNotFoundException: File file:/home/runner/work/spark/spark/target/tmp/spark-8ef51f34-b9de-48f2-b8df-07e14599b4c9/state/0/1 does not exist
    2023-12-27T09:59:02.6381083Z    at org.apache.hadoop.fs.RawLocalFileSystem.listStatus(RawLocalFileSystem.java:733)
    2023-12-27T09:59:02.6382490Z    at org.apache.hadoop.fs.DelegateToFileSystem.listStatus(DelegateToFileSystem.java:177)
    2023-12-27T09:59:02.6383816Z    at org.apache.hadoop.fs.ChecksumFs.listStatus(ChecksumFs.java:571)
    2023-12-27T09:59:02.6384875Z    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1940)
    2023-12-27T09:59:02.6386294Z    at org.apache.hadoop.fs.FileContext$Util$1.next(FileContext.java:1936)
    2023-12-27T09:59:02.6387439Z    at org.apache.hadoop.fs.FSLinkResolver.resolve(FSLinkResolver.java:90)
    2023-12-27T09:59:02.6388674Z    at org.apache.hadoop.fs.FileContext$Util.listStatus(FileContext.java:1942)
    ...
    2023-12-27T10:01:02.4292831Z [info] - changing schema of state when restarting query - state format version 2 (RocksDBStateStore) *** FAILED *** (2 minutes)
    2023-12-27T10:01:02.4295311Z [info]   Timed out waiting for stream: The code passed to failAfter did not complete within 120 seconds.
    2023-12-27T10:01:02.4297271Z [info]   java.base/java.lang.Thread.getStackTrace(Thread.java:1619)
    2023-12-27T10:01:02.4299084Z [info]    org.scalatest.concurrent.TimeLimits$.failAfterImpl(TimeLimits.scala:277)
    2023-12-27T10:01:02.4300948Z [info]    org.scalatest.concurrent.TimeLimits.failAfter(TimeLimits.scala:231)
    ...
    2023-12-27T10:01:02.6474472Z 10:01:02.646 WARN org.apache.spark.sql.execution.streaming.state.RocksDB StateStoreId(opId=0,partId=0,name=default): Error closing RocksDB
    2023-12-27T10:01:02.6482792Z org.apache.spark.SparkException: [CANNOT_LOAD_STATE_STORE.UNRELEASED_THREAD_ERROR] An error occurred during loading state. StateStoreId(opId=0,partId=0,name=default): RocksDB instance could not be acquired by [ThreadId: Some(1858)] as it was not released by [ThreadId: Some(3835), task: partition 0.0 in stage 513.0, TID 1369] after 120009 ms.
    2023-12-27T10:01:02.6488483Z Thread holding the lock has trace: app//org.apache.spark.sql.execution.streaming.state.StateStore$.getStateStoreProvider(StateStore.scala:577)
    2023-12-27T10:01:02.6490896Z app//org.apache.spark.sql.execution.streaming.state.StateStore$.get(StateStore.scala:565)
    2023-12-27T10:01:02.6493072Z app//org.apache.spark.sql.execution.streaming.state.StateStoreRDD.compute(StateStoreRDD.scala:128)
    2023-12-27T10:01:02.6494915Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    2023-12-27T10:01:02.6496232Z app//org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    2023-12-27T10:01:02.6497655Z app//org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    2023-12-27T10:01:02.6499153Z app//org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
    2023-12-27T10:01:02.6556758Z 10:01:02.654 WARN org.apache.spark.scheduler.TaskSetManager: Lost task 0.0 in stage 513.0 (TID 1369) (localhost executor driver): TaskKilled (Stage cancelled: [SPARK_JOB_CANCELLED] Job 260 cancelled part of cancelled job group cf26288c-0158-48ce-8a86-00a596dd45d8 SQLSTATE: XXKDA)
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    ```
    [info] Run completed in 6 minutes, 20 seconds.
    [info] Total number of tests run: 80
    [info] Suites: completed 1, aborted 0
    [info] Tests: succeeded 80, failed 0, canceled 0, ignored 0, pending 0
    [info] All tests passed.
    ```
    
    ### Was this patch authored or co-authored using generative AI tooling?
    Yes
    
    Closes #44542 from anishshri-db/task/SPARK-46547.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    (cherry picked from commit f7b0b453791707b904ed0fa5508aa4b648d56bba)
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../streaming/state/RocksDBStateStoreProvider.scala          | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
index 10f207c7ec1f..a19eb00a7b5e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreProvider.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.streaming.state
 
 import java.io._
 
+import scala.util.control.NonFatal
+
 import org.apache.hadoop.conf.Configuration
 
 import org.apache.spark.{SparkConf, SparkEnv}
@@ -202,7 +204,15 @@ private[sql] class RocksDBStateStoreProvider
   }
 
   override def doMaintenance(): Unit = {
-    rocksDB.doMaintenance()
+    try {
+      rocksDB.doMaintenance()
+    } catch {
+      // SPARK-46547 - Swallow non-fatal exception in maintenance task to avoid deadlock between
+      // maintenance thread and streaming aggregation operator
+      case NonFatal(ex) =>
+        logWarning(s"Ignoring error while performing maintenance operations with exception=",
+          ex)
+    }
   }
 
   override def close(): Unit = {

