aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354377650
 
 

 ##########
 File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##########
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   Well, we don't necessarily need to extend `FileScanBuilder` with another builder.
   
   We could consider something like the snippet below (I reverted most of the changes in `FileScanBuilder`):
   
   ```scala
   abstract class FileScanBuilder(
       sparkSession: SparkSession,
       fileIndex: PartitioningAwareFileIndex,
       dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
     private val partitionSchema = fileIndex.partitionSchema
     private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
   
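     // file formats that support nested column pruning must override this to opt in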
     protected def supportsNestedSchemaPruning: Boolean = false
   
     override def pruneColumns(requiredSchema: StructType): Unit = {
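       // `requiredSchema` may carry pruned nested fields; `readDataSchema()` below decides what is actually read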
       this.requiredSchema = requiredSchema
     }
   
     protected def readDataSchema(): StructType = {
       val requiredNameSet = createRequiredNameSet()
       val schema = if (supportsNestedSchemaPruning) requiredSchema else dataSchema
       val fields = schema.fields.filter { field =>
         val colName = PartitioningUtils.getColName(field, isCaseSensitive)
         requiredNameSet.contains(colName) && !partitionNameSet.contains(colName)
       }
       StructType(fields)
     }
   
     protected def readPartitionSchema(): StructType = {
       val requiredNameSet = createRequiredNameSet()
       val fields = partitionSchema.fields.filter { field =>
         val colName = PartitioningUtils.getColName(field, isCaseSensitive)
         requiredNameSet.contains(colName)
       }
       StructType(fields)
     }
   
     private def createRequiredNameSet(): Set[String] =
       requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
   
     private val partitionNameSet: Set[String] =
       partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
   }
   
   ```
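   
   For a concrete file format, opting in would then be a one-liner. Here is a minimal, hypothetical sketch (`MyFormatScanBuilder` and `MyFormatScan` are illustrative names, not classes from this PR):
   
   ```scala
   case class MyFormatScanBuilder(
       sparkSession: SparkSession,
       fileIndex: PartitioningAwareFileIndex,
       dataSchema: StructType)
     extends FileScanBuilder(sparkSession, fileIndex, dataSchema) {
   
     // opt in: readDataSchema() will filter requiredSchema (which may carry
     // pruned nested structs) instead of the full dataSchema
     override protected def supportsNestedSchemaPruning: Boolean = true
   
     override def build(): Scan = {
       // readDataSchema()/readPartitionSchema() come from FileScanBuilder above;
       // MyFormatScan is a hypothetical Scan implementation
       MyFormatScan(sparkSession, fileIndex, readDataSchema(), readPartitionSchema())
     }
   }
   ```
   
   Formats that cannot prune nested fields keep the default `false` and still get correct top-level pruning from `readDataSchema()`.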
