Huanli Wang created SPARK-53561:
-----------------------------------

             Summary: Catch `InterruptedIOException` and 
`ClosedByInterruptException` in TransformWithStateInPySparkStateServer to avoid 
the worker crash
                 Key: SPARK-53561
                 URL: https://issues.apache.org/jira/browse/SPARK-53561
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 4.0.1, 4.0.0, 4.1.0
            Reporter: Huanli Wang


When `query.stop` is invoked, we first close the state store running in
the query main thread and then close the TWS state server thread as part of the
[task completion listener
event|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala#L227-L231].
 It is therefore possible that the query main thread has already closed the
state store while the TWS state server thread is still performing state store
operations. This results in exceptions like
```
ERROR TransformWithStateInPySparkStateServer: Error reading message: 
[STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to 
access state store out of order. This is a bug, please retry. error_msg=Cannot 
update after ABORTED SQLSTATE: XXKST 
org.apache.spark.sql.execution.streaming.state.StateStoreOperationOutOfOrder: 
[STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to 
access state store out of order. This is a bug, please retry. error_msg=Cannot 
update after ABORTED SQLSTATE: XXKST
```

This exception is caught
[here|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L180-L185].
 By that time, the TWS client in the Python worker may have already been
closed, so
[outputStream.flush()|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L183]
 will throw `java.nio.channels.ClosedByInterruptException`, which is not
handled properly and crashes the worker.
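
A minimal sketch of the proposed handling, with hypothetical stand-in names (`flushOutput`, `safeFlush` are illustrative, not the actual server code): interrupt-driven I/O exceptions raised during shutdown are caught and treated as a benign signal to exit the server loop, rather than being allowed to propagate and crash the worker.

```scala
import java.io.InterruptedIOException
import java.nio.channels.ClosedByInterruptException

object StateServerShutdownSketch {
  // Hypothetical stand-in for the state server's output stream flush, which
  // can throw ClosedByInterruptException once the Python worker side has
  // been closed by the task completion listener.
  def flushOutput(channelClosed: Boolean): Unit =
    if (channelClosed) throw new ClosedByInterruptException()

  // Sketch of the fix: catch InterruptedIOException and
  // ClosedByInterruptException and report shutdown instead of crashing.
  // Returns true on a successful flush, false when the server should stop.
  def safeFlush(channelClosed: Boolean): Boolean =
    try {
      flushOutput(channelClosed)
      true
    } catch {
      case _: InterruptedIOException | _: ClosedByInterruptException =>
        // Expected during query.stop(): the channel was interrupted/closed;
        // exit the server loop quietly rather than crashing the worker.
        false
    }

  def main(args: Array[String]): Unit = {
    println(safeFlush(channelClosed = false)) // normal flush path
    println(safeFlush(channelClosed = true))  // shutdown path, no crash
  }
}
```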
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
