This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f53840  [SPARK-35881][SQL] Add support for columnar execution of final query stage in AdaptiveSparkPlanExec
0f53840 is described below

commit 0f538402fb76e4d6182cc881219d53b5fdf73af1
Author: Andy Grove <andygrov...@gmail.com>
AuthorDate: Fri Jul 30 13:21:50 2021 -0500

    [SPARK-35881][SQL] Add support for columnar execution of final query stage in AdaptiveSparkPlanExec
    
    ### What changes were proposed in this pull request?
    
    Changes in this PR:
    
    - `AdaptiveSparkPlanExec` has new methods `finalPlanSupportsColumnar` and `doExecuteColumnar` to support adaptive queries where the final query stage produces columnar data.
    - `SessionState` now has a new set of injectable rules, `postStageCreationRules`, that can be applied to the final query stage.
    - `AdaptiveSparkPlanExec` can now safely be wrapped by either `RowToColumnarExec` or `ColumnarToRowExec`.
    
    A Spark plugin can use the new rules to remove the root `ColumnarToRowExec` transition that is inserted by previous rules, and at execution time it can call `finalPlanSupportsColumnar` to check whether the final query stage is columnar. If the plan is columnar, the plugin can safely call `doExecuteColumnar`. The adaptive plan can be wrapped in either `RowToColumnarExec` or `ColumnarToRowExec` to force a particular output format. There are fast paths in both of these operators to avoid any re [...]
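    
    As an illustration only (not part of this patch), here is a minimal sketch of how a plugin might use these hooks. The rule `RemoveRootColumnarToRow`, the extension class `MyColumnarExtensions`, and the helper `executePreferColumnar` are hypothetical names; the sketch calls the public `execute()`/`executeColumnar()` entry points, which delegate to `doExecute`/`doExecuteColumnar`.
    
    ```scala
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.rules.Rule
    import org.apache.spark.sql.execution.{ColumnarToRowExec, SparkPlan}
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
    import org.apache.spark.sql.vectorized.ColumnarBatch
    
    // Hypothetical rule: strip a root ColumnarToRowExec inserted by the earlier
    // columnar rules so the final query stage hands back columnar batches.
    // A real plugin would likely add extra checks before removing the transition.
    case class RemoveRootColumnarToRow(session: SparkSession) extends Rule[SparkPlan] {
      override def apply(plan: SparkPlan): SparkPlan = plan match {
        case ColumnarToRowExec(child) => child
        case other => other
      }
    }
    
    // Hypothetical extension entry point, registered via spark.sql.extensions.
    class MyColumnarExtensions extends (SparkSessionExtensions => Unit) {
      override def apply(extensions: SparkSessionExtensions): Unit = {
        extensions.injectPostStageCreationRule(RemoveRootColumnarToRow.apply)
      }
    }
    
    // At execution time the plugin checks what the final stage actually produced.
    def executePreferColumnar(
        plan: AdaptiveSparkPlanExec): Either[RDD[InternalRow], RDD[ColumnarBatch]] = {
      if (plan.finalPlanSupportsColumnar()) {
        Right(plan.executeColumnar()) // final stage is columnar
      } else {
        Left(plan.execute())          // final stage fell back to row-based output
      }
    }
    ```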
    
    ### Why are the changes needed?
    
    Without this change it is necessary to use reflection to get the final physical plan, determine whether it is columnar, and execute it as a columnar plan. `AdaptiveSparkPlanExec` only provides public methods for row-based execution.
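    
    For comparison, here is a hedged sketch (an assumption for illustration, not code taken from any plugin) of the kind of reflection workaround this API removes; it reaches for the otherwise inaccessible `getFinalPhysicalPlan()` method that already exists on `AdaptiveSparkPlanExec`.
    
    ```scala
    import org.apache.spark.sql.execution.SparkPlan
    import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
    
    object ReflectiveAdaptiveAccess {
      // Illustrative only: use reflection to call the private method that
      // resolves the final physical plan, then inspect supportsColumnar on it.
      def finalPlan(adaptive: AdaptiveSparkPlanExec): SparkPlan = {
        val m = classOf[AdaptiveSparkPlanExec].getDeclaredMethod("getFinalPhysicalPlan")
        m.setAccessible(true)
        m.invoke(adaptive).asInstanceOf[SparkPlan]
      }
    
      def isFinalPlanColumnar(adaptive: AdaptiveSparkPlanExec): Boolean =
        finalPlan(adaptive).supportsColumnar
    }
    ```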
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    I have manually tested this patch with the RAPIDS Accelerator for Apache Spark.
    
    Closes #33140 from andygrove/support-columnar-adaptive.
    
    Authored-by: Andy Grove <andygrov...@gmail.com>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../apache/spark/sql/SparkSessionExtensions.scala  |  22 +++-
 .../org/apache/spark/sql/execution/Columnar.scala  | 134 ++++++++++++---------
 .../execution/adaptive/AdaptiveSparkPlanExec.scala |  46 +++++--
 .../sql/internal/BaseSessionStateBuilder.scala     |   7 +-
 .../apache/spark/sql/internal/SessionState.scala   |   3 +-
 5 files changed, 141 insertions(+), 71 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
index b14dce6..18ebae5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSessionExtensions.scala
@@ -47,6 +47,7 @@ import org.apache.spark.sql.execution.{ColumnarRule, SparkPlan}
  * <li>(External) Catalog listeners.</li>
  * <li>Columnar Rules.</li>
  * <li>Adaptive Query Stage Preparation Rules.</li>
+ * <li>Adaptive Query Post Stage Preparation Rules.</li>
  * </ul>
  *
  * The extensions can be used by calling `withExtensions` on the [[SparkSession.Builder]], for
@@ -110,9 +111,12 @@ class SparkSessionExtensions {
   type TableFunctionDescription = (FunctionIdentifier, ExpressionInfo, TableFunctionBuilder)
   type ColumnarRuleBuilder = SparkSession => ColumnarRule
   type QueryStagePrepRuleBuilder = SparkSession => Rule[SparkPlan]
+  type PostStageCreationRuleBuilder = SparkSession => Rule[SparkPlan]
 
   private[this] val columnarRuleBuilders = mutable.Buffer.empty[ColumnarRuleBuilder]
   private[this] val queryStagePrepRuleBuilders = mutable.Buffer.empty[QueryStagePrepRuleBuilder]
+  private[this] val postStageCreationRuleBuilders =
+    mutable.Buffer.empty[PostStageCreationRuleBuilder]
 
   /**
    * Build the override rules for columnar execution.
@@ -129,6 +133,14 @@ class SparkSessionExtensions {
   }
 
   /**
+   * Build the override rules for the final query stage preparation phase of adaptive query
+   * execution.
+   */
+  private[sql] def buildPostStageCreationRules(session: SparkSession): Seq[Rule[SparkPlan]] = {
+    postStageCreationRuleBuilders.map(_.apply(session)).toSeq
+  }
+
+  /**
    * Inject a rule that can override the columnar execution of an executor.
    */
   def injectColumnar(builder: ColumnarRuleBuilder): Unit = {
@@ -136,13 +148,21 @@ class SparkSessionExtensions {
   }
 
   /**
-   * Inject a rule that can override the the query stage preparation phase of adaptive query
+   * Inject a rule that can override the query stage preparation phase of adaptive query
    * execution.
    */
   def injectQueryStagePrepRule(builder: QueryStagePrepRuleBuilder): Unit = {
     queryStagePrepRuleBuilders += builder
   }
 
+  /**
+   * Inject a rule that can override the final query stage preparation phase of adaptive query
+   * execution.
+   */
+  def injectPostStageCreationRule(builder: PostStageCreationRuleBuilder): Unit = {
+    postStageCreationRuleBuilders += builder
+  }
+
   private[this] val resolutionRuleBuilders = mutable.Buffer.empty[RuleBuilder]
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
index 2be8233..406e757 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala
@@ -28,6 +28,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.Block._
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
 import org.apache.spark.sql.execution.vectorized.{OffHeapColumnVector, OnHeapColumnVector, WritableColumnVector}
 import org.apache.spark.sql.types._
@@ -65,7 +66,9 @@ trait ColumnarToRowTransition extends UnaryExecNode
  * [[MapPartitionsInRWithArrowExec]]. Eventually this should replace those implementations.
  */
 case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition with CodegenSupport {
-  assert(child.supportsColumnar)
+  // child plan must be columnar or an adaptive plan, which could either be row-based or
+  // columnar, but we don't know until we execute it
+  assert(child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
 
   override def output: Seq[Attribute] = child.output
 
@@ -83,18 +86,25 @@ case class ColumnarToRowExec(child: SparkPlan) extends ColumnarToRowTransition w
   )
 
   override def doExecute(): RDD[InternalRow] = {
-    val numOutputRows = longMetric("numOutputRows")
-    val numInputBatches = longMetric("numInputBatches")
-    // This avoids calling `output` in the RDD closure, so that we don't need to include the entire
-    // plan (this) in the closure.
-    val localOutput = this.output
-    child.executeColumnar().mapPartitionsInternal { batches =>
-      val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
-      batches.flatMap { batch =>
-        numInputBatches += 1
-        numOutputRows += batch.numRows()
-        batch.rowIterator().asScala.map(toUnsafe)
-      }
+    child match {
+      case a: AdaptiveSparkPlanExec if !a.finalPlanSupportsColumnar() =>
+        // if the child plan is adaptive and resulted in rows rather than columnar data
+        // then we can bypass any transition
+        a.execute()
+      case _ =>
+        val numOutputRows = longMetric("numOutputRows")
+        val numInputBatches = longMetric("numInputBatches")
+        // This avoids calling `output` in the RDD closure, so that we don't need to include
+        // the entire plan (this) in the closure.
+        val localOutput = this.output
+        child.executeColumnar().mapPartitionsInternal { batches =>
+          val toUnsafe = UnsafeProjection.create(localOutput, localOutput)
+          batches.flatMap { batch =>
+            numInputBatches += 1
+            numOutputRows += batch.numRows()
+            batch.rowIterator().asScala.map(toUnsafe)
+          }
+        }
     }
   }
 
@@ -419,6 +429,10 @@ trait RowToColumnarTransition extends UnaryExecNode
  * would only be to reduce code.
  */
 case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
+  // child plan must be row-based or an adaptive plan, which could either be row-based or
+  // columnar, but we don't know until we execute it
+  assert(!child.supportsColumnar || child.isInstanceOf[AdaptiveSparkPlanExec])
+
   override def output: Seq[Attribute] = child.output
 
   override def outputPartitioning: Partitioning = child.outputPartitioning
@@ -441,52 +455,60 @@ case class RowToColumnarExec(child: SparkPlan) extends RowToColumnarTransition {
   )
 
   override def doExecuteColumnar(): RDD[ColumnarBatch] = {
-    val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
-    val numInputRows = longMetric("numInputRows")
-    val numOutputBatches = longMetric("numOutputBatches")
-    // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
-    // combine with some of the Arrow conversion tools we will need to unify some of the configs.
-    val numRows = conf.columnBatchSize
-    // This avoids calling `schema` in the RDD closure, so that we don't need to include the entire
-    // plan (this) in the closure.
-    val localSchema = this.schema
-    child.execute().mapPartitionsInternal { rowIterator =>
-      if (rowIterator.hasNext) {
-        new Iterator[ColumnarBatch] {
-          private val converters = new RowToColumnConverter(localSchema)
-          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
-            OffHeapColumnVector.allocateColumns(numRows, localSchema)
-          } else {
-            OnHeapColumnVector.allocateColumns(numRows, localSchema)
-          }
-          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
-          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-            cb.close()
-          }
-
-          override def hasNext: Boolean = {
-            rowIterator.hasNext
-          }
-
-          override def next(): ColumnarBatch = {
-            cb.setNumRows(0)
-            vectors.foreach(_.reset())
-            var rowCount = 0
-            while (rowCount < numRows && rowIterator.hasNext) {
-              val row = rowIterator.next()
-              converters.convert(row, vectors.toArray)
-              rowCount += 1
+    child match {
+      case a: AdaptiveSparkPlanExec if a.finalPlanSupportsColumnar() =>
+        // if the child plan is adaptive and resulted in columnar data
+        // then we can bypass any transition
+        a.executeColumnar()
+      case _ =>
+        val enableOffHeapColumnVector = conf.offHeapColumnVectorEnabled
+        val numInputRows = longMetric("numInputRows")
+        val numOutputBatches = longMetric("numOutputBatches")
+        // Instead of creating a new config we are reusing columnBatchSize. In the future if we do
+        // combine with some of the Arrow conversion tools we will need to unify some of the
+        // configs.
+        val numRows = conf.columnBatchSize
+        // This avoids calling `schema` in the RDD closure, so that we don't need to include the
+        // entire plan (this) in the closure.
+        val localSchema = this.schema
+        child.execute().mapPartitionsInternal { rowIterator =>
+          if (rowIterator.hasNext) {
+            new Iterator[ColumnarBatch] {
+              private val converters = new RowToColumnConverter(localSchema)
+              private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
+                OffHeapColumnVector.allocateColumns(numRows, localSchema)
+              } else {
+                OnHeapColumnVector.allocateColumns(numRows, localSchema)
+              }
+              private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
+
+              TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+                cb.close()
+              }
+
+              override def hasNext: Boolean = {
+                rowIterator.hasNext
+              }
+
+              override def next(): ColumnarBatch = {
+                cb.setNumRows(0)
+                vectors.foreach(_.reset())
+                var rowCount = 0
+                while (rowCount < numRows && rowIterator.hasNext) {
+                  val row = rowIterator.next()
+                  converters.convert(row, vectors.toArray)
+                  rowCount += 1
+                }
+                cb.setNumRows(rowCount)
+                numInputRows += rowCount
+                numOutputBatches += 1
+                cb
+              }
             }
-            cb.setNumRows(rowCount)
-            numInputRows += rowCount
-            numOutputBatches += 1
-            cb
+          } else {
+            Iterator.empty
           }
         }
-      } else {
-        Iterator.empty
-      }
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 2f6619d..c03bb4b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -41,6 +41,7 @@ import org.apache.spark.sql.execution.bucketing.DisableUnnecessaryBucketedScan
 import org.apache.spark.sql.execution.exchange._
 import org.apache.spark.sql.execution.ui.{SparkListenerSQLAdaptiveExecutionUpdate, SparkListenerSQLAdaptiveSQLMetricUpdates, SQLPlanMetric}
 import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.vectorized.ColumnarBatch
 import org.apache.spark.util.ThreadUtils
 
 /**
@@ -111,7 +112,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val postStageCreationRules = Seq(
     ApplyColumnarRulesAndInsertTransitions(context.session.sessionState.columnarRules),
     CollapseCodegenStages()
-  )
+  ) ++ context.session.sessionState.postStageCreationRules
 
   // The partitioning of the query output depends on the shuffle(s) in the final stage. If the
   // original plan contains a repartition operator, we need to preserve the specified partitioning,
@@ -187,6 +188,13 @@ case class AdaptiveSparkPlanExec(
 
   override def doCanonicalize(): SparkPlan = inputPlan.canonicalized
 
+  // This operator reports that output is row-based but because of the adaptive nature of
+  // execution, we don't really know whether the output is going to be row-based or columnar
+  // until we start running the query, so there is a finalPlanSupportsColumnar method that
+  // can be called at execution time to determine what the output format is.
+  // This operator can safely be wrapped in either RowToColumnarExec or ColumnarToRowExec.
+  override def supportsColumnar: Boolean = false
+
   override def resetMetrics(): Unit = {
     metrics.valuesIterator.foreach(_.reset())
     executedPlan.resetMetrics()
@@ -314,27 +322,41 @@ case class AdaptiveSparkPlanExec(
   }
 
   override def executeCollect(): Array[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().executeCollect()
-    finalPlanUpdate
-    rdd
+    withFinalPlanUpdate(_.executeCollect())
   }
 
   override def executeTake(n: Int): Array[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().executeTake(n)
-    finalPlanUpdate
-    rdd
+    withFinalPlanUpdate(_.executeTake(n))
   }
 
   override def executeTail(n: Int): Array[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().executeTail(n)
-    finalPlanUpdate
-    rdd
+    withFinalPlanUpdate(_.executeTail(n))
   }
 
   override def doExecute(): RDD[InternalRow] = {
-    val rdd = getFinalPhysicalPlan().execute()
+    withFinalPlanUpdate(_.execute())
+  }
+
+  /**
+   * Determine if the final query stage supports columnar execution. Calling this method
+   * will trigger query execution of child query stages if they have not already executed.
+   *
+   * If this method returns true then it is safe to call doExecuteColumnar to execute the
+   * final stage.
+   */
+  def finalPlanSupportsColumnar(): Boolean = {
+    getFinalPhysicalPlan().supportsColumnar
+  }
+
+  override def doExecuteColumnar(): RDD[ColumnarBatch] = {
+    withFinalPlanUpdate(_.executeColumnar())
+  }
+
+  private def withFinalPlanUpdate[T](fun: SparkPlan => T): T = {
+    val plan = getFinalPhysicalPlan()
+    val result = fun(plan)
     finalPlanUpdate
-    rdd
+    result
   }
 
   override def doExecuteBroadcast[T](): broadcast.Broadcast[T] = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
index 8289819..1c0c916 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/BaseSessionStateBuilder.scala
@@ -307,6 +307,10 @@ abstract class BaseSessionStateBuilder(
     extensions.buildQueryStagePrepRules(session)
   }
 
+  protected def postStageCreationRules: Seq[Rule[SparkPlan]] = {
+    extensions.buildPostStageCreationRules(session)
+  }
+
   /**
    * Create a query execution object.
    */
@@ -360,7 +364,8 @@ abstract class BaseSessionStateBuilder(
       createQueryExecution,
       createClone,
       columnarRules,
-      queryStagePrepRules)
+      queryStagePrepRules,
+      postStageCreationRules)
   }
 }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
index cdf764a..7685e54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SessionState.scala
@@ -79,7 +79,8 @@ private[sql] class SessionState(
     createQueryExecution: (LogicalPlan, CommandExecutionMode.Value) => QueryExecution,
     createClone: (SparkSession, SessionState) => SessionState,
     val columnarRules: Seq[ColumnarRule],
-    val queryStagePrepRules: Seq[Rule[SparkPlan]]) {
+    val queryStagePrepRules: Seq[Rule[SparkPlan]],
+    val postStageCreationRules: Seq[Rule[SparkPlan]]) {
 
   // The following fields are lazy to avoid creating the Hive client when creating SessionState.
   lazy val catalog: SessionCatalog = catalogBuilder()

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
