Huanli Wang created SPARK-48105:
-----------------------------------

             Summary:  Fix the data corruption issue when state store unload 
and snapshotting happen concurrently for the HDFS state store
                 Key: SPARK-48105
                 URL: https://issues.apache.org/jira/browse/SPARK-48105
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 4.0.0
            Reporter: Huanli Wang


There are two race conditions between state store snapshotting and state store 
unloading which could result in query failure and potential data corruption.

 

Case 1:
 # The maintenance thread pool encounters an issue and calls 
[stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774], 
which in turn calls 
[threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587]. 
However, this function does not wait for the stop operation to complete 
before moving on to the state store [unload and 
clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
 # The provider unload [closes the state 
store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721], 
which [clears the values of 
loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355] 
for the HDFS-backed state store.
 # If the not-yet-stopped maintenance thread is still running and tries to 
take a snapshot, the data in the underlying `HDFSBackedStateStoreMap` has 
already been removed. If the snapshot nevertheless completes successfully, 
corrupted data is written out, and the following batches consume that 
corrupted data.

Case 2:
 # In executor_1, the maintenance thread is about to take a snapshot of 
state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from 
loadedMaps, after which it [releases the lock on 
loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
 # state_store_1 is loaded on another executor, e.g. executor_2.
 # Another state store, state_store_2, is loaded on executor_1 and 
[reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871] 
is sent to the driver.
 # executor_1 
[unloads|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713] 
the state stores that are no longer active, which clears the data entries in 
the `HDFSBackedStateStoreMap`.
 # The snapshotting thread terminates and uploads the incomplete snapshot to 
cloud storage, because the [iterator has no next 
element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634] 
after the clear.
 # Future batches consume the corrupted data.
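The key to Case 2 is that `ConcurrentHashMap` iterators are weakly 
consistent: clearing the map mid-iteration makes the loop end early instead 
of throwing, so an incomplete "snapshot" is written without any error. A 
minimal JDK-level illustration (hypothetical names, not the real Spark 
code):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch of the Case 2 failure mode, NOT the actual Spark code.
public class Case2IncompleteSnapshot {
    // Simulates a snapshot write interleaved with an unload's clear().
    public static int writeSnapshotWithConcurrentClear(int total) {
        ConcurrentHashMap<Integer, String> stateMap = new ConcurrentHashMap<>();
        for (int i = 0; i < total; i++) stateMap.put(i, "row-" + i);

        List<String> snapshotFile = new ArrayList<>();     // stands in for the upload
        Iterator<String> it = stateMap.values().iterator();

        if (it.hasNext()) snapshotFile.add(it.next());     // snapshot has started...
        stateMap.clear();                                  // ...when unload clears the map

        while (it.hasNext()) snapshotFile.add(it.next());  // silently finds no more rows
        return snapshotFile.size();                        // far fewer than `total`
    }

    public static void main(String[] args) {
        int written = writeSnapshotWithConcurrentClear(1000);
        System.out.println("snapshot contains " + written + " of 1000 rows");
    }
}
```

No exception is raised at any point, which is why the incomplete snapshot 
reaches cloud storage and poisons future batches.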

 

Proposed fix:
 * When we close the HDFS state store, we should only remove the entry from 
`loadedMaps` rather than actively clearing the map's data; the JVM GC can 
reclaim those objects once no snapshot is still reading them.
 * We should wait for the maintenance thread to stop before unloading the 
providers.
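The two bullets above could look roughly like the following. This is a 
sketch with hypothetical names (`closeStore`, `stopMaintenanceThenUnload`), 
not the actual patch, which would live in StateStore.scala and 
HDFSBackedStateStoreProvider.scala:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the proposed fix, NOT the actual Spark patch.
public class ProposedFixSketch {
    // (1) On close, drop the map reference instead of mutating its contents;
    //     an in-flight snapshot keeps a consistent view of the old value and
    //     the JVM GC reclaims it once the snapshot finishes.
    public static void closeStore(ConcurrentHashMap<Long, Object> loadedMaps,
                                  long version) {
        loadedMaps.remove(version);   // no clear() on the value itself
    }

    // (2) Block until the maintenance pool has really stopped, then unload.
    public static void stopMaintenanceThenUnload(ExecutorService pool,
                                                 Runnable unloadAll) {
        pool.shutdown();              // stop accepting new maintenance tasks
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                pool.shutdownNow();   // give up on stragglers after the timeout
            }
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        unloadAll.run();              // safe: no snapshot task can still run
    }
}
```

The 30-second timeout is an arbitrary placeholder; the real implementation 
would pick a bound consistent with the existing maintenance configuration.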

 

Thanks [~anishshri-db] for helping debug this issue!



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
