Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19196#discussion_r138760476
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 ---
    @@ -200,18 +202,31 @@ case class StateStoreRestoreExec(
           sqlContext.sessionState,
           Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) 
=>
             val getKey = GenerateUnsafeProjection.generate(keyExpressions, 
child.output)
    -        iter.flatMap { row =>
    -          val key = getKey(row)
    -          val savedState = store.get(key)
    -          numOutputRows += 1
    -          row +: Option(savedState).toSeq
    +        val hasInput = iter.hasNext
    +        if (!hasInput && keyExpressions.isEmpty) {
    --- End diff --
    
    add docs on why we are doing this. similar to the docs in other places 
related to batch aggregation.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to