[
https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dongjoon Hyun closed SPARK-48105.
---------------------------------
> Fix the data corruption issue when state store unload and snapshotting
> happens concurrently for HDFS state store
> -----------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-48105
> URL: https://issues.apache.org/jira/browse/SPARK-48105
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 3.5.2, 3.4.4, 4.0.0
> Reporter: Huanli Wang
> Assignee: Huanli Wang
> Priority: Blocker
> Labels: correctness, pull-request-available
> Fix For: 3.5.2, 3.4.4, 4.0.0
>
>
> There are two race conditions between state store snapshotting and state
> store unloading which could result in query failure and potential data
> corruption.
>
> Case 1:
> # The maintenance thread pool encounters some issue and calls
> [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774],
> which in turn calls
> [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587].
> However, this call doesn't wait for the stop operation to complete and
> moves straight on to the state store [unload and
> clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
> # The provider unload [closes the state
> store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721],
> which [clears the values of
> loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355]
> for the HDFS-backed state store.
> # The not-yet-stopped maintenance thread may still be running and trying
> to take the snapshot, but the data in the underlying
> `HDFSBackedStateStoreMap` has already been removed. If this snapshot
> completes successfully, we write corrupted data and the following batches
> consume that corrupted data (a simplified sketch of this race follows
> below).
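>
> A minimal, self-contained sketch of this ordering (simplified names and
> timings, not the actual Spark code): a non-blocking pool shutdown lets
> the clear race with an in-flight snapshot.
> {code:scala}
> import java.util.concurrent.{ConcurrentHashMap, Executors}
>
> object Case1RaceSketch {
>   // stand-in for the provider's loadedMaps / HDFSBackedStateStoreMap
>   private val storeMap = new ConcurrentHashMap[String, String]()
>   storeMap.put("key", "value")
>
>   private val maintenancePool = Executors.newFixedThreadPool(1)
>
>   def main(args: Array[String]): Unit = {
>     // maintenance task: the snapshot is still in flight when stop returns
>     maintenancePool.submit(new Runnable {
>       override def run(): Unit = {
>         Thread.sleep(100)            // snapshot not finished yet
>         val n = storeMap.size()      // map may already be cleared
>         println(s"uploaded snapshot with $n entries")  // 0 => corrupted
>       }
>     })
>
>     // stopMaintenanceTask equivalent: shutdown() returns immediately,
>     // it does NOT wait for the running snapshot task to finish
>     maintenancePool.shutdown()
>
>     // unload/clear runs right away and wipes the data being snapshotted
>     storeMap.clear()
>   }
> }
> {code}
>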
> Case 2:
> # In executor_1, the maintenance thread is about to snapshot
> state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from
> loadedMaps, and after that the maintenance thread [releases the lock on
> loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
> # state_store_1 gets loaded on another executor, e.g. executor_2.
> # Another state store, state_store_2, is loaded on executor_1 and calls
> [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871]
> to the driver.
> # executor_1 then [unloads|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713]
> the state stores that are no longer active on it, which clears the data
> entries in the `HDFSBackedStateStoreMap`.
> # The snapshotting thread terminates early and uploads the incomplete
> snapshot to cloud storage because the [iterator has no next
> element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634]
> after the clear.
> # Future batches consume the corrupted data (a simplified sketch of this
> race follows below).
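>
> A minimal sketch of this interleaving (simplified names and timings, not
> the actual Spark code): the unload path clears the map while the snapshot
> thread is still iterating over it, so the iterator ends early and only
> part of the data is written.
> {code:scala}
> import java.util.concurrent.ConcurrentHashMap
>
> object Case2RaceSketch {
>   private val storeMap = new ConcurrentHashMap[String, String]()
>   (1 to 1000).foreach(i => storeMap.put(s"key-$i", s"value-$i"))
>
>   def main(args: Array[String]): Unit = {
>     // maintenance thread: the loadedMaps lock is already released; it
>     // now iterates over the map reference it grabbed earlier
>     val snapshotThread = new Thread(() => {
>       val iter = storeMap.entrySet().iterator()
>       var written = 0
>       while (iter.hasNext) {   // can end early once the map is cleared
>         iter.next()
>         written += 1
>         Thread.sleep(1)
>       }
>       println(s"uploaded snapshot with $written of 1000 entries")
>     })
>     snapshotThread.start()
>
>     // unload path on the same executor clears entries mid-iteration
>     Thread.sleep(20)
>     storeMap.clear()
>     snapshotThread.join()
>   }
> }
> {code}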
>
> Proposed fix:
> * When we close the HDFS state store, we should only remove the entry
> from `loadedMaps` rather than actively clearing the data; the JVM GC will
> reclaim those objects once nothing references them.
> * We should wait for the maintenance thread to stop before unloading the
> providers (see the sketch after this list).
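>
> A sketch of the proposed ordering (assumed, simplified names; not the
> actual patch): block on the maintenance pool before unloading, and on
> close drop only the references instead of clearing the inner maps.
> {code:scala}
> import java.util.concurrent.{ConcurrentHashMap, ExecutorService, TimeUnit}
>
> object ProposedFixSketch {
>   type StoreMap = ConcurrentHashMap[String, String]
>
>   // 1) Wait for in-flight snapshot tasks instead of returning right away.
>   def stopMaintenanceTask(pool: ExecutorService): Unit = {
>     pool.shutdown()
>     pool.awaitTermination(5, TimeUnit.MINUTES)
>   }
>
>   // 2) Drop the references only; a snapshot still holding an inner map
>   //    keeps reading consistent data, and the JVM GC reclaims the maps
>   //    once nothing references them.
>   def closeProvider(loadedMaps: ConcurrentHashMap[Long, StoreMap]): Unit = {
>     loadedMaps.clear()
>   }
> }
> {code}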
>
> Thanks [~anishshri-db] for helping debug this issue!
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]