[ 
https://issues.apache.org/jira/browse/SPARK-54585?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dylan Wong updated SPARK-54585:
-------------------------------
    Description: 
Issue 1:

When cancel() is called while the thread is in an interrupted state (e.g., 
during task cancellation), the previous implementation would fail. The code 
submitted a Future to cancel each stream, then called awaitResult() to wait for 
completion. However, awaitResult() checks the thread's interrupt flag and 
throws InterruptedException immediately if the thread is interrupted, so 
cancel() fails without waiting for the cancellations to finish.
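
The failure mode in Issue 1 can be reproduced outside Spark. The following is a minimal sketch, not the Spark code: it assumes that {{scala.concurrent.Await.result}} behaves like awaitResult() for this purpose, and the object and method names are hypothetical. A Promise that is never completed stands in for a stream-cancel Future that has not finished yet.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

// Hypothetical sketch: names are illustrative and not taken from Spark.
object InterruptedAwaitSketch {

  /** Returns true if awaiting a pending Future fails with InterruptedException. */
  def awaitFailsWhenInterrupted(): Boolean = {
    // Stands in for a cancel Future that is still running.
    val pending = Promise[Unit]()

    // Simulate task cancellation: set the current thread's interrupt flag.
    Thread.currentThread().interrupt()

    try {
      // With the flag set, the blocking wait throws before any time elapses.
      Await.result(pending.future, 5.seconds)
      false
    } catch {
      case _: InterruptedException => true
    } finally {
      // Clear the flag so the calling thread is left unaffected.
      Thread.interrupted()
    }
  }
}
```

Calling {{InterruptedAwaitSketch.awaitFailsWhenInterrupted()}} is expected to return true: the wait is abandoned because of the interrupt flag, not because the Future failed or timed out.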

Issue 2:

Consider the case where {{abort()}} is called on 
{{RocksDBStateStoreProvider}}. This calls {{rollback()}} on the {{RocksDB}} 
instance, which in turn calls {{changelogWriter.foreach(_.abort())}} and then 
sets {{changelogWriter = None}}.

However, if {{changelogWriter.abort()}} throws an exception, its finally block 
still sets {{backingFileStream}} and {{compressedStream}} to {{null}}. The 
exception then propagates out of {{rollback()}}, so the line that sets 
{{changelogWriter = None}} is never reached.

This leaves the RocksDB instance in an inconsistent state:
 * changelogWriter = Some(changelogWriterWeAttemptedToAbort)
 * changelogWriterWeAttemptedToAbort.backingFileStream = null
 * changelogWriterWeAttemptedToAbort.compressedStream = null

Now consider calling {{RocksDB.load()}} again. This calls 
{{replayChangelog()}}, which calls {{put()}}, which calls 
{{changelogWriter.put()}}. At this point, the assertion 
{{assert(compressedStream != null)}} fails, causing an exception while loading 
the StateStore.
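
The sequence in Issue 2 can be modelled with two small stand-in classes. This is a sketch under stated assumptions: {{SketchChangelogWriter}}, {{SketchDb}} and {{rollbackAlwaysClearing()}} are hypothetical names, only the control flow described above is reproduced, and the second rollback variant is one possible way to keep the state consistent, not necessarily what the actual patch does.

```scala
// Hypothetical stand-ins for the real classes; only the control flow matters.
class SketchChangelogWriter {
  // Non-null while the writer is usable, mirroring compressedStream above.
  var compressedStream: AnyRef = new Object

  def abort(): Unit = {
    try {
      throw new java.io.IOException("simulated failure while aborting")
    } finally {
      // Runs even though abort() is failing, as described in the issue.
      compressedStream = null
    }
  }

  def put(): Unit = assert(compressedStream != null)
}

class SketchDb {
  var changelogWriter: Option[SketchChangelogWriter] =
    Some(new SketchChangelogWriter)

  // Shape described in the issue: the reset is skipped when abort() throws.
  def rollback(): Unit = {
    changelogWriter.foreach(_.abort())
    changelogWriter = None
  }

  // Assumed alternative: always drop the writer, even if abort() throws.
  def rollbackAlwaysClearing(): Unit = {
    try {
      changelogWriter.foreach(_.abort())
    } finally {
      changelogWriter = None
    }
  }
}
```

After {{rollback()}} throws, {{changelogWriter}} is still defined while its {{compressedStream}} is null, so a later {{put()}} on it trips the assertion. With {{rollbackAlwaysClearing()}} the exception still propagates, but {{changelogWriter}} ends up as {{None}}.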


> Fix State Store rollback when thread is in interrupted state
> ------------------------------------------------------------
>
>                 Key: SPARK-54585
>                 URL: https://issues.apache.org/jira/browse/SPARK-54585
>             Project: Spark
>          Issue Type: Task
>          Components: Structured Streaming
>    Affects Versions: 4.1.0
>            Reporter: Dylan Wong
>            Priority: Major
>              Labels: pull-request-available


