[ 
https://issues.apache.org/jira/browse/SPARK-53561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated SPARK-53561:
-----------------------------------
    Labels: pull-request-available  (was: )

> Catch Interruption Exception in TransformWithStateInPySparkStateServer during 
> outputStream.flush at the thread close time to avoid the worker crash
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-53561
>                 URL: https://issues.apache.org/jira/browse/SPARK-53561
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.1.0, 4.0.0, 4.0.1
>            Reporter: Huanli Wang
>            Priority: Major
>              Labels: pull-request-available
>
> When `query.stop` is invoked, we first close the state store running 
> in the query main thread and then close the TWS state server thread as part 
> of the [task completion listener 
> event|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala#L227-L231].
>  So it is possible that the query main thread has already closed the state 
> store while the TWS state server thread is still performing state store 
> operations. This results in an exception like
> ```
> ERROR TransformWithStateInPySparkStateServer: Error reading message: 
> [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to 
> access state store out of order. This is a bug, please retry. 
> error_msg=Cannot update after ABORTED SQLSTATE: XXKST 
> org.apache.spark.sql.execution.streaming.state.StateStoreOperationOutOfOrder: 
> [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to 
> access state store out of order. This is a bug, please retry. 
> error_msg=Cannot update after ABORTED SQLSTATE: XXKST
> ```
> This exception is caught 
> [here|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L180-L185].
>  At this point, the TWS client in the Python worker may have already closed, so 
> [outputStream.flush()|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L183]
>  throws `java.nio.channels.ClosedByInterruptException`, which is not 
> handled properly and crashes the worker.
>  
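
The failure mode described above can be illustrated outside Spark. The following is only a sketch under stated assumptions, not the actual patch: the class `FlushOnClose` and the helper `flushTolerantOfInterrupt` are hypothetical names, and a `java.nio.channels.Pipe` stands in for the connection to the Python worker. It relies on the documented behavior of interruptible channels: a blocking write attempted by a thread whose interrupt status is already set closes the channel and throws `ClosedByInterruptException`.

```java
import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.Pipe;

// Hypothetical illustration; none of these names come from the Spark code base.
class FlushOnClose {

    // Flushes the stream. Returns true on success, false if the flush was
    // cut short because the calling thread was interrupted (for example
    // while the query is being stopped). Other I/O errors still propagate.
    static boolean flushTolerantOfInterrupt(OutputStream out) throws IOException {
        try {
            out.flush();
            return true;
        } catch (ClosedByInterruptException e) {
            // The channel was closed by the interrupt; there is nothing left
            // to flush to, so report it instead of letting it escape.
            return false;
        }
    }

    // Simulates the shutdown case: data is buffered, the thread is
    // interrupted, and only then is the buffer flushed to the channel.
    static boolean demo() throws IOException {
        Pipe pipe = Pipe.open();
        OutputStream out =
            new BufferedOutputStream(Channels.newOutputStream(pipe.sink()));
        out.write(new byte[] {1, 2, 3});        // stays in the buffer for now
        Thread.currentThread().interrupt();     // stand-in for the thread being closed
        try {
            return flushTolerantOfInterrupt(out);
        } finally {
            Thread.interrupted();               // clear the flag set for this demo
            pipe.source().close();
            pipe.sink().close();
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println("flush completed: " + demo());
    }
}
```

Catching only `ClosedByInterruptException` keeps unrelated I/O failures visible. Whether the real fix should swallow the exception, log it, or restore the interrupt status is a design decision for the pull request and is not specified in this description.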



--
This message was sent by Atlassian Jira
(v8.20.10#820010)