[
https://issues.apache.org/jira/browse/SPARK-53561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Anish Shrigondekar resolved SPARK-53561.
----------------------------------------
Fix Version/s: 4.1.0
Resolution: Fixed
> Catch Interruption Exception in TransformWithStateInPySparkStateServer during
> outputStream.flush at the thread close time to avoid the worker crash
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-53561
> URL: https://issues.apache.org/jira/browse/SPARK-53561
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 4.1.0, 4.0.0, 4.0.1
> Reporter: Huanli Wang
> Assignee: Huanli Wang
> Priority: Major
> Labels: pull-request-available
> Fix For: 4.1.0
>
>
> When the `query.stop` is invoked, we first close the the state store running
> in the query main thread and then close the TWS state server thread as part
> of the [task completion listener
> event|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala#L227-L231].
> So it's possible that query main thread has already closed the state store
> while in TWS state server thread, we are still doing the state store
> operations. This results in throwing out Exception like
> ```
> ERROR TransformWithStateInPySparkStateServer: Error reading message:
> [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to
> access state store out of order. This is a bug, please retry.
> error_msg=Cannot update after ABORTED SQLSTATE: XXKST
> org.apache.spark.sql.execution.streaming.state.StateStoreOperationOutOfOrder:
> [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to
> access state store out of order. This is a bug, please retry.
> error_msg=Cannot update after ABORTED SQLSTATE: XXKST
> ```
> This exception is caught
> [here|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L180-L185].
> At this time, the TWS client in python worker may have already closed, then
> [outputStream.flush()|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L183]
> will throw out `java.nio.channels.ClosedByInterruptException` which is not
> handled properly and crashes the worker.
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]