This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b94aad  [SPARK-35552][SQL] Make query stage materialized more readable
3b94aad is described below

commit 3b94aad5e72a6b96e4a8f517ac60e0a2fed2590b
Author: ulysses-you <ulyssesyo...@gmail.com>
AuthorDate: Fri May 28 20:42:11 2021 +0800

    [SPARK-35552][SQL] Make query stage materialized more readable
    
    ### What changes were proposed in this pull request?
    
    Add a new method `isMaterialized` in `QueryStageExec`.
    
    ### Why are the changes needed?
    
    Currently, we use `resultOption.get().isDefined` to check if a query stage 
has materialized. The code is not readable at a glance. It's better to use a 
new method like `isMaterialized` to express this check.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Pass CI.
    
    Closes #32689 from ulysses-you/SPARK-35552.
    
    Authored-by: ulysses-you <ulyssesyo...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala   | 5 ++---
 .../spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala       | 6 +++---
 .../apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala | 2 +-
 .../org/apache/spark/sql/execution/adaptive/QueryStageExec.scala   | 7 +++++--
 4 files changed, 11 insertions(+), 9 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
index 614fc78..648d2e7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEPropagateEmptyRelation.scala
@@ -37,14 +37,13 @@ object AQEPropagateEmptyRelation extends 
PropagateEmptyRelationBase {
     super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)
 
   private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
-    case LogicalQueryStage(_, stage: QueryStageExec) if 
stage.resultOption.get().isDefined =>
+    case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized =>
       stage.getRuntimeStatistics.rowCount
     case _ => None
   }
 
   private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan 
match {
-    case LogicalQueryStage(_, stage: BroadcastQueryStageExec)
-      if stage.resultOption.get().isDefined =>
+    case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if 
stage.isMaterialized =>
       stage.broadcast.relationFuture.get().value == 
HashedRelationWithAllNullKeys
     case _ => false
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 556c036..ebff790 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -420,7 +420,7 @@ case class AdaptiveSparkPlanExec(
       context.stageCache.get(e.canonicalized) match {
         case Some(existingStage) if conf.exchangeReuseEnabled =>
           val stage = reuseQueryStage(existingStage, e)
-          val isMaterialized = stage.resultOption.get().isDefined
+          val isMaterialized = stage.isMaterialized
           CreateStageResult(
             newPlan = stage,
             allChildStagesMaterialized = isMaterialized,
@@ -442,7 +442,7 @@ case class AdaptiveSparkPlanExec(
                 newStage = reuseQueryStage(queryStage, e)
               }
             }
-            val isMaterialized = newStage.resultOption.get().isDefined
+            val isMaterialized = newStage.isMaterialized
             CreateStageResult(
               newPlan = newStage,
               allChildStagesMaterialized = isMaterialized,
@@ -455,7 +455,7 @@ case class AdaptiveSparkPlanExec(
 
     case q: QueryStageExec =>
       CreateStageResult(newPlan = q,
-        allChildStagesMaterialized = q.resultOption.get().isDefined, newStages 
= Seq.empty)
+        allChildStagesMaterialized = q.isMaterialized, newStages = Seq.empty)
 
     case _ =>
       if (plan.children.isEmpty) {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
index 61124f0..a8c74b5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/DynamicJoinSelection.scala
@@ -53,7 +53,7 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
   }
 
   private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint] 
= plan match {
-    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if 
stage.resultOption.get().isDefined
+    case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if 
stage.isMaterialized
       && stage.mapStats.isDefined =>
       val demoteBroadcastHash = 
shouldDemoteBroadcastHashJoin(stage.mapStats.get)
       val preferShuffleHash = preferShuffledHashJoin(stage.mapStats.get)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index a4ec4f1..6451d0b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -95,11 +95,13 @@ abstract class QueryStageExec extends LeafExecNode {
   /**
    * Compute the statistics of the query stage if executed, otherwise None.
    */
-  def computeStats(): Option[Statistics] = resultOption.get().map { _ =>
+  def computeStats(): Option[Statistics] = if (isMaterialized) {
     val runtimeStats = getRuntimeStatistics
     val dataSize = runtimeStats.sizeInBytes.max(0)
     val numOutputRows = runtimeStats.rowCount.map(_.max(0))
-    Statistics(dataSize, numOutputRows, isRuntime = true)
+    Some(Statistics(dataSize, numOutputRows, isRuntime = true))
+  } else {
+    None
   }
 
   @transient
@@ -107,6 +109,7 @@ abstract class QueryStageExec extends LeafExecNode {
   protected var _resultOption = new AtomicReference[Option[Any]](None)
 
   private[adaptive] def resultOption: AtomicReference[Option[Any]] = 
_resultOption
+  def isMaterialized: Boolean = resultOption.get().isDefined
 
   override def output: Seq[Attribute] = plan.output
   override def outputPartitioning: Partitioning = plan.outputPartitioning

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to