This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 173ba2c6327f [MINOR][SS] Fix indent for streaming aggregation operator
173ba2c6327f is described below

commit 173ba2c6327ff74bab74c1660c80c0c8b43707c9
Author: Anish Shrigondekar <anish.shrigonde...@databricks.com>
AuthorDate: Mon Jan 15 09:26:24 2024 +0900

    [MINOR][SS] Fix indent for streaming aggregation operator
    
    ### What changes were proposed in this pull request?
    Fix indent for streaming aggregation operator
    
    ### Why are the changes needed?
    Indent/style change
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Existing unit tests
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44723 from anishshri-db/task/SPARK-46712.
    
    Authored-by: Anish Shrigondekar <anish.shrigonde...@databricks.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../execution/streaming/statefulOperators.scala    | 30 +++++++++++-----------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index 80f5b3532c5e..8cb99a162ab2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -434,22 +434,22 @@ case class StateStoreRestoreExec(
       numColsPrefixKey = 0,
       session.sessionState,
       Some(session.streams.stateStoreCoordinator)) { case (store, iter) =>
-        val hasInput = iter.hasNext
-        if (!hasInput && keyExpressions.isEmpty) {
-          // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
-          // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
-          // restore the value, so that we don't overwrite our state with a 0 value, but rather
-          // merge the 0 with existing state.
-          store.iterator().map(_.value)
-        } else {
-          iter.flatMap { row =>
-            val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
-            val restoredRow = stateManager.get(store, key)
-            val outputRows = Option(restoredRow).toSeq :+ row
-            numOutputRows += outputRows.size
-            outputRows
-          }
+      val hasInput = iter.hasNext
+      if (!hasInput && keyExpressions.isEmpty) {
+        // If our `keyExpressions` are empty, we're getting a global aggregation. In that case
+        // the `HashAggregateExec` will output a 0 value for the partial merge. We need to
+        // restore the value, so that we don't overwrite our state with a 0 value, but rather
+        // merge the 0 with existing state.
+        store.iterator().map(_.value)
+      } else {
+        iter.flatMap { row =>
+          val key = stateManager.getKey(row.asInstanceOf[UnsafeRow])
+          val restoredRow = stateManager.get(store, key)
+          val outputRows = Option(restoredRow).toSeq :+ row
+          numOutputRows += outputRows.size
+          outputRows
         }
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to