Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20097#discussion_r159364715
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
    @@ -367,21 +416,20 @@ class MicroBatchExecution(
         // Replace sources in the logical plan with data that has arrived 
since the last batch.
         val withNewSources = logicalPlan transform {
           case StreamingExecutionRelation(source, output) =>
    -        newData.get(source).map { data =>
    -          val newPlan = data.logicalPlan
    -          assert(output.size == newPlan.output.size,
    +        newData.get(source).map { dataPlan =>
    +          assert(output.size == dataPlan.output.size,
                 s"Invalid batch: ${Utils.truncatedString(output, ",")} != " +
    -              s"${Utils.truncatedString(newPlan.output, ",")}")
    -          replacements ++= output.zip(newPlan.output)
    -          newPlan
    +              s"${Utils.truncatedString(dataPlan.output, ",")}")
    +          replacements ++= output.zip(dataPlan.output)
    +          dataPlan
             }.getOrElse {
               LocalRelation(output, isStreaming = true)
             }
         }
     
         // Rewire the plan to use the new attributes that were returned by the 
source.
         val replacementMap = AttributeMap(replacements)
    -    val triggerLogicalPlan = withNewSources transformAllExpressions {
    +    val withNewExprs = withNewSources transformAllExpressions {
    --- End diff --
    
    Similarly, rename `withNewSources` so its name reflects the new naming convention used here.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org

Reply via email to