Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20097#discussion_r159364715 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -367,21 +416,20 @@ class MicroBatchExecution( // Replace sources in the logical plan with data that has arrived since the last batch. val withNewSources = logicalPlan transform { case StreamingExecutionRelation(source, output) => - newData.get(source).map { data => - val newPlan = data.logicalPlan - assert(output.size == newPlan.output.size, + newData.get(source).map { dataPlan => + assert(output.size == dataPlan.output.size, s"Invalid batch: ${Utils.truncatedString(output, ",")} != " + - s"${Utils.truncatedString(newPlan.output, ",")}") - replacements ++= output.zip(newPlan.output) - newPlan + s"${Utils.truncatedString(dataPlan.output, ",")}") + replacements ++= output.zip(dataPlan.output) + dataPlan }.getOrElse { LocalRelation(output, isStreaming = true) } } // Rewire the plan to use the new attributes that were returned by the source. val replacementMap = AttributeMap(replacements) - val triggerLogicalPlan = withNewSources transformAllExpressions { + val withNewExprs = withNewSources transformAllExpressions { --- End diff -- similarly change `withNewSources`
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org