GitHub user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/19196#discussion_r138760476 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala --- @@ -200,18 +202,31 @@ case class StateStoreRestoreExec( sqlContext.sessionState, Some(sqlContext.streams.stateStoreCoordinator)) { case (store, iter) => val getKey = GenerateUnsafeProjection.generate(keyExpressions, child.output) - iter.flatMap { row => - val key = getKey(row) - val savedState = store.get(key) - numOutputRows += 1 - row +: Option(savedState).toSeq + val hasInput = iter.hasNext + if (!hasInput && keyExpressions.isEmpty) { --- End diff -- Add docs explaining why we are doing this, similar to the docs in other places related to batch aggregation.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org