[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-12 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r357102082
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 ##
 @@ -76,28 +78,46 @@ object PushDownUtils extends PredicateHelper {
* @return the created `ScanConfig`(since column pruning is the last step of 
operator pushdown),
* and new output attributes after column pruning.
*/
-  // TODO: nested column pruning.
   def pruneColumns(
   scanBuilder: ScanBuilder,
   relation: DataSourceV2Relation,
-  exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+  projects: Seq[NamedExpression],
+  filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
 scanBuilder match {
+  case r: SupportsPushDownRequiredColumns if 
SQLConf.get.nestedSchemaPruningEnabled =>
 
 Review comment:
   I don't think there is an API to find out whether a particular `ScanBuilder` 
supports nested schema pruning. Instead, we have `SupportsPushDownRequiredColumns`, 
and data sources should use the passed schema as a reference and prune whatever 
they can. The flag that we added in this PR is specific to `FileScanBuilder`, and 
I believe it will complicate the overall logic if we treat `FileScanBuilder` 
differently by introducing special branches or `if` conditions. Also, we might 
actually get rid of that flag soon, as mentioned in DB's 
[comment](https://github.com/apache/spark/pull/26751#discussion_r356771927).
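
   As a minimal illustration of "prune whatever they can" for an arbitrary V2 source 
   (this sketch is not from the PR; `MyScanBuilder` and `fullSchema` are hypothetical, 
   only top-level pruning is honored, and the imports assume the Spark 3.x 
   `connector.read` packages):

   ```
   import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, SupportsPushDownRequiredColumns}
   import org.apache.spark.sql.types.StructType

   // Hypothetical V2 scan builder that prunes only top-level columns and keeps
   // the original data types, which `SupportsPushDownRequiredColumns` allows.
   class MyScanBuilder(fullSchema: StructType)
     extends ScanBuilder with SupportsPushDownRequiredColumns {

     private var requiredSchema: StructType = fullSchema

     override def pruneColumns(requiredSchema: StructType): Unit = {
       // Use the passed schema only as a reference: keep the requested top-level
       // columns, but ignore any nested pruning inside their types.
       val requested = requiredSchema.fieldNames.toSet
       this.requiredSchema =
         StructType(fullSchema.fields.filter(f => requested.contains(f.name)))
     }

     override def build(): Scan = ???  // build a Scan that reads `requiredSchema`
   }
   ```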





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-12 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r357109500
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -27,15 +27,20 @@ abstract class FileScanBuilder(
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
+// [SPARK-30107] While the passed `requiredSchema` always have pruned 
nested columns, the actual
+// data schema of this scan is determined in `readDataSchema`. File 
formats that don't support
+// nested schema pruning, use `requiredSchema` as a reference and perform 
the pruning partially.
 this.requiredSchema = requiredSchema
   }
 
   protected def readDataSchema(): StructType = {
 val requiredNameSet = createRequiredNameSet()
-val fields = dataSchema.fields.filter { field =>
+val schema = if (supportsNestedSchemaPruning) requiredSchema else 
dataSchema
 
 Review comment:
   The difference between the two is that `requiredSchema` can have pruned nested 
columns if nested schema pruning is enabled. Therefore, ORC and Parquet must always 
use the data types from `requiredSchema`. If nested schema pruning is disabled, 
`requiredSchema` will have pruned only top-level columns, and it is still safe to 
use its data types. CSV/JSON/Avro will always use `requiredSchema` as a reference 
but keep the data types from `dataSchema`, so even if Spark passes a schema with 
pruned nested columns, we still use the data types defined in `dataSchema` to match 
the behavior before this PR.
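
   A small, hypothetical example of that difference (the column names below are 
   invented, not from the PR):

   ```
   import org.apache.spark.sql.types._

   // Full data schema of the files: `name` is a struct with two nested fields.
   val dataSchema = StructType(Seq(
     StructField("id", LongType),
     StructField("name", StructType(Seq(
       StructField("first", StringType),
       StructField("last", StringType))))))

   // With nested schema pruning enabled, Spark can pass a requiredSchema whose
   // struct type is itself pruned; ORC/Parquet should read exactly this type.
   val prunedRequired = StructType(Seq(
     StructField("name", StructType(Seq(StructField("first", StringType))))))

   // Formats that cannot prune nested fields use requiredSchema only to pick
   // top-level columns and keep the original struct type from dataSchema.
   val requestedNames = prunedRequired.fieldNames.toSet
   val partiallyPruned =
     StructType(dataSchema.fields.filter(f => requestedNames.contains(f.name)))
   // partiallyPruned still contains the full `name` struct with `first` and `last`.
   ```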





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-11 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356795707
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -27,15 +27,20 @@ abstract class FileScanBuilder(
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
+// [SPARK-30107] While the passed `requiredSchema` always have pruned 
nested columns, the actual
+// data schema of this scan is determined in `readDataSchema`. File 
formats that don't support
+// nested schema pruning, use `requiredSchema` as a reference and perform 
the pruning partially.
 this.requiredSchema = requiredSchema
 
 Review comment:
   I agree and it will also be trivial because the changes in this layer are 
minimal.





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-11 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356605173
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -27,15 +27,20 @@ abstract class FileScanBuilder(
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
+// [SPARK-30107] While the passed `requiredSchema` always have pruned 
nested columns, the actual
+// data schema of this scan is determined in `readDataSchema`. File 
formats that don't support
+// nested schema pruning, use `requiredSchema` as a reference and perform 
the pruning partially.
 this.requiredSchema = requiredSchema
   }
 
   protected def readDataSchema(): StructType = {
 val requiredNameSet = createRequiredNameSet()
-val fields = dataSchema.fields.filter { field =>
+val schema = if (supportsNestedSchemaPruning) requiredSchema else 
dataSchema
 
 Review comment:
   If `SQLConf.get.nestedSchemaPruningEnabled` is false, we will use the old 
path without nested schema pruning. So, the passed `requiredSchema` will always 
be the source of truth for ORC and Parquet. I've added a test to 
`SchemaPruningSuite` to confirm this.
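
   A hedged sketch of that kind of check (not the actual `SchemaPruningSuite` code; 
   the data path and column names are made up) is to toggle 
   `spark.sql.optimizer.nestedSchemaPruning.enabled` and inspect the scan's read 
   schema in the plan:

   ```
   // Assumes an active SparkSession `spark` and Parquet files at `path` whose
   // schema is struct<id: bigint, name: struct<first: string, last: string>>.
   Seq(true, false).foreach { enabled =>
     spark.conf.set("spark.sql.optimizer.nestedSchemaPruning.enabled", enabled)
     val df = spark.read.parquet(path).select("name.first")
     // With pruning enabled, the scan's ReadSchema should mention only name.first;
     // with it disabled, the full `name` struct is read and pruned afterwards.
     df.explain()
   }
   ```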





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-11 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356602388
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -27,15 +27,20 @@ abstract class FileScanBuilder(
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
+// [SPARK-30107] While the passed `requiredSchema` always have pruned 
nested columns, the actual
+// data schema of this scan is determined in `readDataSchema`. File 
formats that don't support
+// nested schema pruning, use `requiredSchema` as a reference and perform 
the pruning partially.
 this.requiredSchema = requiredSchema
 
 Review comment:
   I think you are right: it seems that data sources such as CSV and JSON simply try 
to ignore columns that are not needed in their readers (e.g. 
`spark.sql.csv.parser.columnPruning.enabled`).
   
   While CSV doesn't support nested data, JSON can potentially benefit from this. I 
haven't checked the JSON reader in detail to see whether it would need any changes. 
Sounds like a potential follow-up?
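
   For context (not part of this PR), the effect a narrower schema already has on 
   JSON can be seen with a user-provided read schema; the path below is hypothetical:

   ```
   import org.apache.spark.sql.types._

   // With an explicit, narrower schema the JSON parser only materializes the
   // listed fields, which is the same effect nested schema pruning would give
   // the reader automatically. Assumes an active SparkSession `spark`.
   val narrow = StructType(Seq(
     StructField("name", StructType(Seq(StructField("first", StringType))))))

   val df = spark.read.schema(narrow).json("/tmp/people.json")
   ```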





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-11 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356602765
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -27,15 +27,20 @@ abstract class FileScanBuilder(
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
+// [SPARK-30107] While the passed `requiredSchema` always have pruned 
nested columns, the actual
+// data schema of this scan is determined in `readDataSchema`. File 
formats that don't support
+// nested schema pruning, use `requiredSchema` as a reference and perform 
the pruning partially.
 this.requiredSchema = requiredSchema
 
 Review comment:
   I've updated the comment as well.





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-10 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r356204713
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -27,15 +27,20 @@ abstract class FileScanBuilder(
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
+  protected val supportsNestedSchemaPruning: Boolean = false
 
 Review comment:
   nit: I should probably remove the explicit type annotation to be consistent with 
the rest of the code in this class.





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-10 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r355930874
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -438,6 +438,22 @@ object DataSourceStrategy {
 }
   }
 
+  /**
+   * The attribute name may differ from the one in the schema if the query 
analyzer
+   * is case insensitive. We should change attribute names to match the ones 
in the schema,
+   * so we do not need to worry about case sensitivity anymore.
+   */
+  protected[sql] def normalizeExprs(
 
 Review comment:
   That's what I would propose as well. Here is [the 
PR](https://github.com/apache/spark/pull/26830).
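
   For readers who haven't seen it, a rough sketch of what such a normalization can 
   do (this is an illustration, not the exact Spark implementation): every attribute 
   reference in the expressions is renamed to the casing used by the relation's 
   output, matching by `ExprId`.

   ```
   import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

   // Sketch: rewrite attribute names in `exprs` so they match the names (and
   // casing) of the corresponding attributes in `attributes`.
   def normalizeExprsSketch(
       exprs: Seq[Expression],
       attributes: Seq[AttributeReference]): Seq[Expression] = {
     val nameById = attributes.map(a => a.exprId -> a.name).toMap
     exprs.map(_.transform {
       case a: AttributeReference if nameById.contains(a.exprId) =>
         a.withName(nameById(a.exprId))
     })
   }
   ```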





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-06 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354837323
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
 fileIndex: PartitioningAwareFileIndex,
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-this.requiredSchema = requiredSchema
+// the default implementation prunes only top-level columns
+// file formats that support nested column pruning must override this
+val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   I've updated the PR to reflect what I meant. The old version is still 
accessible in the commit history.





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-06 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354834382
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
 fileIndex: PartitioningAwareFileIndex,
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-this.requiredSchema = requiredSchema
+// the default implementation prunes only top-level columns
 
 Review comment:
   Done





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-06 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354834149
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 ##
 @@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
* @return the created `ScanConfig`(since column pruning is the last step of 
operator pushdown),
* and new output attributes after column pruning.
*/
-  // TODO: nested column pruning.
   def pruneColumns(
   scanBuilder: ScanBuilder,
   relation: DataSourceV2Relation,
-  exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+  projects: Seq[NamedExpression],
+  filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
 scanBuilder match {
+  case r: SupportsPushDownRequiredColumns if 
SQLConf.get.nestedSchemaPruningEnabled =>
+val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+val prunedSchema = if (rootFields.nonEmpty) {
+  SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+} else {
+  new StructType()
+}
+r.pruneColumns(prunedSchema)
+val scan = r.build()
+val readSchema = scan.readSchema()
+scan -> toOutputAttrs(readSchema, relation)
+
   case r: SupportsPushDownRequiredColumns =>
+val exprs = projects ++ filters
 val requiredColumns = AttributeSet(exprs.flatMap(_.references))
 val neededOutput = relation.output.filter(requiredColumns.contains)
 if (neededOutput != relation.output) {
   r.pruneColumns(neededOutput.toStructType)
   val scan = r.build()
-  val nameToAttr = 
relation.output.map(_.name).zip(relation.output).toMap
-  scan -> scan.readSchema().toAttributes.map {
-// We have to keep the attribute id during transformation.
-a => a.withExprId(nameToAttr(a.name).exprId)
-  }
+  val readSchema = scan.readSchema()
+  scan -> toOutputAttrs(readSchema, relation)
 
 Review comment:
   Done





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-06 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354834063
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 ##
 @@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
* @return the created `ScanConfig`(since column pruning is the last step of 
operator pushdown),
* and new output attributes after column pruning.
*/
-  // TODO: nested column pruning.
   def pruneColumns(
   scanBuilder: ScanBuilder,
   relation: DataSourceV2Relation,
-  exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+  projects: Seq[NamedExpression],
+  filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
 scanBuilder match {
+  case r: SupportsPushDownRequiredColumns if 
SQLConf.get.nestedSchemaPruningEnabled =>
+val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+val prunedSchema = if (rootFields.nonEmpty) {
+  SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+} else {
+  new StructType()
+}
+r.pruneColumns(prunedSchema)
+val scan = r.build()
+val readSchema = scan.readSchema()
+scan -> toOutputAttrs(readSchema, relation)
 
 Review comment:
   Done





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-06 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354832332
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 ##
 @@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
* @return the created `ScanConfig`(since column pruning is the last step of 
operator pushdown),
* and new output attributes after column pruning.
*/
-  // TODO: nested column pruning.
   def pruneColumns(
   scanBuilder: ScanBuilder,
   relation: DataSourceV2Relation,
-  exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+  projects: Seq[NamedExpression],
+  filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
 scanBuilder match {
+  case r: SupportsPushDownRequiredColumns if 
SQLConf.get.nestedSchemaPruningEnabled =>
+val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+val prunedSchema = if (rootFields.nonEmpty) {
+  SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+} else {
+  new StructType()
 
 Review comment:
   Done





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-05 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354377650
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
 fileIndex: PartitioningAwareFileIndex,
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-this.requiredSchema = requiredSchema
+// the default implementation prunes only top-level columns
+// file formats that support nested column pruning must override this
+val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   Well, we don't necessarily need to extend `FileScanBuilder` with another builder.
   
   We can consider something like the code below (I reverted most of the changes in 
`FileScanBuilder`):
   
   ```
   abstract class FileScanBuilder(
       sparkSession: SparkSession,
       fileIndex: PartitioningAwareFileIndex,
       dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {

     private val partitionSchema = fileIndex.partitionSchema
     private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
     protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)

     protected def supportsNestedSchemaPruning: Boolean = false

     override def pruneColumns(requiredSchema: StructType): Unit = {
       this.requiredSchema = requiredSchema
     }

     protected def readDataSchema(): StructType = {
       val requiredNameSet = createRequiredNameSet()
       val schema = if (supportsNestedSchemaPruning) requiredSchema else dataSchema
       val fields = schema.fields.filter { field =>
         val colName = PartitioningUtils.getColName(field, isCaseSensitive)
         requiredNameSet.contains(colName) && !partitionNameSet.contains(colName)
       }
       StructType(fields)
     }

     protected def readPartitionSchema(): StructType = {
       val requiredNameSet = createRequiredNameSet()
       val fields = partitionSchema.fields.filter { field =>
         val colName = PartitioningUtils.getColName(field, isCaseSensitive)
         requiredNameSet.contains(colName)
       }
       StructType(fields)
     }

     private def createRequiredNameSet(): Set[String] =
       requiredSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet

     private val partitionNameSet: Set[String] =
       partitionSchema.fields.map(PartitioningUtils.getColName(_, isCaseSensitive)).toSet
   }
   ```





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-05 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354287169
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
 ##
 @@ -68,6 +68,10 @@ case class ParquetScanBuilder(
   // All filters that can be converted to Parquet are pushed down.
   override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+this.requiredSchema = requiredSchema
+  }
 
 Review comment:
   Basically, the default implementation in `FileScanBuilder` prunes only top-level 
attributes.





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-05 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354286601
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetScanBuilder.scala
 ##
 @@ -68,6 +68,10 @@ case class ParquetScanBuilder(
   // All filters that can be converted to Parquet are pushed down.
   override def pushedFilters(): Array[Filter] = pushedParquetFilters
 
+  override def pruneColumns(requiredSchema: StructType): Unit = {
+this.requiredSchema = requiredSchema
+  }
 
 Review comment:
   I think data sources should do their best to prune whatever they can.
   
   Here is a quote from `SupportsPushDownRequiredColumns`:
   
   > Implementation should try its best to prune the unnecessary columns or 
nested fields, but it's also OK to do the pruning partially, e.g., a data 
source may not be able to prune nested fields, and only prune top-level columns.





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-05 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r354285161
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 ##
 @@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
* @return the created `ScanConfig`(since column pruning is the last step of 
operator pushdown),
* and new output attributes after column pruning.
*/
-  // TODO: nested column pruning.
   def pruneColumns(
   scanBuilder: ScanBuilder,
   relation: DataSourceV2Relation,
-  exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+  projects: Seq[NamedExpression],
+  filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
 scanBuilder match {
+  case r: SupportsPushDownRequiredColumns if 
SQLConf.get.nestedSchemaPruningEnabled =>
+val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+val prunedSchema = if (rootFields.nonEmpty) {
 
 Review comment:
   This check was needed to detect if any nested column was requested. The old 
rule would not apply otherwise. Here, the situation is different as we need to 
prune top-level columns even if no nested attributes are requested.
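
   A tiny, made-up illustration of why the non-empty check no longer guards 
   applicability: even a query that touches no nested fields must still drop unused 
   top-level columns (the table and column names here are hypothetical).

   ```
   import org.apache.spark.sql.types._

   // Hypothetical relation schema for: SELECT col1 FROM t
   val relationSchema = StructType(Seq(
     StructField("col1", IntegerType),
     StructField("col2", IntegerType),
     StructField("nested", StructType(Seq(
       StructField("a", IntegerType),
       StructField("b", IntegerType))))))

   // The query references no nested field, only the top-level root field col1;
   // the pruned schema still has to drop col2 and nested, not just sub-fields.
   val referenced = Set("col1")
   val prunedSchema =
     StructType(relationSchema.fields.filter(f => referenced.contains(f.name)))
   // prunedSchema contains only col1.
   ```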





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353976438
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
 fileIndex: PartitioningAwareFileIndex,
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-this.requiredSchema = requiredSchema
+// the default implementation prunes only top-level columns
+// file formats that support nested column pruning must override this
+val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   @gengliangwang @dongjoon-hyun what about the overall approach here? Shall we 
keep it as is and let `readDataSchema` and `readPartitionSchema` deduplicate 
columns or shall we modify `readDataSchema` as suggested in my first comment?





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353973975
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
 fileIndex: PartitioningAwareFileIndex,
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-this.requiredSchema = requiredSchema
+// the default implementation prunes only top-level columns
+// file formats that support nested column pruning must override this
+val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   Sounds good, will add a new one!





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353969279
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
 ##
 @@ -76,28 +78,48 @@ object PushDownUtils extends PredicateHelper {
* @return the created `ScanConfig`(since column pruning is the last step of 
operator pushdown),
* and new output attributes after column pruning.
*/
-  // TODO: nested column pruning.
   def pruneColumns(
   scanBuilder: ScanBuilder,
   relation: DataSourceV2Relation,
-  exprs: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
+  projects: Seq[NamedExpression],
+  filters: Seq[Expression]): (Scan, Seq[AttributeReference]) = {
 scanBuilder match {
+  case r: SupportsPushDownRequiredColumns if 
SQLConf.get.nestedSchemaPruningEnabled =>
+val rootFields = SchemaPruning.identifyRootFields(projects, filters)
+val prunedSchema = if (rootFields.nonEmpty) {
+  SchemaPruning.pruneDataSchema(relation.schema, rootFields)
+} else {
+  new StructType()
 
 Review comment:
   Good idea





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353969026
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
 fileIndex: PartitioningAwareFileIndex,
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-this.requiredSchema = requiredSchema
+// the default implementation prunes only top-level columns
+// file formats that support nested column pruning must override this
+val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   What about extending `FileBasedDataSourceSuite` to cover CSV as well? It seems 
there is a test for ORC/Parquet/JSON already.





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353712082
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
 fileIndex: PartitioningAwareFileIndex,
 dataSchema: StructType) extends ScanBuilder with 
SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = 
sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ 
partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-this.requiredSchema = requiredSchema
+// the default implementation prunes only top-level columns
+// file formats that support nested column pruning must override this
+val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   If partition and data columns overlap, `requiredSchema` might contain 
duplicates as we match by name. However, `readDataSchema` and 
`readPartitionSchema` will still report correct structs.
   
   As an alternative to the current approach, we can keep the previous 
implementation of `pruneColumns` and come up with a new `ScanBuilder` that 
extends `FileScanBuilder` and overrides `readDataSchema` to iterate through 
`requiredSchema` instead of `dataSchema` (or simply put that logic in 
ORC/Parquet scan builders). That approach sounds slightly better, but I am also 
open to other suggestions.
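
   A small, hypothetical example of the overlap case (column names invented): if 
   `part` exists in both the data files and the partitioning, matching by name over 
   the concatenated fields can put it into `requiredSchema` twice, yet the read 
   schemas still come out right because the data side excludes partition names.

   ```
   import org.apache.spark.sql.types._

   val dataSchema      = StructType(Seq(StructField("id", LongType), StructField("part", StringType)))
   val partitionSchema = StructType(Seq(StructField("part", StringType)))
   val requestedNames  = Set("id", "part")

   // Matching by name over dataSchema ++ partitionSchema yields `part` twice.
   val required = StructType(
     (dataSchema.fields ++ partitionSchema.fields).filter(f => requestedNames.contains(f.name)))

   // The data-side read schema drops partition columns, so no duplicate survives.
   val partitionNames = partitionSchema.fieldNames.toSet
   val readDataSchema = StructType(required.fields.filter(f => !partitionNames.contains(f.name)))
   val readPartitionSchema = StructType(
     partitionSchema.fields.filter(f => requestedNames.contains(f.name)))
   // readDataSchema contains only `id`; readPartitionSchema contains only `part`.
   ```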





[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353712082
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   If partition and data columns overlap, `requiredSchema` might contain 
duplicates because we match by name. However, `readDataSchema` and 
`readPartitionSchema` will still report the correct structs.
   
   As an alternative to the current approach, we can keep the previous 
implementation of `pruneColumns` and introduce a new `ScanBuilder` that 
extends `FileScanBuilder` and overrides `readDataSchema` to iterate through 
`requiredSchema` instead of `dataSchema` (or simply have the same logic 
directly in the Orc/Parquet scan builders). That sounds slightly better, but I 
am also open to other approaches.
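   
   As a toy illustration of the overlap case (standalone snippet, not code from 
this PR): if `id` is both a data column and a partition column, the name-based 
filter in `pruneColumns` keeps both entries.
   
```scala
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Toy schemas: "id" appears both as a data column and as a partition column.
val dataSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("info", StructType(Seq(StructField("name", StringType))))))
val partitionSchema = StructType(Seq(StructField("id", LongType)))

// Names requested by the query, as pruneColumns sees them.
val requiredNames = Set("id")

// Mirrors the name-based filter: both "id" fields survive, so the resulting
// requiredSchema contains a duplicate entry.
val prunedFields = (dataSchema.fields ++ partitionSchema.fields)
  .filter(field => requiredNames.contains(field.name))

assert(prunedFields.map(_.name).toSeq == Seq("id", "id"))
```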


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353712082
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   If partition and data columns overlap, `requiredSchema` might contain 
duplicates because we match by name. However, `readDataSchema` and 
`readPartitionSchema` will still report the correct structs.
   
   As an alternative to the current approach, we can keep the previous 
implementation of `pruneColumns` and introduce a new `ScanBuilder` that 
extends `FileScanBuilder` and overrides `readDataSchema` to iterate through 
`requiredSchema` instead of `dataSchema`. That sounds slightly better, but I am 
also open to other approaches.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-04 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353712082
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,40 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
 
 Review comment:
   If partition and data columns overlap, `requiredSchema` might contain 
duplicates because we match by name. However, `readDataSchema` and 
`readPartitionSchema` will still report the correct structs.
   
   As an alternative to the current approach, we can keep the previous 
implementation of `pruneColumns` and introduce a new `ScanBuilder` that 
extends `FileScanBuilder` and overrides `readDataSchema` to iterate through 
`requiredSchema` instead of `dataSchema`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-03 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353340524
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileScanBuilder.scala
 ##
 @@ -26,34 +26,39 @@ abstract class FileScanBuilder(
     fileIndex: PartitioningAwareFileIndex,
     dataSchema: StructType) extends ScanBuilder with SupportsPushDownRequiredColumns {
   private val partitionSchema = fileIndex.partitionSchema
+  private val partitionNameSet = toNameSet(partitionSchema)
   private val isCaseSensitive = sparkSession.sessionState.conf.caseSensitiveAnalysis
   protected var requiredSchema = StructType(dataSchema.fields ++ partitionSchema.fields)
 
   override def pruneColumns(requiredSchema: StructType): Unit = {
-    this.requiredSchema = requiredSchema
+    // the default implementation prunes only top-level columns
+    // file formats that support nested column pruning must override this
+    val fields = dataSchema.fields ++ partitionSchema.fields
+    val requiredNameSet = toNameSet(requiredSchema)
+    val requiredFields = fields.filter { field =>
+      val colName = PartitioningUtils.getColName(field, isCaseSensitive)
+      requiredNameSet.contains(colName)
+    }
+    this.requiredSchema = StructType(requiredFields)
   }
 
   protected def readDataSchema(): StructType = {
-    val requiredNameSet = createRequiredNameSet()
-    val fields = dataSchema.fields.filter { field =>
+    val dataFields = requiredSchema.fields.filter { field =>
 
 Review comment:
   We iterate through `requiredSchema` to pick up changes to nested columns 
(i.e. `dataSchema` may still contain the original, unpruned struct).
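   
   A small standalone illustration of that point (toy schemas, not this PR's 
code): after nested pruning, `requiredSchema` carries the narrowed struct while 
`dataSchema` still holds the full one, so the read schema has to be derived 
from `requiredSchema`.
   
```scala
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}

// Full data schema on disk: info has two nested fields.
val dataSchema = StructType(Seq(
  StructField("id", LongType),
  StructField("info", StructType(Seq(
    StructField("name", StringType),
    StructField("age", IntegerType))))))

// Schema handed to pruneColumns after nested schema pruning: info.age was dropped.
val requiredSchema = StructType(Seq(
  StructField("info", StructType(Seq(StructField("name", StringType))))))

// Matching by top-level name against dataSchema would resurrect info.age,
// while iterating requiredSchema keeps only the pruned struct.
val fromDataSchema = dataSchema.fields
  .filter(field => requiredSchema.fieldNames.contains(field.name))
val fromRequiredSchema = requiredSchema.fields

// fromDataSchema.head.dataType     -> struct<name:string,age:int> (old, unpruned)
// fromRequiredSchema.head.dataType -> struct<name:string>         (pruned)
```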


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-03 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353339746
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -438,6 +438,22 @@ object DataSourceStrategy {
 }
   }
 
+  /**
+   * The attribute name may differ from the one in the schema if the query analyzer
+   * is case insensitive. We should change attribute names to match the ones in the schema,
+   * so we do not need to worry about case sensitivity anymore.
+   */
+  protected[sql] def normalizeExprs(
 
 Review comment:
   I copied the body from `DataSourceStrategy$normalizeFilters` to minimize the 
changes. Obviously, this is a temporary solution until we agree on a better 
approach in the community. For example, I can submit a separate PR to rename 
`normalizeFilters`; I think we want to avoid mixing such a rename with the 
logic in this PR.
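   
   For context, a sketch of what that copied body plausibly looks like (it 
mirrors `normalizeFilters`; the exact signature in this PR may differ slightly):
   
```scala
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression}

// Sketch only: every attribute reference is renamed to the matching attribute
// from `attributes` (found via semanticEquals), i.e. to the name exactly as it
// is spelled in the schema, so later matching does not depend on case sensitivity.
def normalizeExprs(
    exprs: Seq[Expression],
    attributes: Seq[AttributeReference]): Seq[Expression] = {
  exprs.map { e =>
    e.transform {
      case a: AttributeReference =>
        a.withName(attributes.find(_.semanticEquals(a)).getOrElse(a).name)
    }
  }
}
```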


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] [spark] aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] Expose nested schema pruning to all V2 sources

2019-12-03 Thread GitBox
aokolnychyi commented on a change in pull request #26751: [SPARK-30107][SQL] 
Expose nested schema pruning to all V2 sources
URL: https://github.com/apache/spark/pull/26751#discussion_r353339746
 
 

 ##
 File path: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 ##
 @@ -438,6 +438,22 @@ object DataSourceStrategy {
 }
   }
 
+  /**
+   * The attribute name may differ from the one in the schema if the query analyzer
+   * is case insensitive. We should change attribute names to match the ones in the schema,
+   * so we do not need to worry about case sensitivity anymore.
+   */
+  protected[sql] def normalizeExprs(
 
 Review comment:
   I copied the body from `DataSourceStrategy$normalizeFilters` to minimize the 
changes. Obviously, this is a temporary solution until we agree on a better 
approach in the community. For example, I can submit a separate PR to rename 
`normalizeFilters`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org