maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add a simple cost check for Adaptive Query Execution URL: https://github.com/apache/spark/pull/25456#discussion_r314805998
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala ##########
@@ -132,21 +135,23 @@ case class AdaptiveSparkPlanExec(
     var result = createQueryStages(currentPhysicalPlan)
     val events = new LinkedBlockingQueue[StageMaterializationEvent]()
     val errors = new mutable.ArrayBuffer[SparkException]()
+    var stagesToReplace = Seq.empty[QueryStageExec]
     while (!result.allChildStagesMaterialized) {
       currentPhysicalPlan = result.newPlan
-      currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, result.newStages)
-      currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, currentLogicalPlan)
-      executionId.foreach(onUpdatePlan)
-
-      // Start materialization of all new stages.
-      result.newStages.map(_._2).foreach { stage =>
-        stage.materialize().onComplete { res =>
-          if (res.isSuccess) {
-            events.offer(StageSuccess(stage, res.get))
-          } else {
-            events.offer(StageFailure(stage, res.failed.get))
-          }
-        }(AdaptiveSparkPlanExec.executionContext)
+      if (result.newStages.nonEmpty) {
+        stagesToReplace = result.newStages ++ stagesToReplace

Review comment:
Yes, one important idea I should have put in a code comment here is: the current logical plan is always updated together with the current physical plan, which means that if a new physical plan is not adopted after re-optimization, the new logical plan (with stages replaced) is not taken either (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR181). This also means that the current logical plan lags behind the current status of the physical plan, because it does not reflect the new stages created since the last update (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR188). Yet we cannot update the logical plan alone, as all logical links of the current physical plan point to the original logical plan it was planned from.
So as a fix for this "out-of-date" problem, we keep the logical plan together with this `stagesToReplace` list. Each time we re-optimize, we first update the logical plan with those stages that have not yet been applied to it (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR177), and then re-optimize and re-plan on the updated logical plan. If the new physical plan is adopted, we take the new physical plan together with the new logical plan and clear the `stagesToReplace` list; otherwise, we keep the current logical plan and the list as they are.

This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
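
The bookkeeping described above can be sketched as a small, self-contained simulation. Note this is only an illustration of the idea, not the actual Spark code: `Stage`, `LogicalPlan`, and `Replanner` below are hypothetical, simplified stand-ins for `QueryStageExec`, the real logical plan, and the `AdaptiveSparkPlanExec` loop. New stages accumulate in `stagesToReplace`; they are folded into the logical plan only at re-optimization time, and the list is cleared only if the re-optimized plan is adopted:

```scala
// Hypothetical simplified types for illustration only.
case class Stage(id: Int)
case class LogicalPlan(appliedStages: Set[Int])

final class Replanner(initial: LogicalPlan) {
  private var currentLogicalPlan: LogicalPlan = initial
  private var stagesToReplace: Seq[Stage] = Seq.empty

  // Called whenever the planning loop creates new query stages:
  // they are recorded but NOT yet applied to the logical plan.
  def recordNewStages(stages: Seq[Stage]): Unit =
    stagesToReplace = stages ++ stagesToReplace

  // Re-optimize: first fold all pending stages into the logical plan,
  // then let `adopt` decide whether the re-planned result is kept
  // (standing in for the cost check on the new physical plan).
  def reOptimize(adopt: LogicalPlan => Boolean): LogicalPlan = {
    val updated = LogicalPlan(
      currentLogicalPlan.appliedStages ++ stagesToReplace.map(_.id))
    if (adopt(updated)) {
      // New plan adopted: take the new logical plan, clear the list.
      currentLogicalPlan = updated
      stagesToReplace = Seq.empty
    }
    // Otherwise keep the current logical plan and the list as they are.
    currentLogicalPlan
  }

  def pending: Seq[Stage] = stagesToReplace
}
```

In this sketch, a rejected re-optimization leaves both the logical plan and the pending-stage list untouched, so the stages are simply retried at the next re-optimization, which is the invariant the comment above describes.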