[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r357102082

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

```
@@ -76,28 +78,46 @@ object PushDownUtils extends PredicateHelper {
    * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    *         and new output attributes after column pruning.
    */
-  // TODO: nested column pruning.
   def pruneColumns(
       scanBuilder: ScanBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
     scanBuilder match {
+      case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
```

Review comment:
I don't think there is an API to find out whether a particular `ScanBuilder` supports nested schema pruning. Instead, we have `SupportsPushDownRequiredColumns`, and data sources should use the passed schema as a reference and prune whatever they can. The flag that we added in this PR is specific to `FileScanBuilder`, and I believe it will complicate the overall logic if we treat `FileScanBuilder` differently by introducing special branches or if conditions. Also, we might actually get rid of that flag soon, as mentioned in DB's [comment](https://github.com/apache/spark/pull/26751#discussion_r356771927).
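To illustrate that contract, here is a minimal sketch of a custom V2 source that cannot prune nested fields and therefore prunes partially: it treats the passed schema only as a list of required top-level columns. `MyScanBuilder` and `fullSchema` are hypothetical names, and the interface package is assumed to match recent Spark (`org.apache.spark.sql.connector.read`).

```
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
import org.apache.spark.sql.types.StructType

// Hypothetical source that cannot prune nested fields: it uses the passed
// schema only to decide which top-level columns to keep, and retains the
// full (unpruned) type of each kept column.
class MyScanBuilder(fullSchema: StructType) extends ScanBuilder
    with SupportsPushDownRequiredColumns {

  private var prunedSchema: StructType = fullSchema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    val requiredNames = requiredSchema.fieldNames.toSet
    // Partial pruning: filter by name, keep the original field types.
    prunedSchema = StructType(fullSchema.fields.filter(f => requiredNames.contains(f.name)))
  }

  override def build(): Scan = new Scan {
    // Report the (partially) pruned schema back to Spark.
    override def readSchema(): StructType = prunedSchema
  }
}
```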
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r357109500

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -27,15 +27,20 @@ abstract class FileScanBuilder(
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
+    // [SPARK-30107] While the passed `requiredSchema` always have pruned nested columns, the actual
+    // data schema of this scan is determined in `readDataSchema`. File formats that don't support
+    // nested schema pruning, use `requiredSchema` as a reference and perform the pruning partially.
     this.requiredSchema = requiredSchema
   }

   protected def readDataSchema(): StructType = {
     val requiredNameSet = createRequiredNameSet()
-    val fields = dataSchema.fields.filter { field =>
+    val schema = if (supportsNestedSchemaPruning) requiredSchema else dataSchema
```

Review comment:
The difference between those two is that `requiredSchema` can have pruned nested columns if nested schema pruning is enabled. Therefore, ORC and Parquet must always use data types from `requiredSchema`. If nested schema pruning is disabled, `requiredSchema` will have only top-level columns pruned; in that case, it is still safe to use data types from `requiredSchema`. CSV/JSON/Avro will always use `requiredSchema` as a reference but keep data types from `dataSchema`, so that even if Spark passes a schema with pruned nested columns, we still use the data types defined in `dataSchema` to match the behavior before this PR.
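A self-contained sketch of that type-selection rule, mirroring `readDataSchema` from this PR; the free-standing function and its parameters are stand-ins for the builder's fields, and case sensitivity handling via `PartitioningUtils.getColName` is simplified to plain field names:

```
import org.apache.spark.sql.types.StructType

// Which schema supplies the field *types* that end up in the read schema.
def readDataSchema(
    requiredSchema: StructType,          // may have pruned nested columns
    dataSchema: StructType,              // original, unpruned file schema
    partitionNames: Set[String],
    supportsNestedSchemaPruning: Boolean): StructType = {
  val requiredNames = requiredSchema.fieldNames.toSet
  // ORC/Parquet (true): take types from requiredSchema, so pruned structs
  // survive. CSV/JSON/Avro (false): keep the unpruned types from dataSchema,
  // matching the pre-PR behavior even when Spark passes a pruned schema.
  val schema = if (supportsNestedSchemaPruning) requiredSchema else dataSchema
  StructType(schema.fields.filter { field =>
    requiredNames.contains(field.name) && !partitionNames.contains(field.name)
  })
}
```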
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356795707

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -27,15 +27,20 @@ abstract class FileScanBuilder(
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
+    // [SPARK-30107] While the passed `requiredSchema` always have pruned nested columns, the actual
+    // data schema of this scan is determined in `readDataSchema`. File formats that don't support
+    // nested schema pruning, use `requiredSchema` as a reference and perform the pruning partially.
     this.requiredSchema = requiredSchema
```

Review comment:
I agree, and it will also be trivial because the changes in this layer are minimal.
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356605173

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -27,15 +27,20 @@ abstract class FileScanBuilder(
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
+    // [SPARK-30107] While the passed `requiredSchema` always have pruned nested columns, the actual
+    // data schema of this scan is determined in `readDataSchema`. File formats that don't support
+    // nested schema pruning, use `requiredSchema` as a reference and perform the pruning partially.
     this.requiredSchema = requiredSchema
   }

   protected def readDataSchema(): StructType = {
     val requiredNameSet = createRequiredNameSet()
-    val fields = dataSchema.fields.filter { field =>
+    val schema = if (supportsNestedSchemaPruning) requiredSchema else dataSchema
```

Review comment:
If `SQLConf.get.nestedSchemaPruningEnabled` is false, we will use the old path without nested schema pruning. So the passed `requiredSchema` will always be the source of truth for ORC and Parquet. I've added a test to `SchemaPruningSuite` to confirm this.
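For reference, `nestedSchemaPruningEnabled` is backed by a SQL conf, so the old path can be exercised by toggling it (key name as I recall it from `SQLConf`; double-check before relying on it):

```
// Disable to take the old pruneColumns path without nested schema pruning.
spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", "false")
```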
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356602388

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -27,15 +27,20 @@ abstract class FileScanBuilder(
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
+    // [SPARK-30107] While the passed `requiredSchema` always have pruned nested columns, the actual
+    // data schema of this scan is determined in `readDataSchema`. File formats that don't support
+    // nested schema pruning, use `requiredSchema` as a reference and perform the pruning partially.
     this.requiredSchema = requiredSchema
```

Review comment:
I think you are right, and it seems that data sources such as CSV and JSON simply try to ignore columns that are not needed in the readers (e.g. `spark.sql.csv.parser.columnPruning.enabled`). While CSV doesn't support nested data, JSON can potentially benefit from this. I haven't checked the JSON reader in detail to see whether it would need any changes. Sounds like a potential follow-up?
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356602765

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -27,15 +27,20 @@ abstract class FileScanBuilder(
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
+    // [SPARK-30107] While the passed `requiredSchema` always have pruned nested columns, the actual
+    // data schema of this scan is determined in `readDataSchema`. File formats that don't support
+    // nested schema pruning, use `requiredSchema` as a reference and perform the pruning partially.
     this.requiredSchema = requiredSchema
```

Review comment:
I've updated the comment as well.
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356204713

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -27,15 +27,20 @@ abstract class FileScanBuilder(
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
```

Review comment:
nit: I should probably remove the explicit type annotation to be consistent with the rest of the code in this class.
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r355930874

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

```
@@ -438,6 +438,22 @@ object DataSourceStrategy {
     }
   }

+  /**
+   * The attribute name may differ from the one in the schema if the query analyzer
+   * is case insensitive. We should change attribute names to match the ones in the schema,
+   * so we do not need to worry about case sensitivity anymore.
+   */
+  protected[sql] def normalizeExprs(
```

Review comment:
That's what I would propose as well. Here is [the PR](https://github.com/apache/spark/pull/26830).
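The gist of `normalizeExprs`, as a sketch of the idea rather than the exact method body (which lives in `DataSourceStrategy` and the follow-up PR): rewrite attribute names in the expressions to the exact spelling used in the schema.

```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

// Rewrite attribute names in `exprs` to match the schema spelling, so that
// downstream code never has to reason about case sensitivity.
def normalizeExprs(
    exprs: Seq[Expression],
    attributes: Seq[AttributeReference]): Seq[Expression] = {
  exprs.map { e =>
    e.transform {
      case a: AttributeReference =>
        // semanticEquals matches regardless of name casing, so a
        // differently-cased reference resolves to the canonical name.
        a.withName(attributes.find(_.semanticEquals(a)).getOrElse(a).name)
    }
  }
}
```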
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354837323

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
```

Review comment:
I've updated the PR to reflect what I meant. The old version is still accessible in the commit history.
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354834382

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
```

Review comment:
Done
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354834149

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

```
@@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
    * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    *         and new output attributes after column pruning.
    */
-  // TODO: nested column pruning.
   def pruneColumns(
       scanBuilder: ScanBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
     scanBuilder match {
+      case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
+        val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+        val prunedSchema = if (rootFields.nonEmpty) {
+          SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+        } else {
+          new StructType()
+        }
+        r.pruneColumns(prunedSchema)
+        val scan = r.build()
+        val readSchema = scan.readSchema()
+        scan -> toOutputAttrs(readSchema, relation)
       case r: SupportsPushDownRequiredColumns =>
+        val exprs = projects ++ filters
         val requiredColumns = AttributeSet(exprs.flatMap(_.references))
         val neededOutput = relation.output.filter(requiredColumns.contains)
         if (neededOutput != relation.output) {
           r.pruneColumns(neededOutput.toStructType)
           val scan = r.build()
-          val nameToAttr = relation.output.map(_.name).zip(relation.output).toMap
-          scan -> scan.readSchema().toAttributes.map {
-            // We have to keep the attribute id during transformation.
-            a => a.withExprId(nameToAttr(a.name).exprId)
-          }
+          val readSchema = scan.readSchema()
+          scan -> toOutputAttrs(readSchema, relation)
```

Review comment:
Done
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354834063

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

```
@@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
    * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    *         and new output attributes after column pruning.
    */
-  // TODO: nested column pruning.
   def pruneColumns(
       scanBuilder: ScanBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
     scanBuilder match {
+      case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
+        val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+        val prunedSchema = if (rootFields.nonEmpty) {
+          SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+        } else {
+          new StructType()
+        }
+        r.pruneColumns(prunedSchema)
+        val scan = r.build()
+        val readSchema = scan.readSchema()
+        scan -> toOutputAttrs(readSchema, relation)
```

Review comment:
Done
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354832332

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

```
@@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
    * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    *         and new output attributes after column pruning.
    */
-  // TODO: nested column pruning.
   def pruneColumns(
       scanBuilder: ScanBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
     scanBuilder match {
+      case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
+        val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+        val prunedSchema = if (rootFields.nonEmpty) {
+          SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+        } else {
+          new StructType()
```

Review comment:
Done
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354377650

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
```

Review comment:
Well, we don't necessarily need to extend `FileScanBuilder` with another builder. We can consider something like the snippet below (I reverted most of the changes in `FileScanBuilder`):

```
abstract class FileScanBuilder(
    sparkSession: SparkSession,
    fileIndex: PartitioningAwareFileIndex,
    dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {

  private val partitionSchema = fileIndex.partitionSchema
  private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
  protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

  protected def supportsNestedSchemaPruning: Boolean = false

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.requiredSchema = requiredSchema
  }

  protected def readDataSchema(): StructType = {
    val requiredNameSet = createRequiredNameSet()
    val schema = if (supportsNestedSchemaPruning) requiredSchema else dataSchema
    val fields = schema.fields.filter { field =>
      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
      requiredNameSet.contains(colName) && !partitionNameSet.contains(colName)
    }
    StructType(fields)
  }

  protected def readPartitionSchema(): StructType = {
    val requiredNameSet = createRequiredNameSet()
    val fields = partitionSchema.fields.filter { field =>
      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
      requiredNameSet.contains(colName)
    }
    StructType(fields)
  }

  private def createRequiredNameSet(): Set[String] =
    requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet

  private val partitionNameSet: Set[String] =
    partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
}
```
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354287169

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala

```
@@ -68,6 +68,10 @@ case class ParquetScanBuilder(
   // All filters that can be converted to Parquet are pushed down.
   override def pushedFilters(): Array[Filter] = pushedParquetFilters

+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    this.requiredSchema = requiredSchema
+  }
```

Review comment:
Basically, the default implementation in `FileScanBuilder` prunes only top-level attributes.
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354286601

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala

```
@@ -68,6 +68,10 @@ case class ParquetScanBuilder(
   // All filters that can be converted to Parquet are pushed down.
   override def pushedFilters(): Array[Filter] = pushedParquetFilters

+  override def pruneColumns(requiredSchema: StructType): Unit = {
+    this.requiredSchema = requiredSchema
+  }
```

Review comment:
I think data sources should do their best to prune whatever they can. Here is a quote from `SupportsPushDownRequiredColumns`:

> Implementation should try its best to prune the unnecessary columns or nested fields, but it's also OK to do the pruning partially, e.g., a data source may not be able to prune nested fields, and only prune top-level columns.
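To make "pruning partially" concrete, consider a hypothetical `contacts` table with a nested `name` struct; the two legal responses to the same `pruneColumns(requiredSchema)` call differ only in how deep the pruning goes:

```
import org.apache.spark.sql.types._

// Pushed down for SELECT name.first FROM contacts (hypothetical table):
val requiredSchema = new StructType()
  .add("name", new StructType().add("first", StringType))

// Full nested pruning (e.g. Parquet/ORC): read exactly requiredSchema.
val fullyPruned = requiredSchema

// Partial pruning (a source without nested pruning): drop unused top-level
// columns only, but keep the whole struct type.
val partiallyPruned = new StructType()
  .add("name", new StructType()
    .add("first", StringType)
    .add("middle", StringType)
    .add("last", StringType))
```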
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354285161

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

```
@@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
    * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    *         and new output attributes after column pruning.
    */
-  // TODO: nested column pruning.
   def pruneColumns(
       scanBuilder: ScanBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
     scanBuilder match {
+      case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
+        val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+        val prunedSchema = if (rootFields.nonEmpty) {
```

Review comment:
This check was needed to detect if any nested column was requested; the old rule would not apply otherwise. Here, the situation is different, as we need to prune top-level columns even if no nested attributes are requested.
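A sketch of that difference, assuming `SchemaPruning.identifyRootFields`/`pruneDataSchema` behave as they are used in this PR and are reachable from the calling code; the relation schema is hypothetical:

```
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.execution.datasources.SchemaPruning
import org.apache.spark.sql.types._

// Hypothetical relation: id BIGINT, data STRUCT<a: INT, b: INT>
val schema = new StructType()
  .add("id", LongType)
  .add("data", new StructType().add("a", IntegerType).add("b", IntegerType))
val id = AttributeReference("id", LongType)()

// SELECT id FROM t references no nested field, yet rootFields is non-empty,
// so the unused top-level `data` column still gets pruned away.
val rootFields = SchemaPruning.identifyRootFields(Seq(id), Nil)
val prunedSchema = SchemaPruning.pruneDataSchema(schema, rootFields)
// expected: struct<id: bigint>
```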
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353976438

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
```

Review comment:
@gengliangwang @dongjoon-hyun what about the overall approach here? Shall we keep it as is and let `readDataSchema` and `readPartitionSchema` deduplicate columns, or shall we modify `readDataSchema` as suggested in my first comment?
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353973975

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
```

Review comment:
Sounds good, will add a new one!
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353969279

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala

```
@@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
    * @return the created `ScanConfig`(since column pruning is the last step of operator pushdown),
    *         and new output attributes after column pruning.
    */
-  // TODO: nested column pruning.
   def pruneColumns(
       scanBuilder: ScanBuilder,
       relation: DataSourceV2Relation,
-      exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+      projects: Seq[NamedExpression],
+      filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
     scanBuilder match {
+      case r: SupportsPushDownRequiredColumns if SQLConf.get.nestedSchemaPruningEnabled =>
+        val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+        val prunedSchema = if (rootFields.nonEmpty) {
+          SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+        } else {
+          new StructType()
```

Review comment:
Good idea
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353969026

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
```

Review comment:
What about extending `FileBasedDataSourceSuite` to cover CSV as well? It seems there is a test for ORC/Parquet/JSON already.
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353712082

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

```
@@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
```

Review comment:
If partition and data columns overlap, `requiredSchema` might contain duplicates, as we match by name. However, `readDataSchema` and `readPartitionSchema` will still report correct structs. As an alternative to the current approach, we can keep the previous implementation of `pruneColumns` and come up with a new `ScanBuilder` that extends `FileScanBuilder` and overrides `readDataSchema` to iterate through `requiredSchema` instead of `dataSchema` (or simply put that logic in the ORC/Parquet scan builders). That approach sounds slightly better, but I am also open to other suggestions.
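A small sketch of the overlap described in the first sentence (hypothetical schemas; `part` is both a data and a partition column):

```
import org.apache.spark.sql.types._

val dataSchema = new StructType().add("id", LongType).add("part", StringType)
val partitionSchema = new StructType().add("part", StringType)
val required = Set("id", "part")

// Concatenating both sides before name-based filtering duplicates `part`...
val requiredFields = (dataSchema.fields ++ partitionSchema.fields)
  .filter(f => required.contains(f.name))               // id, part, part

// ...but each read*Schema filters its own side, so the reported structs stay
// correct: `part` is excluded from the data schema and reported as a
// partition column instead.
val readDataFields = dataSchema.fields.filter { f =>
  required.contains(f.name) && !partitionSchema.fieldNames.contains(f.name)
}                                                       // id
val readPartitionFields =
  partitionSchema.fields.filter(f => required.contains(f.name))  // part
```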
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353340524

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala

## @@ -26,34 +26,39 @@
 abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType)
   extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
+    val requiredNameSet = toNameSet(requiredSchema)
+    val requiredFields = fields.filter { field =>
+      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
+      requiredNameSet.contains(colName)
+    }
+    this.requiredSchema = StructType(requiredFields)
   }

   protected def readDataSchema(): StructType = {
-    val requiredNameSet = createRequiredNameSet()
-    val fields = dataSchema.fields.filter { field =>
+    val dataFields = requiredSchema.fields.filter { field =>

Review comment:
We iterate through `requiredSchema` to pick up changes to nested columns (i.e. `dataSchema` can still contain the old, unpruned structs).
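As a toy illustration of the difference (the schema here is made up for the example):

```scala
import org.apache.spark.sql.types._

// full file schema: `person` carries two nested fields
val dataSchema = StructType(Seq(
  StructField("person", StructType(Seq(
    StructField("name", StringType),
    StructField("address", StringType))))))

// schema handed to pruneColumns after nested pruning: `person` keeps only `name`
val requiredSchema = StructType(Seq(
  StructField("person", StructType(Seq(
    StructField("name", StringType))))))

// matching dataSchema fields by top-level name resurrects the full struct ...
val fromDataSchema = dataSchema.fields.filter(f => requiredSchema.fieldNames.contains(f.name))
// ... while iterating requiredSchema preserves the pruned struct
val fromRequiredSchema = requiredSchema.fields
```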
[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353339746

## File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala

## @@ -438,6 +438,22 @@ object DataSourceStrategy {
     }
   }

+  /**
+   * The attribute name may differ from the one in the schema if the query analyzer
+   * is case insensitive. We should change attribute names to match the ones in the schema,
+   * so we do not need to worry about case sensitivity anymore.
+   */
+  protected[sql] def normalizeExprs(

Review comment:
I copied the body from `DataSourceStrategy$normalizeFilters` to minimize the changes. Obviously, this is a temporary solution until we agree on a better approach in the community. For example, I can submit a separate PR to rename `normalizeFilters`; I think we want to avoid mixing such a rename with the logic in this PR.
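For reference, the copied normalization looks roughly like this (modeled on `DataSourceStrategy.normalizeFilters`; the holder object is a stand-in, and the exact body in the PR may differ):

```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

object ExprNormalization {
  // Rewrite attribute names in the expressions to the exact names used in the
  // relation output, so later pruning/pushdown code can compare names without
  // worrying about case sensitivity.
  def normalizeExprs(
      exprs: Seq[Expression],
      attributes: Seq[AttributeReference]): Seq[Expression] = {
    exprs.map { e =>
      e.transform {
        case a: AttributeReference =>
          // the expressions are resolved against the relation output, so a
          // match by expression id is guaranteed to exist
          a.withName(attributes.find(_.semanticEquals(a)).get.name)
      }
    }
  }
}
```

With a case-insensitive analyzer, for example, an attribute resolved from `SELECT PERSON FROM t` would be renamed to the schema's `person` spelling before column pruning and filter pushdown compare names.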