[ https://issues.apache.org/jira/browse/SPARK-48105?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17843123#comment-17843123 ]
Anish Shrigondekar commented on SPARK-48105:
--------------------------------------------

Thanks [~huanli.wang]. Worth noting that we believe this also fixes the stream-stream join null pointer issue - https://issues.apache.org/jira/browse/SPARK-31754 - which was effectively caused by the same state data loss/corruption scenarios explained above.

cc - [~kabhwan]

> Fix the data corruption issue when state store unload and snapshotting
> happens concurrently for HDFS state store
> -----------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-48105
>                 URL: https://issues.apache.org/jira/browse/SPARK-48105
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 4.0.0
>            Reporter: Huanli Wang
>            Priority: Major
>              Labels: pull-request-available
>
> There are two race conditions between state store snapshotting and state store unloading which can result in query failure and potential data corruption.
>
> Case 1:
> # The maintenance thread pool encounters an issue and calls [stopMaintenanceTask|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L774], which in turn calls [threadPool.stop|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L587]. However, this call does not wait for the stop operation to complete before moving on to the state store [unload and clear|https://github.com/apache/spark/blob/d9d79a54a3cd487380039c88ebe9fa708e0dcf23/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L775-L778].
> # The provider unload [closes the state store|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L719-L721], which [clears the values of loadedMaps|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L353-L355] for the HDFS-backed state store.
> # The not-yet-stopped maintenance thread may still be running and attempting the snapshot, while the data in the underlying `HDFSBackedStateStoreMap` has already been removed. If that snapshot completes successfully, it writes corrupted data and the following batches consume it (see the sketch after this list).
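To make the Case 1 interleaving concrete, here is a minimal, self-contained Scala sketch of the same pattern. The names (`Case1RaceSketch`, `storeMap`, `doSnapshot`, `maintenancePool`) are illustrative stand-ins, not the actual Spark internals: a `shutdown()` that does not await termination lets the subsequent clear race with the in-flight snapshot, which can then "succeed" while observing an empty map.

{code:scala}
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.concurrent.TrieMap

object Case1RaceSketch {
  // Stand-in for the in-memory HDFSBackedStateStoreMap contents.
  private val storeMap = TrieMap("k1" -> "v1", "k2" -> "v2")

  private val maintenancePool = Executors.newSingleThreadExecutor()

  private def doSnapshot(): Unit = {
    Thread.sleep(100) // simulate a snapshot that is still running during unload
    val entries = storeMap.iterator.toList
    // If the unload already cleared the map, this "successful" snapshot is empty.
    println(s"snapshot wrote ${entries.size} entries")
  }

  def main(args: Array[String]): Unit = {
    maintenancePool.submit(new Runnable { def run(): Unit = doSnapshot() })

    // Buggy pattern: shutdown() stops accepting new tasks but does NOT wait
    // for the in-flight snapshot to finish...
    maintenancePool.shutdown()

    // ...so the unload/clear below races with the still-running snapshot.
    storeMap.clear()

    maintenancePool.awaitTermination(10, TimeUnit.SECONDS)
  }
}
{code}

Running this will usually print `snapshot wrote 0 entries`, which is exactly the "successful but corrupted" snapshot described in step 3.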
>
> Case 2:
> # On executor_1, the maintenance thread is about to snapshot state_store_1. It retrieves the `HDFSBackedStateStoreMap` object from loadedMaps and then [releases the lock of the loadedMaps|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L750-L751].
> # state_store_1 is loaded on another executor, e.g. executor_2.
> # Another state store, state_store_2, is loaded on executor_1 and [reportActiveStoreInstance|https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L854-L871] is sent to the driver.
> # executor_1 performs the [unload|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala#L713] for the state stores that are no longer active, which clears the data entries in the `HDFSBackedStateStoreMap`.
> # The snapshotting thread terminates and uploads an incomplete snapshot to cloud storage because the [iterator has no next element|https://github.com/apache/spark/blob/c6696cdcd611a682ebf5b7a183e2970ecea3b58c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L634] after the clear.
> # Future batches consume the corrupted data.
>
> Proposed fix (sketched in code below):
> * When we close the HDFS state store, we should only remove the entry from `loadedMaps` rather than actively clearing the data; the JVM GC will reclaim those objects.
> * We should wait for the maintenance thread to stop before unloading the providers.
>
> Thanks [~anishshri-db] for helping debug this issue!
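For illustration, here is a minimal Scala sketch of how the two proposed changes could fit together. The class and method names (`HdfsProviderSketch`, `MaintenanceSketch`, `unloadAll`) are hypothetical, not the actual patch; in Spark the relevant code lives in `HDFSBackedStateStoreProvider` and the `StateStore` object linked above.

{code:scala}
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

// Hypothetical shapes standing in for the real provider and coordinator.
class HdfsProviderSketch {
  // version -> in-memory key/value map (stand-in for HDFSBackedStateStoreMap)
  private val loadedMaps =
    new ConcurrentHashMap[Long, ConcurrentHashMap[String, Array[Byte]]]()

  def close(): Unit = synchronized {
    // Fix (1): only drop the provider's references. Do not clear() the
    // per-version maps themselves: a concurrent snapshot may still be
    // iterating one of them, and the JVM GC reclaims the maps once no one
    // holds a reference anymore.
    loadedMaps.clear()
  }
}

object MaintenanceSketch {
  private val maintenancePool = Executors.newSingleThreadScheduledExecutor()

  def stopMaintenanceTask(): Unit = {
    maintenancePool.shutdown()
    // Fix (2): block until in-flight maintenance (snapshot) work has
    // finished before any provider is unloaded.
    maintenancePool.awaitTermination(5, TimeUnit.MINUTES)
  }

  def unloadAll(providers: Seq[HdfsProviderSketch]): Unit = {
    stopMaintenanceTask()          // first: no maintenance thread is snapshotting
    providers.foreach(_.close())   // then it is safe to drop the providers' state
  }
}
{code}

The ordering is the key point: `awaitTermination` completes before any `close()`, so a snapshot either finishes against intact maps or never starts, and because `close()` only drops references instead of clearing the per-version maps, a snapshot that already holds a map (Case 2) can still write consistent data.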