This is an automated email from the ASF dual-hosted git repository. gurwls223 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new fb4d6d9db27 [SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec

fb4d6d9db27 is described below

commit fb4d6d9db27d0e9642de33d5b3f9915b334ee02c
Author: bogao007 <bo....@databricks.com>
AuthorDate: Thu Jun 22 09:26:32 2023 +0900

[SPARK-44136][SS] Fixed an issue that StateManager may get materialized in executor instead of driver in FlatMapGroupsWithStateExec

### What changes were proposed in this pull request?

Fixed an issue where the StateManager in FlatMapGroupsWithStateExec could be materialized on an executor instead of on the driver. The issue was introduced by https://issues.apache.org/jira/browse/SPARK-40411

The fix keeps `stateManager` as a `lazy val` but references it at the start of `doExecute()`, which forces the lazy initialization to happen on the driver.

### Why are the changes needed?

Without this change, the StateManager in FlatMapGroupsWithStateExec may be materialized on an executor instead of on the driver, which can cause unexpected behavior.

### Does this PR introduce _any_ user-facing change?

Yes

### How was this patch tested?

No unit test was added: the behavior involves both the driver and the executors, which is hard to simulate in a unit test.

Closes #41693 from bogao007/SPARK-44136.
Authored-by: bogao007 <bo....@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala | 1 +
 1 file changed, 1 insertion(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
index d30b9ad116f..3c3d55e6208 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FlatMapGroupsWithStateExec.scala
@@ -188,6 +188,7 @@ trait FlatMapGroupsWithStateExecBase
   }
 
   override protected def doExecute(): RDD[InternalRow] = {
+    stateManager // force lazy init at driver
     metrics // force lazy init at driver
 
     // Throw errors early if parameters are not as expected
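The one-line fix relies on how a Scala `lazy val` behaves when its owning object is serialized. The sketch below reproduces that behavior outside Spark, under the simplifying assumption that the operator reaches executors through a plain Java serialization round trip (Spark's real task serialization path is more involved). `Operator`, `Side`, `ship`, and `run` are hypothetical names for illustration only; they are not Spark APIs.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Hypothetical marker for which "side" is currently running the code.
object Side {
  @volatile var current: String = "driver"
}

// Hypothetical stand-in for the operator: `stateManager` is a lazy val whose
// initializer records the side on which it was first evaluated.
class Operator extends Serializable {
  lazy val stateManager: String = s"initialized on ${Side.current}"

  // Mirrors the fix: touch the lazy val before the object is shipped.
  def doExecute(forceInit: Boolean): Operator = {
    if (forceInit) stateManager
    this
  }
}

object LazyInitDemo {
  // Java serialization round trip, a rough stand-in for shipping a task.
  def ship[T](obj: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    out.writeObject(obj)
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Build the operator on the "driver", ship it, then read the lazy val on the "executor".
  def run(forceInit: Boolean): String = {
    Side.current = "driver"
    val shipped = ship(new Operator().doExecute(forceInit))
    Side.current = "executor"
    shipped.stateManager
  }

  def main(args: Array[String]): Unit = {
    println(run(forceInit = false)) // initialized on executor
    println(run(forceInit = true))  // initialized on driver
  }
}
```

Once the lazy val has been evaluated, its cached value is serialized along with the object, so the deserialized copy does not rerun the initializer. Without the forced access, the initializer runs wherever the field is first read, which in this sketch is the simulated executor.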