Huanli Wang created SPARK-53561:
-----------------------------------
Summary: Catch `InterruptedIOException` and
`ClosedByInterruptException` in TransformWithStateInPySparkStateServer to avoid
the worker crash
Key: SPARK-53561
URL: https://issues.apache.org/jira/browse/SPARK-53561
Project: Spark
Issue Type: Bug
Components: Structured Streaming
Affects Versions: 4.0.1, 4.0.0, 4.1.0
Reporter: Huanli Wang
When `query.stop` is invoked, we first close the state store running in
the query main thread and then close the TWS state server thread as part of the
[task completion listener
event|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala#L227-L231].
As a result, the query main thread may have already closed the state store
while the TWS state server thread is still performing state store
operations. This throws exceptions like:
```
ERROR TransformWithStateInPySparkStateServer: Error reading message:
[STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to
access state store out of order. This is a bug, please retry. error_msg=Cannot
update after ABORTED SQLSTATE: XXKST
org.apache.spark.sql.execution.streaming.state.StateStoreOperationOutOfOrder:
[STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to
access state store out of order. This is a bug, please retry. error_msg=Cannot
update after ABORTED SQLSTATE: XXKST
```
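The ordering problem can be shown with a minimal Scala sketch. It uses a hypothetical `ToyStateStore` (not Spark's `StateStore` API) whose `put` fails once the store has been aborted. In the real system the interleaving is nondeterministic, but the sketch forces the bad order directly: the main thread aborts the store first, and only then does a separate "state server" thread try to write.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical toy model of the shutdown race; not Spark's StateStore API.
object StopOrderingRace {
  sealed trait StoreState
  case object Updating extends StoreState
  case object Aborted extends StoreState

  final class ToyStateStore {
    private val state = new AtomicReference[StoreState](Updating)
    def abort(): Unit = state.set(Aborted)
    // Mirrors the "Cannot update after ABORTED" check seen in the log.
    def put(key: String, value: String): Unit =
      if (state.get() == Aborted)
        throw new IllegalStateException("Cannot update after ABORTED")
  }

  /** Forces the problematic order and returns the error the server thread saw. */
  def simulate(): Option[String] = {
    val store = new ToyStateStore
    // Step 1: the query main thread closes (aborts) the state store.
    store.abort()
    // Step 2: the state server thread has not stopped yet and touches the store.
    var seen: Option[String] = None
    val server = new Thread(() => {
      try store.put("k", "v")
      catch { case e: IllegalStateException => seen = Some(e.getMessage) }
    })
    server.start()
    server.join() // join() makes the thread's write to `seen` visible here
    seen
  }
}
```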
This exception is caught
[here|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L180-L185].
By then, the TWS client in the Python worker may already be closed, so
[outputStream.flush()|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L183]
throws `java.nio.channels.ClosedByInterruptException`, which is not
handled and crashes the worker.
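One way to apply the fix named in the summary is to treat both interrupt-related exceptions on the error-reporting path as a sign that the client is gone and shutdown is underway, instead of letting them escape. The sketch below assumes a hypothetical `reportError` helper standing in for the write-and-flush at the linked line; it is not Spark's actual code. `ClosedByInterruptException` (a `ClosedChannelException`) and `InterruptedIOException` are separate subclasses of `IOException`, so both are caught explicitly, while any other `IOException` still propagates.

```scala
import java.io.{InterruptedIOException, OutputStream}
import java.nio.channels.ClosedByInterruptException
import java.nio.charset.StandardCharsets

// Hypothetical helper, not Spark's code: mimics a state server reporting an
// error back to the Python client and flushing the stream.
object StateServerErrorPath {

  /** Returns true if the error was delivered, or false if the write was cut
    * short by a thread interrupt (e.g. during query.stop). Any other
    * IOException is rethrown to the caller. */
  def reportError(out: OutputStream, message: String): Boolean = {
    try {
      out.write(message.getBytes(StandardCharsets.UTF_8))
      out.flush()
      true
    } catch {
      // NIO closed the channel because this thread was interrupted.
      case _: ClosedByInterruptException => false
      // Blocking stream I/O was interrupted; same interpretation.
      case _: InterruptedIOException => false
    }
  }
}
```

Returning a flag rather than rethrowing lets the server loop exit quietly during shutdown; whether to log these cases at a lower level is left open here.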
--
This message was sent by Atlassian Jira
(v8.20.10#820010)