This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8c800160b62 [SPARK-38999][SQL] Refactor `FileSourceScanExec`: file 
scan physical node
8c800160b62 is described below

commit 8c800160b62657fa5ab16a69ab694360897468d6
Author: Utkarsh <utkarsh.agar...@databricks.com>
AuthorDate: Mon Apr 25 15:44:20 2022 +0800

    [SPARK-38999][SQL] Refactor `FileSourceScanExec`: file scan physical node
    
    ### What changes were proposed in this pull request?
    This PR splits the `FileSourceScanExec` case class into a base trait, 
`FileSourceScanLike`, which `FileSourceScanExec` then extends. 
`FileSourceScanLike` contains basic functionality such as metrics and file 
listing, while `FileSourceScanExec` contains the execution-specific code.
    
    ### Why are the changes needed?
    Currently, the code for the `FileSourceScanExec` class, the physical node 
for file scans, is quite complex and lengthy, making it slightly difficult to 
reason about.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    This is a code refactor; existing tests should suffice.
    
    Closes #36327 from utkarsh39/split-file-scan-node.
    
    Authored-by: Utkarsh <utkarsh.agar...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 208 ++++++++++++---------
 1 file changed, 117 insertions(+), 91 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5cf8aa91ea5..953a7db0f9d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit._
 
-import scala.collection.mutable.HashMap
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
@@ -179,48 +177,34 @@ case class RowDataSourceScanExec(
 }
 
 /**
- * Physical plan node for scanning data from HadoopFsRelations.
- *
- * @param relation The file-based relation to scan.
- * @param output Output attributes of the scan, including data attributes and 
partition attributes.
- * @param requiredSchema Required schema of the underlying relation, excluding 
partition columns.
- * @param partitionFilters Predicates to use for partition pruning.
- * @param optionalBucketSet Bucket ids for bucket pruning.
- * @param optionalNumCoalescedBuckets Number of coalesced buckets.
- * @param dataFilters Filters on non-partition columns.
- * @param tableIdentifier Identifier for the table in the metastore.
- * @param disableBucketedScan Disable bucketed scan based on physical query 
plan, see rule
- *                            [[DisableUnnecessaryBucketedScan]] for details.
+ * A base trait for file scans containing file listing and metrics code.
  */
-case class FileSourceScanExec(
-    @transient relation: HadoopFsRelation,
-    output: Seq[Attribute],
-    requiredSchema: StructType,
-    partitionFilters: Seq[Expression],
-    optionalBucketSet: Option[BitSet],
-    optionalNumCoalescedBuckets: Option[Int],
-    dataFilters: Seq[Expression],
-    tableIdentifier: Option[TableIdentifier],
-    disableBucketedScan: Boolean = false)
-  extends DataSourceScanExec {
+trait FileSourceScanLike extends DataSourceScanExec {
+
+  // Filters on non-partition columns.
+  def dataFilters: Seq[Expression]
+  // Disable bucketed scan based on physical query plan, see rule
+  // [[DisableUnnecessaryBucketedScan]] for details.
+  def disableBucketedScan: Boolean
+  // Bucket ids for bucket pruning.
+  def optionalBucketSet: Option[BitSet]
+  // Number of coalesced buckets.
+  def optionalNumCoalescedBuckets: Option[Int]
+  // Output attributes of the scan, including data attributes and partition 
attributes.
+  def output: Seq[Attribute]
+  // Predicates to use for partition pruning.
+  def partitionFilters: Seq[Expression]
+  // The file-based relation to scan.
+  def relation: HadoopFsRelation
+  // Required schema of the underlying relation, excluding partition columns.
+  def requiredSchema: StructType
+  // Identifier for the table in the metastore.
+  def tableIdentifier: Option[TableIdentifier]
+
 
   lazy val metadataColumns: Seq[AttributeReference] =
     output.collect { case FileSourceMetadataAttribute(attr) => attr }
 
-  // Note that some vals referring the file-based relation are lazy 
intentionally
-  // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
-  override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
-  }
-
-  private lazy val needsUnsafeRowConversion: Boolean = {
-    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-      conf.parquetVectorizedReaderEnabled
-    } else {
-      false
-    }
-  }
-
   override def vectorTypes: Option[Seq[String]] =
     relation.fileFormat.vectorTypes(
       requiredSchema = requiredSchema,
@@ -230,17 +214,28 @@ case class FileSourceScanExec(
         vectorTypes ++ 
Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
       }
 
-  private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
+  lazy val driverMetrics = Map(
+    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files 
read"),
+    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata 
time"),
+    "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files 
read")
+  ) ++ {
+    if (relation.partitionSchema.nonEmpty) {
+      Map(
+        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of 
partitions read"),
+        "pruningTime" ->
+          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition 
pruning time"))
+    } else {
+      Map.empty[String, SQLMetric]
+    }
+  } ++ staticMetrics
 
   /**
    * Send the driver-side metrics. Before calling this function, 
selectedPartitions has
    * been initialized. See SPARK-26327 for more details.
    */
-  private def sendDriverMetrics(): Unit = {
-    driverMetrics.foreach(e => metrics(e._1).add(e._2))
+  protected def sendDriverMetrics(): Unit = {
     val executionId = 
sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, 
driverMetrics.values.toSeq)
   }
 
   private def isDynamicPruningFilter(e: Expression): Boolean =
@@ -255,14 +250,14 @@ case class FileSourceScanExec(
     setFilesNumAndSizeMetric(ret, true)
     val timeTakenMs = NANOSECONDS.toMillis(
       (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
-    driverMetrics("metadataTime") = timeTakenMs
+    driverMetrics("metadataTime").set(timeTakenMs)
     ret
   }.toArray
 
   // We can only determine the actual partitions at runtime when a dynamic 
partition filter is
   // present. This is because such a filter relies on information that is only 
available at run
   // time (for instance the keys used in the other side of a join).
-  @transient private lazy val dynamicallySelectedPartitions: 
Array[PartitionDirectory] = {
+  @transient protected lazy val dynamicallySelectedPartitions: 
Array[PartitionDirectory] = {
     val dynamicPartitionFilters = 
partitionFilters.filter(isDynamicPruningFilter)
 
     if (dynamicPartitionFilters.nonEmpty) {
@@ -278,7 +273,7 @@ case class FileSourceScanExec(
       val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
       setFilesNumAndSizeMetric(ret, false)
       val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
-      driverMetrics("pruningTime") = timeTakenMs
+      driverMetrics("pruningTime").set(timeTakenMs)
       ret
     } else {
       selectedPartitions
@@ -369,7 +364,7 @@ case class FileSourceScanExec(
   }
 
   @transient
-  private lazy val pushedDownFilters = {
+  protected lazy val pushedDownFilters = {
     val supportNestedPredicatePushdown = 
DataSourceUtils.supportNestedPredicatePushdown(relation)
     // `dataFilters` should not include any metadata col filters
     // because the metadata struct has been flatted in FileSourceStrategy
@@ -445,33 +440,10 @@ case class FileSourceScanExec(
        |""".stripMargin
   }
 
-  lazy val inputRDD: RDD[InternalRow] = {
-    val readFile: (PartitionedFile) => Iterator[InternalRow] =
-      relation.fileFormat.buildReaderWithPartitionValues(
-        sparkSession = relation.sparkSession,
-        dataSchema = relation.dataSchema,
-        partitionSchema = relation.partitionSchema,
-        requiredSchema = requiredSchema,
-        filters = pushedDownFilters,
-        options = relation.options,
-        hadoopConf = 
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
-
-    val readRDD = if (bucketedScan) {
-      createBucketedReadRDD(relation.bucketSpec.get, readFile, 
dynamicallySelectedPartitions,
-        relation)
-    } else {
-      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
-    }
-    sendDriverMetrics()
-    readRDD
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    inputRDD :: Nil
-  }
+  override def metrics: Map[String, SQLMetric] = scanMetrics
 
   /** SQL metrics generated only for scans using dynamic partition pruning. */
-  private lazy val staticMetrics = if 
(partitionFilters.exists(isDynamicPruningFilter)) {
+  protected lazy val staticMetrics = if 
(partitionFilters.exists(isDynamicPruningFilter)) {
     Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static 
number of files read"),
       "staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static 
size of files read"))
   } else {
@@ -485,22 +457,19 @@ case class FileSourceScanExec(
     val filesNum = partitions.map(_.files.size.toLong).sum
     val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
     if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
-      driverMetrics("numFiles") = filesNum
-      driverMetrics("filesSize") = filesSize
+      driverMetrics("numFiles").set(filesNum)
+      driverMetrics("filesSize").set(filesSize)
     } else {
-      driverMetrics("staticFilesNum") = filesNum
-      driverMetrics("staticFilesSize") = filesSize
+      driverMetrics("staticFilesNum").set(filesNum)
+      driverMetrics("staticFilesSize").set(filesSize)
     }
     if (relation.partitionSchema.nonEmpty) {
-      driverMetrics("numPartitions") = partitions.length
+      driverMetrics("numPartitions").set(partitions.length)
     }
   }
 
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows"),
-    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files 
read"),
-    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata 
time"),
-    "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files 
read")
+  private lazy val scanMetrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output 
rows")
   ) ++ {
     // Tracking scan time has overhead, we can't afford to do it for each row, 
and can only do
     // it for each batch.
@@ -509,16 +478,73 @@ case class FileSourceScanExec(
     } else {
       None
     }
-  } ++ {
-    if (relation.partitionSchema.nonEmpty) {
-      Map(
-        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of 
partitions read"),
-        "pruningTime" ->
-          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition 
pruning time"))
+  } ++ driverMetrics
+}
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and 
partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding 
partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query 
plan, see rule
+ *                            [[DisableUnnecessaryBucketedScan]] for details.
+ */
+case class FileSourceScanExec(
+    @transient override val relation: HadoopFsRelation,
+    override val output: Seq[Attribute],
+    override val requiredSchema: StructType,
+    override val partitionFilters: Seq[Expression],
+    override val optionalBucketSet: Option[BitSet],
+    override val optionalNumCoalescedBuckets: Option[Int],
+    override val dataFilters: Seq[Expression],
+    override val tableIdentifier: Option[TableIdentifier],
+    override val disableBucketedScan: Boolean = false)
+  extends FileSourceScanLike {
+
+  // Note that some vals referring the file-based relation are lazy 
intentionally
+  // so that this plan can be canonicalized on executor side too. See 
SPARK-23731.
+  override lazy val supportsColumnar: Boolean = {
+    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
+
+  private lazy val needsUnsafeRowConversion: Boolean = {
+    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+      conf.parquetVectorizedReaderEnabled
     } else {
-      Map.empty[String, SQLMetric]
+      false
     }
-  } ++ staticMetrics
+  }
+
+  lazy val inputRDD: RDD[InternalRow] = {
+    val readFile: (PartitionedFile) => Iterator[InternalRow] =
+      relation.fileFormat.buildReaderWithPartitionValues(
+        sparkSession = relation.sparkSession,
+        dataSchema = relation.dataSchema,
+        partitionSchema = relation.partitionSchema,
+        requiredSchema = requiredSchema,
+        filters = pushedDownFilters,
+        options = relation.options,
+        hadoopConf = 
relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+    val readRDD = if (bucketedScan) {
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, 
dynamicallySelectedPartitions,
+        relation)
+    } else {
+      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
+    }
+    sendDriverMetrics()
+    readRDD
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to