This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8c800160b62  [SPARK-38999][SQL] Refactor `FileSourceScanExec`: file scan physical node
8c800160b62 is described below

commit 8c800160b62657fa5ab16a69ab694360897468d6
Author: Utkarsh <utkarsh.agar...@databricks.com>
AuthorDate: Mon Apr 25 15:44:20 2022 +0800

    [SPARK-38999][SQL] Refactor `FileSourceScanExec`: file scan physical node

    ### What changes were proposed in this pull request?
    The PR refactors the `FileSourceScanExec` case class into a base trait, `FileSourceScanLike`, which is then subclassed by `FileSourceScanExec`. `FileSourceScanLike` contains basic functionality such as metrics and file listing, while `FileSourceScanExec` contains execution-specific code.

    ### Why are the changes needed?
    Currently the code for the `FileSourceScanExec` class, the physical node for file scans, is quite complex and lengthy, making it slightly difficult to reason about.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Code refactor, existing tests should suffice.

    Closes #36327 from utkarsh39/split-file-scan-node.

    Authored-by: Utkarsh <utkarsh.agar...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../spark/sql/execution/DataSourceScanExec.scala   | 208 ++++++++++++---------
 1 file changed, 117 insertions(+), 91 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
index 5cf8aa91ea5..953a7db0f9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.execution
 
 import java.util.concurrent.TimeUnit._
 
-import scala.collection.mutable.HashMap
-
 import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.fs.Path
 
@@ -179,48 +177,34 @@ case class RowDataSourceScanExec(
 }
 
 /**
- * Physical plan node for scanning data from HadoopFsRelations.
- *
- * @param relation The file-based relation to scan.
- * @param output Output attributes of the scan, including data attributes and partition attributes.
- * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
- * @param partitionFilters Predicates to use for partition pruning.
- * @param optionalBucketSet Bucket ids for bucket pruning.
- * @param optionalNumCoalescedBuckets Number of coalesced buckets.
- * @param dataFilters Filters on non-partition columns.
- * @param tableIdentifier Identifier for the table in the metastore.
- * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
- *                            [[DisableUnnecessaryBucketedScan]] for details.
+ * A base trait for file scans containing file listing and metrics code.
  */
-case class FileSourceScanExec(
-    @transient relation: HadoopFsRelation,
-    output: Seq[Attribute],
-    requiredSchema: StructType,
-    partitionFilters: Seq[Expression],
-    optionalBucketSet: Option[BitSet],
-    optionalNumCoalescedBuckets: Option[Int],
-    dataFilters: Seq[Expression],
-    tableIdentifier: Option[TableIdentifier],
-    disableBucketedScan: Boolean = false)
-  extends DataSourceScanExec {
+trait FileSourceScanLike extends DataSourceScanExec {
+
+  // Filters on non-partition columns.
+  def dataFilters: Seq[Expression]
+  // Disable bucketed scan based on physical query plan, see rule
+  // [[DisableUnnecessaryBucketedScan]] for details.
+  def disableBucketedScan: Boolean
+  // Bucket ids for bucket pruning.
+  def optionalBucketSet: Option[BitSet]
+  // Number of coalesced buckets.
+  def optionalNumCoalescedBuckets: Option[Int]
+  // Output attributes of the scan, including data attributes and partition attributes.
+  def output: Seq[Attribute]
+  // Predicates to use for partition pruning.
+  def partitionFilters: Seq[Expression]
+  // The file-based relation to scan.
+  def relation: HadoopFsRelation
+  // Required schema of the underlying relation, excluding partition columns.
+  def requiredSchema: StructType
+  // Identifier for the table in the metastore.
+  def tableIdentifier: Option[TableIdentifier]
+
   lazy val metadataColumns: Seq[AttributeReference] = output.collect {
     case FileSourceMetadataAttribute(attr) => attr
   }
 
-  // Note that some vals referring the file-based relation are lazy intentionally
-  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
-  override lazy val supportsColumnar: Boolean = {
-    relation.fileFormat.supportBatch(relation.sparkSession, schema)
-  }
-
-  private lazy val needsUnsafeRowConversion: Boolean = {
-    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
-      conf.parquetVectorizedReaderEnabled
-    } else {
-      false
-    }
-  }
-
   override def vectorTypes: Option[Seq[String]] =
     relation.fileFormat.vectorTypes(
       requiredSchema = requiredSchema,
@@ -230,17 +214,28 @@ case class FileSourceScanExec(
     vectorTypes ++ Seq.fill(metadataColumns.size)(classOf[ConstantColumnVector].getName)
   }
 
-  private lazy val driverMetrics: HashMap[String, Long] = HashMap.empty
+  lazy val driverMetrics = Map(
+    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
+    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
+    "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read")
+  ) ++ {
+    if (relation.partitionSchema.nonEmpty) {
+      Map(
+        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
+        "pruningTime" ->
+          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"))
+    } else {
+      Map.empty[String, SQLMetric]
+    }
+  } ++ staticMetrics
 
   /**
    * Send the driver-side metrics. Before calling this function, selectedPartitions has
    * been initialized. See SPARK-26327 for more details.
    */
-  private def sendDriverMetrics(): Unit = {
-    driverMetrics.foreach(e => metrics(e._1).add(e._2))
+  protected def sendDriverMetrics(): Unit = {
     val executionId = sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY)
-    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId,
-      metrics.filter(e => driverMetrics.contains(e._1)).values.toSeq)
+    SQLMetrics.postDriverMetricUpdates(sparkContext, executionId, driverMetrics.values.toSeq)
   }
 
   private def isDynamicPruningFilter(e: Expression): Boolean =
@@ -255,14 +250,14 @@ case class FileSourceScanExec(
       setFilesNumAndSizeMetric(ret, true)
       val timeTakenMs = NANOSECONDS.toMillis(
         (System.nanoTime() - startTime) + optimizerMetadataTimeNs)
-      driverMetrics("metadataTime") = timeTakenMs
+      driverMetrics("metadataTime").set(timeTakenMs)
       ret
     }.toArray
 
   // We can only determine the actual partitions at runtime when a dynamic partition filter is
   // present. This is because such a filter relies on information that is only available at run
   // time (for instance the keys used in the other side of a join).
-  @transient private lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
+  @transient protected lazy val dynamicallySelectedPartitions: Array[PartitionDirectory] = {
     val dynamicPartitionFilters = partitionFilters.filter(isDynamicPruningFilter)
 
     if (dynamicPartitionFilters.nonEmpty) {
@@ -278,7 +273,7 @@ case class FileSourceScanExec(
       val ret = selectedPartitions.filter(p => boundPredicate.eval(p.values))
       setFilesNumAndSizeMetric(ret, false)
       val timeTakenMs = (System.nanoTime() - startTime) / 1000 / 1000
-      driverMetrics("pruningTime") = timeTakenMs
+      driverMetrics("pruningTime").set(timeTakenMs)
       ret
     } else {
       selectedPartitions
@@ -369,7 +364,7 @@ case class FileSourceScanExec(
   }
 
   @transient
-  private lazy val pushedDownFilters = {
+  protected lazy val pushedDownFilters = {
     val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
     // `dataFilters` should not include any metadata col filters
     // because the metadata struct has been flatted in FileSourceStrategy
@@ -445,33 +440,10 @@ case class FileSourceScanExec(
       |""".stripMargin
   }
 
-  lazy val inputRDD: RDD[InternalRow] = {
-    val readFile: (PartitionedFile) => Iterator[InternalRow] =
-      relation.fileFormat.buildReaderWithPartitionValues(
-        sparkSession = relation.sparkSession,
-        dataSchema = relation.dataSchema,
-        partitionSchema = relation.partitionSchema,
-        requiredSchema = requiredSchema,
-        filters = pushedDownFilters,
-        options = relation.options,
-        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
-
-    val readRDD = if (bucketedScan) {
-      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
-        relation)
-    } else {
-      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
-    }
-    sendDriverMetrics()
-    readRDD
-  }
-
-  override def inputRDDs(): Seq[RDD[InternalRow]] = {
-    inputRDD :: Nil
-  }
+  override def metrics: Map[String, SQLMetric] = scanMetrics
 
   /** SQL metrics generated only for scans using dynamic partition pruning. */
-  private lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
+  protected lazy val staticMetrics = if (partitionFilters.exists(isDynamicPruningFilter)) {
     Map("staticFilesNum" -> SQLMetrics.createMetric(sparkContext, "static number of files read"),
       "staticFilesSize" -> SQLMetrics.createSizeMetric(sparkContext, "static size of files read"))
   } else {
@@ -485,22 +457,19 @@ case class FileSourceScanExec(
     val filesNum = partitions.map(_.files.size.toLong).sum
     val filesSize = partitions.map(_.files.map(_.getLen).sum).sum
     if (!static || !partitionFilters.exists(isDynamicPruningFilter)) {
-      driverMetrics("numFiles") = filesNum
-      driverMetrics("filesSize") = filesSize
+      driverMetrics("numFiles").set(filesNum)
+      driverMetrics("filesSize").set(filesSize)
     } else {
-      driverMetrics("staticFilesNum") = filesNum
-      driverMetrics("staticFilesSize") = filesSize
+      driverMetrics("staticFilesNum").set(filesNum)
+      driverMetrics("staticFilesSize").set(filesSize)
    }
     if (relation.partitionSchema.nonEmpty) {
-      driverMetrics("numPartitions") = partitions.length
+      driverMetrics("numPartitions").set(partitions.length)
     }
   }
 
-  override lazy val metrics = Map(
-    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
-    "numFiles" -> SQLMetrics.createMetric(sparkContext, "number of files read"),
-    "metadataTime" -> SQLMetrics.createTimingMetric(sparkContext, "metadata time"),
-    "filesSize" -> SQLMetrics.createSizeMetric(sparkContext, "size of files read")
+  private lazy val scanMetrics = Map(
+    "numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows")
   ) ++ {
     // Tracking scan time has overhead, we can't afford to do it for each row, and can only do
     // it for each batch.
@@ -509,16 +478,73 @@ case class FileSourceScanExec(
     } else {
       None
     }
-  } ++ {
-    if (relation.partitionSchema.nonEmpty) {
-      Map(
-        "numPartitions" -> SQLMetrics.createMetric(sparkContext, "number of partitions read"),
-        "pruningTime" ->
-          SQLMetrics.createTimingMetric(sparkContext, "dynamic partition pruning time"))
+  } ++ driverMetrics
+}
+
+/**
+ * Physical plan node for scanning data from HadoopFsRelations.
+ *
+ * @param relation The file-based relation to scan.
+ * @param output Output attributes of the scan, including data attributes and partition attributes.
+ * @param requiredSchema Required schema of the underlying relation, excluding partition columns.
+ * @param partitionFilters Predicates to use for partition pruning.
+ * @param optionalBucketSet Bucket ids for bucket pruning.
+ * @param optionalNumCoalescedBuckets Number of coalesced buckets.
+ * @param dataFilters Filters on non-partition columns.
+ * @param tableIdentifier Identifier for the table in the metastore.
+ * @param disableBucketedScan Disable bucketed scan based on physical query plan, see rule
+ *                            [[DisableUnnecessaryBucketedScan]] for details.
+ */
+case class FileSourceScanExec(
+    @transient override val relation: HadoopFsRelation,
+    override val output: Seq[Attribute],
+    override val requiredSchema: StructType,
+    override val partitionFilters: Seq[Expression],
+    override val optionalBucketSet: Option[BitSet],
+    override val optionalNumCoalescedBuckets: Option[Int],
+    override val dataFilters: Seq[Expression],
+    override val tableIdentifier: Option[TableIdentifier],
+    override val disableBucketedScan: Boolean = false)
+  extends FileSourceScanLike {
+
+  // Note that some vals referring the file-based relation are lazy intentionally
+  // so that this plan can be canonicalized on executor side too. See SPARK-23731.
+  override lazy val supportsColumnar: Boolean = {
+    relation.fileFormat.supportBatch(relation.sparkSession, schema)
+  }
+
+  private lazy val needsUnsafeRowConversion: Boolean = {
+    if (relation.fileFormat.isInstanceOf[ParquetSource]) {
+      conf.parquetVectorizedReaderEnabled
     } else {
-      Map.empty[String, SQLMetric]
+      false
     }
-  } ++ staticMetrics
+  }
+
+  lazy val inputRDD: RDD[InternalRow] = {
+    val readFile: (PartitionedFile) => Iterator[InternalRow] =
+      relation.fileFormat.buildReaderWithPartitionValues(
+        sparkSession = relation.sparkSession,
+        dataSchema = relation.dataSchema,
+        partitionSchema = relation.partitionSchema,
+        requiredSchema = requiredSchema,
+        filters = pushedDownFilters,
+        options = relation.options,
+        hadoopConf = relation.sparkSession.sessionState.newHadoopConfWithOptions(relation.options))
+
+    val readRDD = if (bucketedScan) {
+      createBucketedReadRDD(relation.bucketSpec.get, readFile, dynamicallySelectedPartitions,
+        relation)
+    } else {
+      createReadRDD(readFile, dynamicallySelectedPartitions, relation)
+    }
+    sendDriverMetrics()
+    readRDD
+  }
+
+  override def inputRDDs(): Seq[RDD[InternalRow]] = {
+    inputRDD :: Nil
+  }
 
   protected override def doExecute(): RDD[InternalRow] = {
     val numOutputRows = longMetric("numOutputRows")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
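
For readers who want the shape of the change without the full diff, the sketch below distills the trait-extraction pattern this commit applies. It is a simplified illustration, not the actual Spark code: the member names mirror the PR, but the types and bodies are placeholder assumptions.

// Simplified illustration of the refactoring pattern (not the real Spark types):
// shared driver-side logic lives in a base trait with abstract members, while the
// concrete physical node keeps only execution-specific code.
trait FileSourceScanLikeSketch {
  // Abstract inputs every file scan must provide (stand-ins for Seq[Expression] etc.).
  def partitionFilters: Seq[String]
  def dataFilters: Seq[String]

  // Shared functionality (metrics, file listing) defined once against the abstract members.
  lazy val selectedPartitions: Seq[String] = partitionFilters.filter(_.nonEmpty)
  protected def sendDriverMetrics(): Unit =
    println(s"partitions read: ${selectedPartitions.size}")
}

// The concrete node satisfies the abstract members with its constructor parameters
// and keeps only the execution-specific pieces.
case class FileSourceScanExecSketch(
    override val partitionFilters: Seq[String],
    override val dataFilters: Seq[String])
  extends FileSourceScanLikeSketch {

  def doExecute(): Unit = {
    sendDriverMetrics()
    println(s"scanning with ${dataFilters.size} data filter(s)")
  }
}

object FileSourceScanSketchDemo {
  def main(args: Array[String]): Unit =
    FileSourceScanExecSketch(Seq("p = 1"), Seq("c > 0")).doExecute()
}

Declaring the inputs as abstract `def`s in the trait lets the case class satisfy them directly with its constructor parameters, which is how the metrics and file-listing code can move into `FileSourceScanLike` while `FileSourceScanExec` keeps the same constructor signature.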