aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources URL: https://github.com/apache/spark/pull/26751#discussion_r354377650
########## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala ########## @@ -26,34 +26,40 @@ abstract class FileScanBuilder( fileIndex: PartitioningAwareFileIndex, dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns { private val partitionSchema = fileIndex.partitionSchema + private val partitionNameSet = toNameSet(partitionSchema) private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields) override def pruneColumns(requiredSchema: StructType): Unit = { - this.requiredSchema = requiredSchema + // the default implementation prunes only top-level columns + // file formats that support nested column pruning must override this + val fields = dataSchema.fields ++ partitionSchema.fields Review comment: Well, we don't necessarily need to extend `FileScanBuilder` with another builder. We can consider something like the following (I reverted most of the changes in `FileScanBuilder`): ``` abstract class FileScanBuilder( sparkSession: SparkSession, fileIndex: PartitioningAwareFileIndex, dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns { private val partitionSchema = fileIndex.partitionSchema private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields) protected def supportsNestedSchemaPruning: Boolean = false override def pruneColumns(requiredSchema: StructType): Unit = { this.requiredSchema = requiredSchema } protected def readDataSchema(): StructType = { val requiredNameSet = createRequiredNameSet() val schema = if (supportsNestedSchemaPruning) requiredSchema else dataSchema val fields = schema.fields.filter { field => val colName = PartitioningUtils.getColName(field, isCaseSensitive) requiredNameSet.contains(colName) && !partitionNameSet.contains(colName) } 
StructType(fields) } protected def readPartitionSchema(): StructType = { val requiredNameSet = createRequiredNameSet() val fields = partitionSchema.fields.filter { field => val colName = PartitioningUtils.getColName(field, isCaseSensitive) requiredNameSet.contains(colName) } StructType(fields) } private def createRequiredNameSet(): Set[String] = requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet private val partitionNameSet: Set[String] = partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet } ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org