This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new e33972e  [SPARK-35566][SS] Fix StateStoreRestoreExec output rows

e33972e is described below

commit e33972e4075cf7ceb24cb288e5a62ff2d418a698
Author: Liang-Chi Hsieh <vii...@gmail.com>
AuthorDate: Mon May 31 16:45:56 2021 +0900

    [SPARK-35566][SS] Fix StateStoreRestoreExec output rows

    ### What changes were proposed in this pull request?

    This is a minor change to how `StateStoreRestoreExec` computes its number of output rows. Previously only input rows were counted; the optionally restored rows were not.

    ### Why are the changes needed?

    Currently the number of output rows of `StateStoreRestoreExec` counts only the input rows, but the operator actually outputs the input rows plus any optional restored rows. We should report the correct number of output rows.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    Existing tests.

    Closes #32703 from viirya/fix-outputrows.

    Authored-by: Liang-Chi Hsieh <vii...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
    (cherry picked from commit 73ba4492b1deea953e4f22dbf36dfcacd81c0f8a)
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../org/apache/spark/sql/execution/streaming/statefulOperators.scala | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 1bec924..8e12099 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -251,8 +251,9 @@ case class StateStoreRestoreExec(
           iter.flatMap { row =>
             val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
             val restoredRow = stateManager.get(store, key)
-            numOutputRows += 1
-            Option(restoredRow).toSeq :+ row
+            val outputRows = Option(restoredRow).toSeq :+ row
+            numOutputRows += outputRows.size
+            outputRows
           }
         }
     }
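To show why the metric changes, the sketch below simplifies the restore step into a self-contained form and compares the old and fixed counting side by side. It is not Spark code. `Row`, the `Map` used as a state store, and the `Long` counter are hypothetical stand-ins for `UnsafeRow`, the state store, and the `numOutputRows` metric. The sketch also uses a strict `Seq` where Spark works over a lazy `Iterator`. For each input row, the logic emits the restored row (if any) followed by the input row, just as `Option(restoredRow).toSeq :+ row` does in the diff.

```scala
// Hypothetical, simplified model of the row counting in StateStoreRestoreExec.
// Not Spark's implementation: a Map stands in for the state store and a Long
// counter stands in for the numOutputRows metric.
object RestoreCountSketch {
  // Stand-in for an UnsafeRow: a grouping key plus a partial aggregate value.
  final case class Row(key: String, value: Long)

  // Old behaviour: one increment per input row, even when a restored row is also emitted.
  def restoreOld(input: Seq[Row], store: Map[String, Row]): (Seq[Row], Long) = {
    var numOutputRows = 0L
    val output = input.flatMap { row =>
      val restoredRow: Row = store.getOrElse(row.key, null) // null when the key has no state
      numOutputRows += 1
      Option(restoredRow).toSeq :+ row
    }
    (output, numOutputRows)
  }

  // Fixed behaviour: add the number of rows actually emitted for each input row.
  def restoreFixed(input: Seq[Row], store: Map[String, Row]): (Seq[Row], Long) = {
    var numOutputRows = 0L
    val output = input.flatMap { row =>
      val restoredRow: Row = store.getOrElse(row.key, null)
      val outputRows = Option(restoredRow).toSeq :+ row
      numOutputRows += outputRows.size
      outputRows
    }
    (output, numOutputRows)
  }

  def main(args: Array[String]): Unit = {
    val store = Map("a" -> Row("a", 10L)) // only key "a" has previously stored state
    val input = Seq(Row("a", 1L), Row("b", 2L))
    val (oldOut, oldCount) = restoreOld(input, store)
    val (newOut, newCount) = restoreFixed(input, store)
    println(s"emitted=${oldOut.size} oldCount=$oldCount")
    println(s"emitted=${newOut.size} newCount=$newCount")
  }
}
```

In this example one key has stored state and the other does not, so three rows are emitted in total. The old counting reports 2 and the fixed counting reports 3, which matches the number of rows emitted.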