[GitHub] [spark] maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add a simple cost check for Adaptive Query Execution

2019-08-21 Thread GitBox
maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add 
a simple cost check for Adaptive Query Execution
URL: https://github.com/apache/spark/pull/25456#discussion_r316257936
 
 

 ##
 File path: sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
 ##
 @@ -152,17 +152,17 @@ class AdaptiveQueryExecSuite extends QueryTest with SharedSparkSession {
   val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
 """
   |WITH t4 AS (
-  |  SELECT * FROM lowercaseData t2 JOIN testData3 t3 ON t2.n = t3.a
+  |  SELECT * FROM lowercaseData t2 JOIN testData3 t3 ON t2.n = t3.a where t2.n = '1'
 
 Review comment:
   Yeah, without this change the sort-merge join (SMJ) wouldn't be changed to a broadcast-hash join (BHJ), because of the new cost guard.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add a simple cost check for Adaptive Query Execution

2019-08-16 Thread GitBox
maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add 
a simple cost check for Adaptive Query Execution
URL: https://github.com/apache/spark/pull/25456#discussion_r314808742
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -403,6 +450,18 @@ object AdaptiveSparkPlanExec {
   private val executionContext = ExecutionContext.fromExecutorService(
 ThreadUtils.newDaemonCachedThreadPool("QueryStageCreator", 16))
 
+  /**
+   * The temporary [[LogicalPlan]] link for query stages.
+   *
+   * Temp logical links are set for each query stage after its creation. During re-planning, the
 
 Review comment:
   We could do it either way (temp or normal link) here, I think, because we have a post-fix for the link in https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR362, which is necessary by itself.





[GitHub] [spark] maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add a simple cost check for Adaptive Query Execution

2019-08-16 Thread GitBox
maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add 
a simple cost check for Adaptive Query Execution
URL: https://github.com/apache/spark/pull/25456#discussion_r314807909
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -317,22 +345,21 @@ case class AdaptiveSparkPlanExec(
 */
   private def updateLogicalPlan(
   logicalPlan: LogicalPlan,
-  newStages: Seq[(Exchange, QueryStageExec)]): LogicalPlan = {
+  newStages: Seq[QueryStageExec]): LogicalPlan = {
 var currentLogicalPlan = logicalPlan
 newStages.foreach {
-  case (exchange, stage) =>
-// Get the corresponding logical node for `exchange`. If `exchange` has been transformed
-// from a `Repartition`, it should have `logicalLink` available by itself; otherwise
-// traverse down to find the first node that is not generated by `EnsureRequirements`.
-val logicalNodeOpt = exchange.logicalLink.orElse(exchange.collectFirst {
-  case p if p.logicalLink.isDefined => p.logicalLink.get
-})
+  case stage if currentPhysicalPlan.find(_.eq(stage)).isDefined =>
 
 Review comment:
   Following this comment https://github.com/apache/spark/pull/25456/files#r314805998:
   
   Now that we might get `newStages` from different rounds of stage creation, some stages may already be contained in newer stages, so they are no longer "reachable" in the current physical plan and we don't need to worry about them any more. Meanwhile, we need to make sure we always apply the latest stages first: https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR142.
 
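   The two points above (skip stages no longer reachable from the current physical plan, and keep the newest stages at the front of the list) can be sketched with a minimal, self-contained model. `Plan`, `Op`, and `Stage` here are hypothetical stand-ins, not Spark's actual classes; the reference-based reachability check mirrors `currentPhysicalPlan.find(_.eq(stage))`:

```scala
// Hypothetical model: a stage is treated as an opaque leaf, so an older stage
// whose subtree is covered by a newer stage is no longer reachable by
// traversing the current physical plan.
sealed trait Plan { def children: Seq[Plan] }
case class Op(children: Seq[Plan]) extends Plan
case class Stage(id: Int) extends Plan { def children: Seq[Plan] = Nil }

object LatestStagesFirst {
  // Reference-based reachability, like `currentPhysicalPlan.find(_.eq(stage))`.
  def reachable(root: Plan, stage: Stage): Boolean =
    root.eq(stage) || root.children.exists(reachable(_, stage))

  // Of the pending stages (newest first), only the reachable ones still matter.
  def pending(root: Plan, stages: Seq[Stage]): Seq[Stage] =
    stages.filter(reachable(root, _))

  def main(args: Array[String]): Unit = {
    val s1 = Stage(1)
    val s2 = Stage(2) // created in a later round; its subtree covers s1

    // Each round's new stages are prepended, so the latest stages come first.
    var stagesToReplace = Seq.empty[Stage]
    stagesToReplace = Seq(s1) ++ stagesToReplace // round 1
    stagesToReplace = Seq(s2) ++ stagesToReplace // round 2 -> Seq(s2, s1)

    // After round 2 the physical plan contains only s2; s1 is unreachable.
    val currentPhysicalPlan: Plan = Op(Seq(s2))
    println(pending(currentPhysicalPlan, stagesToReplace).map(_.id))
  }
}
```

   Because the list is newest-first, a later traversal that replaces logical subtrees hits the outermost (latest) stage before any older stage it swallowed.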





[GitHub] [spark] maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add a simple cost check for Adaptive Query Execution

2019-08-16 Thread GitBox
maryannxue commented on a change in pull request #25456: [SPARK-28739][SQL] Add 
a simple cost check for Adaptive Query Execution
URL: https://github.com/apache/spark/pull/25456#discussion_r314805998
 
 

 ##
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 ##
 @@ -132,21 +135,23 @@ case class AdaptiveSparkPlanExec(
   var result = createQueryStages(currentPhysicalPlan)
   val events = new LinkedBlockingQueue[StageMaterializationEvent]()
   val errors = new mutable.ArrayBuffer[SparkException]()
+  var stagesToReplace = Seq.empty[QueryStageExec]
   while (!result.allChildStagesMaterialized) {
 currentPhysicalPlan = result.newPlan
-currentLogicalPlan = updateLogicalPlan(currentLogicalPlan, result.newStages)
-currentPhysicalPlan.setTagValue(SparkPlan.LOGICAL_PLAN_TAG, currentLogicalPlan)
-executionId.foreach(onUpdatePlan)
-
-// Start materialization of all new stages.
-result.newStages.map(_._2).foreach { stage =>
-  stage.materialize().onComplete { res =>
-if (res.isSuccess) {
-  events.offer(StageSuccess(stage, res.get))
-} else {
-  events.offer(StageFailure(stage, res.failed.get))
-}
-  }(AdaptiveSparkPlanExec.executionContext)
+if (result.newStages.nonEmpty) {
+  stagesToReplace = result.newStages ++ stagesToReplace
 
 Review comment:
   Yes, one important idea I should have put in a code comment here is:
   
   The current logical plan is always updated together with the current physical plan, which means that if a new physical plan is not adopted after re-optimization, the new logical plan (with stages replaced) is not taken either (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR181). That also means the current logical plan lags behind the current state of the physical plan, because it does not reflect the new stages created since the last update (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR188). Yet we cannot update the logical plan alone, as all logical links of the current physical plan point to the original logical plan it was planned from.
   
   So as a fix for this "out-of-date" problem, we keep the logical plan together with this `stagesToReplace` list. Each time we re-optimize, we first update the logical plan with those stages that haven't been applied to it yet (https://github.com/apache/spark/pull/25456/files#diff-6954dd8020a9ca298f1fb9602c0e831cR177), and then start re-optimizing and re-planning on the updated logical plan. If the new physical plan is adopted, we take the new physical plan together with the new logical plan and clear the `stagesToReplace` list; otherwise, we keep the current logical plan and the list as they are.
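   As a rough, self-contained sketch of this bookkeeping (simplified stand-in types, not the real `AdaptiveSparkPlanExec`): the logical plan is caught up with the pending stages only right before re-optimization, and the pending list is cleared only when the re-planned physical plan is adopted. Here a "logical plan" just records which stage ids have been substituted into it, whereas the real code replaces logical subtrees:

```scala
// Hypothetical stand-ins for the real plan types.
case class SimpleLogicalPlan(appliedStages: Set[Int])
case class SimpleStage(id: Int)

class Replanner {
  var currentLogicalPlan: SimpleLogicalPlan = SimpleLogicalPlan(Set.empty)
  var stagesToReplace: Seq[SimpleStage] = Seq.empty

  // Each round of stage creation prepends its stages (latest first).
  def onNewStages(newStages: Seq[SimpleStage]): Unit =
    stagesToReplace = newStages ++ stagesToReplace

  // `newPlanIsCheaper` stands in for the cost comparison between the
  // re-planned physical plan and the current one.
  def reOptimize(newPlanIsCheaper: Boolean): Unit = {
    // First catch the logical plan up with stages not yet applied to it.
    val updated = SimpleLogicalPlan(
      currentLogicalPlan.appliedStages ++ stagesToReplace.map(_.id))
    // ... re-optimize and re-plan `updated`, then compare costs ...
    if (newPlanIsCheaper) {
      currentLogicalPlan = updated // adopt the new logical + physical plan
      stagesToReplace = Seq.empty  // the pending stages are now reflected in it
    }
    // Otherwise keep the current plans and the pending list unchanged: the
    // logical plan stays "behind", to be caught up at the next re-optimization.
  }
}
```

   The invariant this keeps is that `currentLogicalPlan` plus `stagesToReplace` always describes the current physical plan, even when a costlier re-planned candidate is rejected.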

