maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add a simple cost check for Adaptive Query Execution
URL: https://github.com/apache/spark/pull/25456#discussion_r314805998
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##########
 @@ -132,21 +135,23 @@ case class AdaptiveSparkPlanExec(
       var result = createQueryStages(currentPhysicalPlan)
       val events = new LinkedBlockingQueue[StageMaterializationEvent]()
       val errors = new mutable.ArrayBuffer[SparkException]()
+      var stagesToReplace = Seq.empty[QueryStageExec]
       while (!result.allChildStagesMaterialized) {
         currentPhysicalPlan = result.newPlan
-        currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, result.newStages)
-        currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, currentLogicalPlan)
-        executionId.foreach(onUpdatePlan)
-
-        // Start materialization of all new stages.
-        result.newStages.map(_._2).foreach { stage =>
-          stage.materialize().onComplete { res =>
-            if (res.isSuccess) {
-              events.offer(StageSuccess(stage, res.get))
-            } else {
-              events.offer(StageFailure(stage, res.failed.get))
-            }
-          }(AdaptiveSparkPlanExec.executionContext)
+        if (result.newStages.nonEmpty) {
+          stagesToReplace = result.newStages ++ stagesToReplace
 
 Review comment:
   Yes, one important idea I should have put in a code comment here is:
   
   The current logical plan is always updated together with the current physical plan, which means that if a new physical plan is not adopted after re-optimization, the new logical plan (with stages replaced) is not taken either (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR181). That also means the current logical plan lags behind the current status of the physical plan, because it does not reflect the new stages created since the last update (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR188). Yet we cannot update the logical plan alone, as all logical links of the current physical plan point to the original logical plan it was planned from. So, as a fix for this "out-of-date" problem, we keep the logical plan together with this `stagesToReplace` list; each time we re-optimize, we first update the logical plan with the stages that haven't been applied to it yet (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR177), and then re-optimize and re-plan on the updated logical plan. If the new physical plan is adopted, we take the new physical plan together with the new logical plan and clear the `stagesToReplace` list; otherwise, we keep the current logical plan and the list as they are.
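   
   To make that flow concrete, here is a minimal, self-contained sketch of just this bookkeeping. Everything in it (`Plan`, `Stage`, `withStages`, the toy `cost` function) is a simplified stand-in invented for illustration, not the actual `AdaptiveSparkPlanExec` API:
   
   ```scala
   // Stand-alone sketch of the `stagesToReplace` bookkeeping described above.
   // All names here are hypothetical stand-ins, not real Spark APIs.
   object StagesToReplaceSketch {
     case class Stage(id: Int)
     case class Plan(stageIds: Set[Int]) {
       // Stand-in for replacing sub-trees of the logical plan with
       // materialized query stages.
       def withStages(stages: Seq[Stage]): Plan = Plan(stageIds ++ stages.map(_.id))
     }
   
     // Toy cost model: the plan only gets cheaper once enough stages are folded in.
     def cost(p: Plan): Int = if (p.stageIds.size >= 2) 50 else 100
   
     def main(args: Array[String]): Unit = {
       var currentLogicalPlan = Plan(Set.empty)
       var currentCost = cost(currentLogicalPlan)
       var stagesToReplace = Seq.empty[Stage]
   
       for (round <- 1 to 3) {
         // New stages arrive: accumulate them without touching the logical plan.
         stagesToReplace = Seq(Stage(round)) ++ stagesToReplace
   
         // Re-optimize: first apply all pending stages to the logical plan,
         // then cost the result against the plan we are currently running.
         val updatedLogicalPlan = currentLogicalPlan.withStages(stagesToReplace)
         val newCost = cost(updatedLogicalPlan)
   
         if (newCost < currentCost) {
           // New plan adopted: take the updated logical plan and clear the list.
           currentLogicalPlan = updatedLogicalPlan
           currentCost = newCost
           stagesToReplace = Seq.empty
         }
         // Otherwise both the logical plan and the pending list stay as they are.
         println(s"round $round: plan=$currentLogicalPlan pending=$stagesToReplace")
       }
     }
   }
   ```
   
   Rounds where the cost does not improve leave `stagesToReplace` untouched, so the next re-optimization still sees every stage that the running logical plan has not yet absorbed.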
