[ https://issues.apache.org/jira/browse/SPARK-53561?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated SPARK-53561:
-----------------------------------
    Labels: pull-request-available  (was: )

> Catch Interruption Exception in TransformWithStateInPySparkStateServer during outputStream.flush at the thread close time to avoid the worker crash
> ---------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-53561
>                 URL: https://issues.apache.org/jira/browse/SPARK-53561
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 4.1.0, 4.0.0, 4.0.1
>            Reporter: Huanli Wang
>            Priority: Major
>              Labels: pull-request-available
>
> When `query.stop` is invoked, we first close the state store running in the query main thread and then close the TWS state server thread as part of the [task completion listener event|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkPythonRunner.scala#L227-L231].
> So it is possible that the query main thread has already closed the state store while the TWS state server thread is still performing state store operations. This results in an exception like
> ```
> ERROR TransformWithStateInPySparkStateServer: Error reading message: [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to access state store out of order. This is a bug, please retry. error_msg=Cannot update after ABORTED SQLSTATE: XXKST
> org.apache.spark.sql.execution.streaming.state.StateStoreOperationOutOfOrder: [STATE_STORE_OPERATION_OUT_OF_ORDER] Streaming stateful operator attempted to access state store out of order. This is a bug, please retry. error_msg=Cannot update after ABORTED SQLSTATE: XXKST
> ```
> This exception is caught [here|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L180-L185].
> At this point, the TWS client in the Python worker may have already closed, in which case [outputStream.flush()|https://github.com/apache/spark/blob/bb41e19ae9ee0f7c228ac395800e740944ee355f/sql/core/src/main/scala/org/apache/spark/sql/execution/python/streaming/TransformWithStateInPySparkStateServer.scala#L183] throws `java.nio.channels.ClosedByInterruptException`, which is not handled properly and crashes the worker.
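
The failure mode described in this issue (a flush in an error-handling path that itself throws `java.nio.channels.ClosedByInterruptException`) can be illustrated with a small standalone sketch. This is not the Spark code or the actual patch: the class `FlushGuardDemo` and the helpers `flushQuietly` and `interruptedStream` are hypothetical names made up for illustration, and it is written in Java rather than Scala. It assumes that swallowing only this one exception type during shutdown is acceptable, and rethrows everything else.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.channels.ClosedByInterruptException;

// Hypothetical illustration only; names here do not exist in Spark.
class FlushGuardDemo {

    /**
     * Flushes the stream and reports whether the flush completed.
     * A ClosedByInterruptException is logged and swallowed, on the assumption
     * that the peer is already gone because the thread is being shut down.
     * Any other IOException is rethrown unchecked so it is not hidden.
     */
    static boolean flushQuietly(OutputStream out) {
        try {
            out.flush();
            return true;
        } catch (ClosedByInterruptException e) {
            // The JDK sets the thread's interrupt status before throwing this
            // exception, so the surrounding loop can still observe the interrupt.
            System.err.println("Flush skipped, channel closed by interrupt: " + e);
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stand-in stream whose flush fails the way an interrupted channel would. */
    static OutputStream interruptedStream() {
        return new OutputStream() {
            @Override
            public void write(int b) {
                // Discard the byte; only flush() matters for this sketch.
            }

            @Override
            public void flush() throws IOException {
                throw new ClosedByInterruptException();
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(flushQuietly(new ByteArrayOutputStream())); // prints "true"
        System.out.println(flushQuietly(interruptedStream()));         // prints "false"
    }
}
```

Catching the narrow exception type rather than `IOException` in general keeps genuine I/O failures visible, which seems to match the intent of the issue title; whether the real fix also logs or rethrows in other cases is not stated here.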