This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 173ba2c6327f [MINOR][SS] Fix indent for streaming aggregation operator 173ba2c6327f is described below commit 173ba2c6327ff74bab74c1660c80c0c8b43707c9 Author: Anish Shrigondekar <anish.shrigonde...@databricks.com> AuthorDate: Mon Jan 15 09:26:24 2024 +0900 [MINOR][SS] Fix indent for streaming aggregation operator ### What changes were proposed in this pull request? Fix indent for streaming aggregation operator ### Why are the changes needed? Indent/style change ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing unit tests ### Was this patch authored or co-authored using generative AI tooling? No Closes #44723 from anishshri-db/task/SPARK-46712. Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../execution/streaming/statefulOperators.scala | 30 +++++++++++----------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 80f5b3532c5e..8cb99a162ab2 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -434,22 +434,22 @@ case class StateStoreRestoreExec( numColsPrefixKey = 0, session.sessionState, Some(session.streams.stateStoreCoordinator)) { case (store, iter) => - val hasInput = iter.hasNext - if (!hasInput && keyExpressions.isEmpty) { - // If our `keyExpressions` are empty, we're getting a global aggregation. In that case - // the `HashAggregateExec` will output a 0 value for the partial merge. We need to - // restore the value, so that we don't overwrite our state with a 0 value, but rather - // merge the 0 with existing state. - store.iterator().map(_.value) - } else { - iter.flatMap { row => - val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) - val restoredRow = stateManager.get(store, key) - val outputRows = Option(restoredRow).toSeq :+ row - numOutputRows += outputRows.size - outputRows - } + val hasInput = iter.hasNext + if (!hasInput && keyExpressions.isEmpty) { + // If our `keyExpressions` are empty, we're getting a global aggregation. In that case + // the `HashAggregateExec` will output a 0 value for the partial merge. We need to + // restore the value, so that we don't overwrite our state with a 0 value, but rather + // merge the 0 with existing state. + store.iterator().map(_.value) + } else { + iter.flatMap { row => + val key = stateManager.getKey(row.asInstanceOf[UnsafeRow]) + val restoredRow = stateManager.get(store, key) + val outputRows = Option(restoredRow).toSeq :+ row + numOutputRows += outputRows.size + outputRows } + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org