[ 
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jungtaek Lim resolved SPARK-48105.
----------------------------------
    Fix Version/s: 4.0.0
         Assignee: Huanli Wang
       Resolution: Fixed

Issue resolved via https://github.com/apache/spark/pull/46351

>  Fix the data corruption issue when state store unload and snapshotting 
> happens concurrently for HDFS state store
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-48105
>                 URL: https://issues.apache.org/jira/browse/SPARK-48105
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.0.0, 3.5.2, 3.4.4
>            Reporter: Huanli Wang
>            Assignee: Huanli Wang
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 4.0.0
>
>
> There are two race conditions between state store snapshotting and state 
> store unloading which could result in query failure and potential data 
> corruption.
>  
> Case 1:
>  # the maintenance thread pool encounters some issues and call the 
> [stopMaintenanceTask,|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774]
>  this function further calls 
> [threadPool.stop.|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587]
>  However, this function doesn't wait for the stop operation to be completed 
> and move to do the state store [unload and 
> clear.|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778]
>  # the provider unload will [close the state 
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721]
>  which [clear the values of 
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
>  for HDFS backed state store.
>  # if the not-yet-stop maintenance thread is still running and trying to do 
> the snapshot, but the data in the underlying `HDFSBackedStateStoreMap` has 
> been removed. if this snapshot process completes successfully, then we will 
> write corrupted data and the following batches will consume this corrupted 
> data.
> Case 2:
>  # In executor_1, the maintenance thread is going to do the snapshot for 
> state_store_1, it retrieves the `HDFSBackedStateStoreMap` object from the 
> loadedMaps, after this, the maintenance thread [releases the lock of the 
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
>  # state_store_1 is loaded in another executor, e.g. executor_2.
>  # another state store, state_store_2, is loaded on executor_1 and 
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
>  to driver.
>  # executor_1 does the 
> [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
>  for those no longer active state store which clears the data entries in the 
> `HDFSBackedStateStoreMap`
>  # the snapshotting thread is terminated and uploads the incomplete snapshot 
> to cloud because the [iterator doesn't have next 
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
>  after doing the clear.
>  # future batches are consuming the corrupted data.
>  
> Proposed fix:
>  * When we close the hdfs state store, we should only remove the entry from 
> `loadedMaps` rather than doing the active data cleanup. JVM GC should be able 
> to help us GC those objects.
>  * we should wait for the maintenance thread to stop before unloading the 
> providers. 
>  
> Thanks [~anishshri-db] for helping debug this issue!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to