[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman closed the pull request at: https://github.com/apache/spark/pull/16578 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r181575614 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -151,6 +151,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // The following batch should be executed after batch "Join Reorder" and "LocalRelation". Batch("Check Cartesian Products", Once, CheckCartesianProducts) :+ +Batch("Field Extraction Pushdown", fixedPoint, + AggregateFieldExtractionPushdown, + JoinFieldExtractionPushdown) :+ --- End diff -- Hi @gatorsmile. Given the scope of your request, can I ask you to provide a reason for it? What you ask would invalidate some of the existing conversation and review of this PR. It would also substantially restrict the practical usability of this patch. I believe I've written this patch with a logical separation of concerns along the lines you've requested. As a compromise, would you consider an incremental review starting with the basic projection/filter functionality and proceeding to the optimizer rules following them? BTW I'm traveling for a few weeks, and I'm spending most of my time away from work. If I'm delayed in responding, that's the reason. I'll still keep up, but at a slower pace. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r180495402 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -151,6 +151,9 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) // The following batch should be executed after batch "Join Reorder" and "LocalRelation". Batch("Check Cartesian Products", Once, CheckCartesianProducts) :+ +Batch("Field Extraction Pushdown", fixedPoint, + AggregateFieldExtractionPushdown, + JoinFieldExtractionPushdown) :+ --- End diff -- @mallman Could you split these new optimizer rules to two PRs first? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user DaimonPl commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151917066 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- So maybe at least make it true for some core SQL/Parquet test suites?
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151597063 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- As far as I know, we haven't previously had a config setting specified for all tests. We have set a config for a whole test suite, but not for all tests.
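The difference between a setting scoped to one suite and one applied to every test can be illustrated with a scoped-override helper. The sketch below is plain Scala and does not use Spark: `ToyConf` and `withConf` are hypothetical stand-ins, loosely in the spirit of the `withSQLConf` helper used in Spark's SQL tests. It applies the overrides, runs the body, and restores the previous values afterwards, even if the body throws.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a session configuration; this is not Spark's SQLConf.
final class ToyConf {
  private val settings = mutable.Map.empty[String, String]
  def get(key: String): Option[String] = settings.get(key)
  def set(key: String, value: String): Unit = settings(key) = value
  def unset(key: String): Unit = { settings.remove(key); () }
}

object ConfScope {
  // Apply the overrides, run the body, then restore whatever was set before.
  def withConf[T](conf: ToyConf)(pairs: (String, String)*)(body: => T): T = {
    val previous = pairs.map { case (k, _) => k -> conf.get(k) }
    pairs.foreach { case (k, v) => conf.set(k, v) }
    try body
    finally previous.foreach {
      case (k, Some(old)) => conf.set(k, old)
      case (k, None) => conf.unset(k)
    }
  }
}

object ConfScopeDemo {
  val key = "spark.sql.nestedSchemaPruning.enabled"
  val conf = new ToyConf
  conf.set(key, "false") // the proposed default

  // Inside the block the override is visible; afterwards the default is back.
  val inside: Option[String] = ConfScope.withConf(conf)(key -> "true") { conf.get(key) }
  val after: Option[String] = conf.get(key)
}
```

A suite-wide setting would wrap every test in such a scope (or set the value once in the suite's setup), whereas a default for all tests would have to live in shared test infrastructure.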
Github user DaimonPl commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151485646 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- Nope :( maybe @viirya can give input about it? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151477529 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- Ah. Sounds reasonable. Do you know how to do that? Is there a precedent I can follow? I'm not aware of one. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user DaimonPl commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151476679 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- Just to be clear. I mean to make it default true for all tests in spark. Not only those explicitly related to this feature :) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151474342 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- It needs to be set `true` for the tests. This can be done. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user DaimonPl commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151344821 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- How about making it default true for tests? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r151026919 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- Giving it more thought, I believe it's prudent to choose correctness over performance. I will change the default to `false`. "Power users" will set it to `true` and (hopefully) report a problem if they run into one.
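For readers who want to opt in once the default is `false`, a minimal sketch follows. It assumes a Spark build that includes this patch and uses the key exactly as it appears in the diff above; the application name and the local master are placeholders, not part of the PR.

```scala
import org.apache.spark.sql.SparkSession

object EnableNestedSchemaPruning {
  def main(args: Array[String]): Unit = {
    // Placeholder session settings for a local experiment.
    val spark = SparkSession.builder()
      .appName("nested-schema-pruning-demo")
      .master("local[*]")
      .getOrCreate()

    // Key taken from the SQLConf diff in this thread; assumed to exist only
    // in builds that carry this patch.
    val key = "spark.sql.nestedSchemaPruning.enabled"
    spark.conf.set(key, "true")
    println(s"$key = ${spark.conf.get(key)}")

    spark.stop()
  }
}
```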
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r150261547 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff --

This is interesting because, if we don't do nested pruning, a superset Parquet read schema like:

```
message spark_schema {
  optional group name {
    optional binary first (UTF8);
    optional binary middle (UTF8);
    optional binary last (UTF8);
  }
  optional binary address (UTF8);
}
```

won't cause any failure. Once we perform nested pruning, the required Parquet schema becomes:

```
message spark_schema {
  optional group name {
    optional binary middle (UTF8);
  }
  optional binary address (UTF8);
}
```

Then, if we don't remove the group `name` from the required schema, the failure happens.
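The intersection described in the diff comment, and the need to drop a group that ends up with no fields, can be sketched on a toy schema model. This is plain Scala, not parquet-mr or the code in this PR: `Node`, `Leaf`, `Group`, and `intersect` are hypothetical names, and the example assumes a file that was written without `name.middle`.

```scala
// Toy model of a Parquet-like schema; these are illustrative types only.
sealed trait Node { def name: String }
final case class Leaf(name: String) extends Node
final case class Group(name: String, fields: List[Node]) extends Node

object SchemaIntersection {
  // Keep only the parts of `requested` that also exist in `file`.
  // A group whose intersection has no fields left is dropped entirely.
  def intersect(requested: Node, file: Node): Option[Node] =
    (requested, file) match {
      case (Leaf(n), Leaf(m)) if n == m => Some(requested)
      case (Group(n, reqFields), Group(m, fileFields)) if n == m =>
        val byName = fileFields.map(f => f.name -> f).toMap
        val kept = reqFields.flatMap(r => byName.get(r.name).flatMap(intersect(r, _)))
        if (kept.isEmpty) None else Some(Group(n, kept))
      case _ => None
    }
}

object SchemaIntersectionDemo {
  // Assumed file schema: this file has no `middle` field under `name`.
  val fileSchema = Group("spark_schema", List(
    Group("name", List(Leaf("first"), Leaf("last"))),
    Leaf("address")))

  // Pruned request: only name.middle and address.
  val requested = Group("spark_schema", List(
    Group("name", List(Leaf("middle"))),
    Leaf("address")))

  // `name` would be left with no fields, so it is removed from the read schema.
  val result: Option[Node] = SchemaIntersection.intersect(requested, fileSchema)
}
```

Here `result` keeps only `address`: the `name` group is removed because none of its requested fields exist in the file.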
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r149880006 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") + .booleanConf + .createWithDefault(true) --- End diff -- nit: are we confident enough to set it as true by default? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r149293915 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation = prunedParquetRelation, output = prunedRelationOutput, +catalogTable = None) --- End diff -- Because `catalogTable` can contain some data such as statistics, that might be useful. Not sure if we may lose some optimization chance. --- --
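The rule above decides whether pruning changed anything by comparing leaf counts. The body of `countLeaves` is not shown in this excerpt, so the following is only a guess at its shape, written against a toy type model rather than Spark's `DataType` hierarchy; `DType`, `StructT`, `ArrayT`, `MapT`, and the demo schemas are all hypothetical.

```scala
// Toy stand-ins for Spark SQL data types; not the real org.apache.spark.sql.types classes.
sealed trait DType
case object IntT extends DType
case object StringT extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType
final case class StructT(fields: List[(String, DType)]) extends DType

object LeafCount {
  // A "leaf" is any non-container type reached by descending through
  // structs, array elements, and map keys/values.
  def countLeaves(t: DType): Int = t match {
    case StructT(fields) => fields.map { case (_, ft) => countLeaves(ft) }.sum
    case ArrayT(element) => countLeaves(element)
    case MapT(key, value) => countLeaves(key) + countLeaves(value)
    case _ => 1
  }
}

object LeafCountDemo {
  val dataSchema = StructT(List(
    "name" -> StructT(List("first" -> StringT, "middle" -> StringT, "last" -> StringT)),
    "address" -> StringT))

  val prunedSchema = StructT(List(
    "name" -> StructT(List("middle" -> StringT)),
    "address" -> StringT))

  // Mirrors the comparison in the diff: prune only if leaves were actually removed.
  val shouldPrune: Boolean =
    LeafCount.countLeaves(dataSchema) > LeafCount.countLeaves(prunedSchema)
}
```

As the code comment in the diff notes, comparing counts is only a valid equality check under the assumption that the pruned fields are a subset of the original fields.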
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r149880058 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -961,6 +961,15 @@ object SQLConf { .booleanConf .createWithDefault(true) + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data.") --- End diff -- Please also mention this is only applied on Parquet data source for now. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148866084 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- On the other hand, if we fix `parquetMrCompatibility` to `true`, then a couple of other tests fail. Namely, these tests are "Filter applied on merged Parquet schema with new column should work" in `ParquetFilterSuite.scala` and "read partitioned table - merging compatible schemas" in `ParquetPartitionDiscoverySuite.scala`. In both cases, the failures involve queries over multiple files with compatible but different schemas.
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148863673 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- As for the problem of requesting a read of a superset of a file's fields, if we disable the `parquetMrCompatibility` code, then the "partial schema intersection - select missing subfield" test in `ParquetSchemaPruningSuite.scala` fails with the following stack trace: ``` [info] Cause: org.apache.parquet.io.ParquetDecodingException: Can not read value at 0 in block -1 in file file:/Volumes/VideoAmpCS/msa/workspace/spark-public/target/tmp/spark-a0bda193-9d3f-4cd1-885c-9e8b5b0fc1ed/contacts/p=2/part-1-4a8671f1-afb2-482f-8c4d-4f6f4df896bc-c000.snappy.parquet [info] at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:223) [info] at org.apache.parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:215) [info] at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39) [info] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) [info] at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106) [info] at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:182) [info] at 
org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:106) [info] at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source) [info] at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) [info] at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:432) [info] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) [info] at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408) [info] at org.apache.spark.util.Utils$.getIteratorSize(Utils.scala:1820) [info] at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160) [info] at org.apache.spark.rdd.RDD$$anonfun$count$1.apply(RDD.scala:1160) [info] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064) [info] at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2064) [info] at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87) [info] at org.apache.spark.scheduler.Task.run(Task.scala:108) [info] at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345) [info] at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [info] at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [info] at java.lang.Thread.run(Thread.java:748) [info] Cause: java.lang.IndexOutOfBoundsException: Index: 0, Size: 0 [info] at java.util.ArrayList.rangeCheck(ArrayList.java:653) [info] at java.util.ArrayList.get(ArrayList.java:429) [info] at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:103) [info] at org.apache.parquet.io.GroupColumnIO.getFirst(GroupColumnIO.java:103) [info] at org.apache.parquet.io.PrimitiveColumnIO.getFirst(PrimitiveColumnIO.java:102) [info] at org.apache.parquet.io.PrimitiveColumnIO.isFirst(PrimitiveColumnIO.java:97) [info] at 
org.apache.parquet.io.RecordReaderImplementation.(RecordReaderImplementation.java:278) [info] at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:141) [info] at org.apache.parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:107) [info] at org.apache.parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:155) [info] at org.apache.parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:107) [info] at org.apache.parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:136) [info] at org.apache.parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:194) [info] at org.apache.parquet.hadoop
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148731634 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer (valueContainsN
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148731266 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer (valueContainsN
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148725914 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- We can end up requesting a superset of a file's fields when a partitioned table has partitions whose files contain only a subset of the table's fields. See my related comment and example in `ParquetRowConverter.scala`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
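Editorial sketch of the intersection idea described in the comment above. It is not the code from the PR: `Node`, `Leaf`, `Struct`, and `SchemaIntersection.intersect` are hypothetical stand-ins written in plain Scala so the example runs without Spark or Parquet on the classpath. It assumes a requested field survives only if the file schema has a field of the same name (and, for leaves, the same type).

```scala
// Simplified stand-ins for schema types; these are NOT Spark's or Parquet's classes.
sealed trait Node
final case class Leaf(typeName: String) extends Node
final case class Struct(fields: List[(String, Node)]) extends Node

object SchemaIntersection {
  // Keep only the requested fields that also exist in the file schema,
  // recursing into nested structs. Structs left with no fields are dropped.
  def intersect(requested: Struct, file: Struct): Struct = {
    val fileFields: Map[String, Node] = file.fields.toMap
    Struct(requested.fields.flatMap { case (name, reqType) =>
      val kept: Option[(String, Node)] = (reqType, fileFields.get(name)) match {
        case (r: Struct, Some(f: Struct)) =>
          val nested = intersect(r, f)
          if (nested.fields.isEmpty) None else Some(name -> nested)
        case (_, Some(fileType)) if fileType == reqType =>
          Some(name -> reqType)
        case _ =>
          None // field is missing from this file, or its shape differs
      }
      kept.toList
    })
  }
}
```

With a file that lacks a requested subfield, the intersected schema has fewer fields than the requested one, which is the situation the relaxed field-count assertion in `ParquetRowConverter` has to tolerate.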
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148725482 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.scalactic.Equality +import org.scalatest.Assertions + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +private[sql] trait FileSchemaPruningTest { + _: Assertions => + + private val schemaEquality = new Equality[StructType] { +override def areEqual(a: StructType, b: Any) = + b match { +case otherType: StructType => a sameType otherType +case _ => false + } + } + + protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { +val fileSourceScanSchemata = --- End diff -- `fileSourceScanSchemata` is a `Seq[StructType]`, so I made it plural. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148725325 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.scalactic.Equality +import org.scalatest.Assertions + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +private[sql] trait FileSchemaPruningTest { + _: Assertions => + + private val schemaEquality = new Equality[StructType] { +override def areEqual(a: StructType, b: Any) = + b match { +case otherType: StructType => a sameType otherType +case _ => false + } + } + + protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { --- End diff -- I used the plural form here because `expectedSchemaCatalogStrings` is a varargs type. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148725152 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. + if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = --- End diff -- Top-level attributes of struct type whose type has been pruned need to be replaced in the logical relation's output. The pruned attributes are constructed in the `toAttributes` method call on the pruned schema. 
The expression ids of these replacement attributes are altered to the expression ids of the original attributes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
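Editorial sketch of the expression id replacement described above. `Attr` and `ExprIdRemap.remap` are hypothetical names, and `Attr` is a plain case class rather than Catalyst's attribute type; the sketch only assumes that attributes are matched by name, as the `outputIdMap` in the diff suggests.

```scala
// Hypothetical stand-in for a Catalyst attribute: a name plus an expression id.
final case class Attr(name: String, exprId: Long)

object ExprIdRemap {
  // Give each pruned attribute the id of the original attribute with the same name,
  // so that plan nodes referencing the original ids still resolve.
  // Attributes with no counterpart in `original` keep their own id.
  def remap(original: Seq[Attr], pruned: Seq[Attr]): Seq[Attr] = {
    val idsByName: Map[String, Long] = original.map(a => a.name -> a.exprId).toMap
    pruned.map(a => idsByName.get(a.name).fold(a)(id => a.copy(exprId = id)))
  }
}
```

For example, a pruned `name` attribute freshly created with id 7 takes over id 2 from the original relation's `name` attribute.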
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148723190 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation = prunedParquetRelation, output = prunedRelationOutput, +catalogTable = None) --- End diff -- I just made a judgment call here based on the idea that by modifying the logical relation it's no longer the same as the catalog table. What do you think? --- --
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148722822 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -127,8 +127,8 @@ private[parquet] class ParquetRowConverter( extends ParquetGroupConverter(updater) with Logging { assert( -parquetType.getFieldCount == catalystType.length, -s"""Field counts of the Parquet schema and the Catalyst schema don't match: +parquetType.getFieldCount <= catalystType.length, --- End diff -- In `ParquetReadSupport.scala`, when `parquetMrCompatibility` is `true`, we intersect the clipped parquet schema with the underlying parquet file's schema. This can result in a requested parquet schema with fewer fields than the requested catalyst schema. For example, in the case of a partitioned table where we select a column which doesn't exist in the schema of one partition's files, we will remove the missing columns from the requested parquet schema. This scenario is illustrated and tested by the "partial schema intersection - select missing subfield" test in `ParquetSchemaPruningSuite.scala`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148720677 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) +val fieldExtractors = + fieldExtractors1 +.filter(_.collectFirst { case att: Attribute => att } + .filter(childAttributes.contains).nonEmpty) + +if (fieldExtractors.nonEmpty) { + val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors) + + // Construct the new grouping and aggregate expressions by substituting + // each GetStructField expression with a reference to its alias + val newAggregateExpressions = +aggregateExpressions.map(substituteAttributes) + .collect { case named: NamedExpression => named } --- End diff -- I meant it might be like: `aggregateExpressions.map(substituteAttributes).asInstanceOf[Seq[NamedExpression]]`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148720590 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField} +import org.apache.spark.sql.catalyst.planning.SelectedField +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +abstract class FieldExtractionPushdown extends Rule[LogicalPlan] { --- End diff -- Oh, I just thought that `FieldExtractionPushdown` doesn't necessarily need to be a `Rule`; it could be a simple trait. `AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown` would then extend both `Rule` and `FieldExtractionPushdown`. Not a big deal. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148719962 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and array lengths of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. + */ +case class ProjectionOverSchema(schema: StructType) { --- End diff -- > Can you describe clearly in comment that this is used in nested column pruning? Will do. > ProjectionOverPrunedSchema ? It's a thought. Technically this supports projection over any schema, pruned or otherwise. But its reason for existence is schema pruning. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148718059 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and array lengths of complex type extractors and attributes --- End diff -- I don't remember why I used the term "array lengths" here, but I believe it pertains to the `numFields` field of the `GetArrayStructFields` case class. Projecting over an array of structs may filter out one or more fields in that struct. In that case, `numFields` needs to be adjusted. I'll revise the doc comment. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
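Editorial sketch of the `numFields` adjustment mentioned above. `ArrayFieldExtract` is a hypothetical, simplified model of an extractor over an array of structs and is not Catalyst's `GetArrayStructFields`; the sketch assumes that both the field's position and the struct's width must be recomputed against the pruned struct.

```scala
// Hypothetical, simplified model of extracting one field from an array of structs.
// `ordinal` is the field's position in the struct; `numFields` is the struct's width.
final case class ArrayFieldExtract(fieldName: String, ordinal: Int, numFields: Int)

object ProjectOverPruned {
  // Recompute ordinal and numFields against the pruned struct's field names.
  // Returns None if the extracted field is not part of the pruned struct.
  def project(
      extract: ArrayFieldExtract,
      prunedFieldNames: Seq[String]): Option[ArrayFieldExtract] = {
    val newOrdinal = prunedFieldNames.indexOf(extract.fieldName)
    if (newOrdinal < 0) None
    else Some(extract.copy(ordinal = newOrdinal, numFields = prunedFieldNames.length))
  }
}
```

If a three-field struct is pruned down to just `subfield2`, an extractor that read position 1 of 3 has to read position 0 of 1 instead.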
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148717122 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join + * and its join condition. The original [[expressions.GetStructField]] expressions are replaced + * with references to the pushed down aliases. 
+ */ +object JoinFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, Seq(), + join @ Join(left, right, joinType, Some(joinCondition))) => +val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct + +if (fieldExtractors.nonEmpty) { + val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors) + + // Construct the new projections and join condition by substituting each GetStructField + // expression with a reference to its alias + val newProjects = +projects.map(substituteAttributes).collect { case named: NamedExpression => named } --- End diff -- Please see my reply to the same question for `AggregateFieldExtractionPushdown`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148717085 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField} +import org.apache.spark.sql.catalyst.planning.SelectedField +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +abstract class FieldExtractionPushdown extends Rule[LogicalPlan] { --- End diff -- Not sure what you're getting at, but this seems reasonable to me. `FieldExtractionPushdown` is part of an abstract class hierarchy that supports `AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown`. The latter two must be instances of `Rule[LogicalPlan]` to be plugged into the optimizer. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
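Editorial sketch contrasting the two designs discussed in this exchange. Everything here is a stand-in: `Rule[T]` is a minimal abstract class and not Catalyst's `Rule`, plans are plain strings, and `tag` is a hypothetical shared helper. Under those assumptions, both layouts yield objects that can be used wherever a `Rule` is expected.

```scala
// Stand-in for Catalyst's Rule[LogicalPlan]; simplified for illustration only.
abstract class Rule[T] { def apply(plan: T): T }

// Option A (the shape used in the diff): shared helpers live in an abstract Rule subclass.
abstract class PushdownBase extends Rule[String] {
  protected def tag(plan: String, label: String): String = s"$label($plan)"
}
object AggregatePushdownA extends PushdownBase {
  def apply(plan: String): String = tag(plan, "agg")
}

// Option B (the reviewer's suggestion): helpers in a plain trait, mixed into each Rule.
trait PushdownHelpers {
  protected def tag(plan: String, label: String): String = s"$label($plan)"
}
object AggregatePushdownB extends Rule[String] with PushdownHelpers {
  def apply(plan: String): String = tag(plan, "agg")
}
```

The choice is mostly stylistic: option A guarantees every subclass is a rule, while option B keeps the helper reusable outside of rules.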
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148716702 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) +val fieldExtractors = + fieldExtractors1 +.filter(_.collectFirst { case att: Attribute => att } + .filter(childAttributes.contains).nonEmpty) + +if (fieldExtractors.nonEmpty) { + val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors) + + // Construct the new grouping and aggregate expressions by substituting + // each GetStructField expression with a reference to its alias + val newAggregateExpressions = +aggregateExpressions.map(substituteAttributes) + .collect { case named: NamedExpression => named } --- End diff -- I'm not sure exactly what you're asking, but the compiler infers that `newAggregateExpressions` is `Seq[NamedExpression]` because of the type signature of `{ case named: NamedExpression => named }`. We don't need any kind of type casting here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
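Editorial sketch of the type-inference point made above, next to the cast suggested in review. `Expr`, `Named`, and `Unnamed` are hypothetical stand-ins for Catalyst's expression types. The sketch shows two things that hold for plain Scala collections: `collect` with a typed pattern infers the narrower element type and drops non-matching elements, while `asInstanceOf` on a `Seq` is unchecked because of erasure.

```scala
// Hypothetical stand-ins for expression types; not Catalyst classes.
sealed trait Expr
final case class Named(name: String) extends Expr
final case class Unnamed(value: Int) extends Expr

object CollectVsCast {
  val exprs: Seq[Expr] = Seq(Named("a"), Unnamed(1), Named("b"))

  // The partial function's result type is Named, so the compiler infers Seq[Named].
  // Elements that do not match the pattern are silently dropped.
  val viaCollect: Seq[Named] = exprs.collect { case n: Named => n }

  // The cast compiles and runs because the element type is erased; a failure
  // only appears later, when a non-Named element is actually used as a Named.
  val viaCast: Seq[Named] = exprs.asInstanceOf[Seq[Named]]
}
```

Neither form verifies that every element is named: `collect` shortens the sequence, and the cast defers the error to the first use of a mismatched element.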
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148716194 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) --- End diff -- The attribute `a` is not in `expressions`, so it is not in `attributes`. When we construct `attributes`, we simply collect instances of `Attribute`. We don't do any recursion. Your query is tested by the "basic aggregate field extraction pushdown" test in `AggregateFieldExtractionPushdownSuite`. It's a little difficult to see because I'm using the Catalyst DataFrame DSL. This seems to be the convention in these tests, though. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
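The distinction between collecting over a sequence and collecting over a tree can be sketched as follows. This is a toy model under stated assumptions: `Expr`, `Attr`, `GetField` and `collectDeep` are hypothetical names, with `collectDeep` standing in for a recursive, tree-level traversal.

```scala
object TopLevelAttributes {
  // Hypothetical miniature expression tree.
  sealed trait Expr { def children: Seq[Expr] }
  final case class Attr(name: String) extends Expr {
    val children: Seq[Expr] = Nil
  }
  final case class GetField(child: Expr, field: String) extends Expr {
    val children: Seq[Expr] = Seq(child)
  }

  // Recursive, pre-order collection over a single expression tree.
  def collectDeep[B](e: Expr)(pf: PartialFunction[Expr, B]): Seq[B] =
    pf.lift(e).toSeq ++ e.children.flatMap(c => collectDeep(c)(pf))

  // Two expressions: the field extraction `a.b` and the bare attribute `c`.
  val expressions: Seq[Expr] = Seq(GetField(Attr("a"), "b"), Attr("c"))

  // Seq.collect looks only at the elements of the sequence. It does not
  // descend into `GetField`, so `a` is not found.
  val topLevel: Seq[String] = expressions.collect { case Attr(n) => n }

  // A recursive traversal also finds `a` underneath the field extraction.
  val nested: Seq[String] =
    expressions.flatMap(e => collectDeep[String](e) { case Attr(n) => n })
}
```

In this model `topLevel` contains only `c`, which mirrors the statement above that an attribute appearing solely inside a field extraction is not part of `attributes`.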
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148687395 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField} +import org.apache.spark.sql.catalyst.planning.SelectedField +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +abstract class FieldExtractionPushdown extends Rule[LogicalPlan] { --- End diff -- > nit: Does this need to be a Rule? As opposed to?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148456230 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.scalactic.Equality +import org.scalatest.Assertions + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +private[sql] trait FileSchemaPruningTest { + _: Assertions => + + private val schemaEquality = new Equality[StructType] { +override def areEqual(a: StructType, b: Any) = + b match { +case otherType: StructType => a sameType otherType +case _ => false + } + } + + protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { +val fileSourceScanSchemata = --- End diff -- `fileSourceScanSchemata` -> `fileSourceScanSchema`? The same applies to the uses below.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148455168 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer (valueContainsNu
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148454584 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer (valueContainsNu
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148455060 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,440 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer (valueContainsNu
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148452868 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. + if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = --- End diff -- Will we change the top-level attributes during pruning and make a different output for the logical relation? I think only the data types of the nested columns are changed? 
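For readers following the leaf-counting comparison in the quoted rule, here is a rough sketch of the idea. The `countLeaves` helper itself is not shown in the quoted diff, so this version and the toy types (`DType`, `IntT`, `StrT`, `ArrayT`, `MapT`, `Field`, `StructT`) are assumptions made for illustration, not Spark's `DataType` classes.

```scala
object LeafCounting {
  // Hypothetical miniature schema model.
  sealed trait DType
  case object IntT extends DType
  case object StrT extends DType
  final case class ArrayT(element: DType) extends DType
  final case class MapT(key: DType, value: DType) extends DType
  final case class Field(name: String, dataType: DType)
  final case class StructT(fields: Seq[Field]) extends DType

  // Counts the non-nested ("leaf") fields reachable from a type.
  def countLeaves(t: DType): Int = t match {
    case StructT(fields)  => fields.map(f => countLeaves(f.dataType)).sum
    case ArrayT(element)  => countLeaves(element)
    case MapT(key, value) => countLeaves(key) + countLeaves(value)
    case _                => 1
  }

  // id: int, name: struct<first: string, last: string>
  val full: StructT = StructT(Seq(
    Field("id", IntT),
    Field("name", StructT(Seq(Field("first", StrT), Field("last", StrT))))))

  // Only name.first is requested.
  val pruned: StructT =
    StructT(Seq(Field("name", StructT(Seq(Field("first", StrT))))))

  // Assuming `pruned` is a subset of `full`, a strictly smaller leaf count
  // means that something was actually pruned.
  val wasPruned: Boolean = countLeaves(full) > countLeaves(pruned)
}
```

As the comment in the diff notes, the comparison is only meaningful under the assumption that the pruned schema's fields are a subset of the original schema's fields. Two unrelated schemas can have the same number of leaves.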
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148450744 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -127,8 +127,8 @@ private[parquet] class ParquetRowConverter( extends ParquetGroupConverter(updater) with Logging { assert( -parquetType.getFieldCount == catalystType.length, -s"""Field counts of the Parquet schema and the Catalyst schema don't match: +parquetType.getFieldCount <= catalystType.length, --- End diff -- Why can the Parquet field count be less than the Catalyst type length now?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148450632 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- Can you give an example where it would fail? We didn't change `clipParquetSchema`, so even when pruning happens, why would we read a superset of the file's schema and trigger the exception that the comment describes? We don't add new fields; we only remove existing fields from the file's schema, right?
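The intersection mentioned in the quoted code comment can be illustrated with a small sketch. It uses a toy schema model (`Node`, `Leaf`, `Group`) and a hypothetical `intersect` function, not the parquet-mr API, and it assumes the requested schema may name a field that a particular file does not contain.

```scala
object SchemaIntersection {
  // Hypothetical miniature schema: a field is either a leaf or a named group.
  sealed trait Node
  case object Leaf extends Node
  final case class Group(fields: Map[String, Node]) extends Node

  // Keeps only the requested fields that also exist in the file schema,
  // recursing into groups. Fields whose kinds differ are dropped.
  def intersect(requested: Group, file: Group): Group = {
    val kept: Seq[(String, Node)] = requested.fields.toSeq.flatMap {
      case (name, req) =>
        val merged: Option[Node] = (req, file.fields.get(name)) match {
          case (r: Group, Some(f: Group)) => Some(intersect(r, f))
          case (Leaf, Some(Leaf))         => Some(Leaf)
          case _                          => None
        }
        merged.map(n => name -> n)
    }
    Group(kept.toMap)
  }

  // The file contains `a` and `s.x`.
  val file: Group =
    Group(Map("a" -> Leaf, "s" -> Group(Map("x" -> Leaf))))

  // The request additionally names `s.y` and `b`, which this file lacks.
  val requested: Group =
    Group(Map("a" -> Leaf, "b" -> Leaf, "s" -> Group(Map("x" -> Leaf, "y" -> Leaf))))

  // The result never contains a field that is absent from the file.
  val readSchema: Group = intersect(requested, file)
}
```

In this example the result equals the file schema, because everything the file contains was requested. Whether such a superset can actually arise in the patch is exactly what the question above asks.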
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148456140 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/FileSchemaPruningTest.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution + +import org.scalactic.Equality +import org.scalatest.Assertions + +import org.apache.spark.sql.DataFrame +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.types.StructType + +private[sql] trait FileSchemaPruningTest { + _: Assertions => + + private val schemaEquality = new Equality[StructType] { +override def areEqual(a: StructType, b: Any) = + b match { +case otherType: StructType => a sameType otherType +case _ => false + } + } + + protected def checkScanSchemata(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = { --- End diff -- `checkScanSchemata` -> `checkScanSchema`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148452275 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation = prunedParquetRelation, output = prunedRelationOutput, +catalogTable = None) --- End diff -- Why remove original `catalogTable` if any? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org Fo
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148446149 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join --- End diff -- Please state in this comment that the rule is specialized for pushing down pruned nested columns.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148446166 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and --- End diff -- Please state in this comment that the rule is specialized for pushing down pruned nested columns.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148437575 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) +val fieldExtractors = + fieldExtractors1 +.filter(_.collectFirst { case att: Attribute => att } + .filter(childAttributes.contains).nonEmpty) + +if (fieldExtractors.nonEmpty) { + val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors) --- End diff -- We can return the original plan if `aliases` is empty.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148434529 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and array lengths of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. + */ +case class ProjectionOverSchema(schema: StructType) { --- End diff -- Can you state clearly in the comment that this is used in nested column pruning?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148446321 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/FieldExtractionPushdown.scala --- @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Alias, Expression, GetStructField} +import org.apache.spark.sql.catalyst.planning.SelectedField +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.catalyst.rules.Rule + +abstract class FieldExtractionPushdown extends Rule[LogicalPlan] { --- End diff -- nit: Does this need to be a `Rule`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148434648 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and array lengths of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. + */ +case class ProjectionOverSchema(schema: StructType) { --- End diff -- `ProjectionOverPrunedSchema`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148446396 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join + * and its join condition. The original [[expressions.GetStructField]] expressions are replaced + * with references to the pushed down aliases. + */ +object JoinFieldExtractionPushdown extends FieldExtractionPushdown { --- End diff -- Oh, I just meant we may skip most of the rule if it can't be applied. Please see my comment below.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148437069 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join + * and its join condition. The original [[expressions.GetStructField]] expressions are replaced + * with references to the pushed down aliases. 
+ */ +object JoinFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, Seq(), + join @ Join(left, right, joinType, Some(joinCondition))) => +val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct + +if (fieldExtractors.nonEmpty) { + val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors) + + // Construct the new projections and join condition by substituting each GetStructField + // expression with a reference to its alias + val newProjects = +projects.map(substituteAttributes).collect { case named: NamedExpression => named } --- End diff -- `asInstanceOf[Seq[NamedExpression]]`?
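The two spellings discussed here differ when an element is not of the expected subtype. The following is a minimal sketch with toy classes (`Expr`, `Named`, `Ref` and `Lit` are illustrative stand-ins, not Catalyst types): `collect` silently drops non-matching elements, while `asInstanceOf` on a `Seq` is unchecked because of type erasure and fails only when a mismatched element is used.

```scala
object CollectVersusCast {
  // Toy stand-ins for an expression hierarchy; these are NOT Spark classes.
  sealed trait Expr
  sealed trait Named extends Expr { def name: String }
  final case class Ref(name: String) extends Named
  final case class Lit(value: Int) extends Expr

  val mixed: Seq[Expr] = Seq(Ref("a"), Lit(1), Ref("b"))

  // collect keeps the elements that match the pattern and drops the others.
  val collected: Seq[Named] = mixed.collect { case n: Named => n }

  // The cast itself succeeds at runtime: the element type is erased.
  val cast: Seq[Named] = mixed.asInstanceOf[Seq[Named]]

  def main(args: Array[String]): Unit = {
    println(collected.map(_.name))
    // Using the Lit element as a Named throws a ClassCastException here.
    println(scala.util.Try(cast.map(_.name)).isFailure)
  }
}
```

If every substituted expression is in fact named, both forms give the same list; the difference only shows up when that assumption is violated, where one hides the problem and the other reports it late.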
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148433577 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and array lengths of complex type extractors and attributes --- End diff -- Do we change array lengths? I only see adjustments to data types and field indexes.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148437549 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join + * and its join condition. The original [[expressions.GetStructField]] expressions are replaced + * with references to the pushed down aliases. 
+ */ +object JoinFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, Seq(), + join @ Join(left, right, joinType, Some(joinCondition))) => +val fieldExtractors = (projects :+ joinCondition).flatMap(getFieldExtractors).distinct + +if (fieldExtractors.nonEmpty) { + val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors) --- End diff -- We can return the original plan if `aliases` is empty.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148435955 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) --- End diff -- If the query is:
```sql
select a.b, count(1) from r1 group by a.b
```
then `fieldExtractors0` contains the `GetStructField` for `a.b`. Won't `fieldExtractors1` filter it out, because the attribute `a` is contained in the attribute set of all expressions? In this case we don't need all fields of `a`.
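Whether `a.b` survives the `fieldExtractors1` filter depends on what ends up in `attributes`. In the quoted code `expressions` is a `Seq`, so `expressions.collect` looks like the standard collection method, which inspects only the top-level list elements, whereas `collectFirst` on a single expression searches that expression's tree. The sketch below models both with toy classes (`Attr`, `GetField`, `Count`, `Lit` and `keptExtractors` are illustrative names, not Spark's), so it shows the shape of the logic under that reading rather than the behaviour of the actual rule.

```scala
object TopLevelCollect {
  // Toy expression tree; these are NOT Catalyst classes.
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search of the whole tree, loosely modelled on a tree-node collectFirst.
    def collectFirst[B](pf: PartialFunction[Expr, B]): Option[B] =
      pf.lift(this).orElse(
        children.foldLeft(Option.empty[B])((found, c) => found.orElse(c.collectFirst(pf))))
  }
  final case class Attr(name: String) extends Expr { val children: Seq[Expr] = Nil }
  final case class GetField(child: Expr, field: String) extends Expr {
    val children: Seq[Expr] = Seq(child)
  }
  final case class Count(child: Expr) extends Expr { val children: Seq[Expr] = Seq(child) }
  final case class Lit(value: Int) extends Expr { val children: Seq[Expr] = Nil }

  def keptExtractors(expressions: Seq[Expr]): Seq[GetField] = {
    // Seq.collect looks at each list element only; it does not descend into trees.
    val attributes: Set[Attr] = expressions.collect { case att: Attr => att }.toSet
    val extractors = expressions.collect { case g: GetField => g }.distinct
    // Same shape as the fieldExtractors1 filter: drop extractors whose root
    // attribute is itself selected as a whole.
    extractors.filter(_.collectFirst { case att: Attr => att }.filter(attributes.contains).isEmpty)
  }

  val a: Attr = Attr("a")
  // Models: select a.b, count(1) from r1 group by a.b
  val groupByField: Seq[Expr] = Seq(GetField(a, "b"), GetField(a, "b"), Count(Lit(1)))
  // Models a query that also selects the whole struct `a`.
  val alsoSelectsWholeStruct: Seq[Expr] = a +: groupByField

  def main(args: Array[String]): Unit = {
    println(keptExtractors(groupByField))
    println(keptExtractors(alsoSelectsWholeStruct))
  }
}
```

In this toy model the extractor for `a.b` is kept for the group-by query, because `a` never appears as a top-level expression, and it is dropped only when `a` is also selected in full.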
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r148436498 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) +val fieldExtractors = + fieldExtractors1 +.filter(_.collectFirst { case att: Attribute => att } + .filter(childAttributes.contains).nonEmpty) + +if (fieldExtractors.nonEmpty) { + val (aliases, substituteAttributes) = constructAliasesAndSubstitutions(fieldExtractors) + + // Construct the new grouping and aggregate expressions by substituting + // each GetStructField expression with a reference to its alias + val newAggregateExpressions = +aggregateExpressions.map(substituteAttributes) + .collect { case named: NamedExpression => named } --- End diff -- `asInstanceOf[Seq[NamedExpression]]`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r142140412 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) --- End diff -- I've updated the logic for comparing the original schema to the pruned schema: https://github.com/apache/spark/blob/52fddc181f32726cea1dd12a23ebf7201986be01/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala#L53-L57 The following test validates that the selection order is irrelevant in comparing the pruned schema to the original schema: https://github.com/apache/spark/blob/52fddc181f32726cea1dd12a23ebf7201986be01/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala#L82-L107
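To illustrate why selection order need not matter, here is a small sketch of the `map` / `reduceLeft(_ merge _)` pattern quoted above, combined with a leaf count like the `countLeaves(dataSchema) > countLeaves(prunedDataSchema)` check quoted earlier in this thread. It uses a toy schema model (`Struct`, `merge`, `mergeTypes` and this `countLeaves` are hypothetical helpers written for the example, not the PR's implementation): merging single-field structs in a different order can yield a different field order, but the number of leaves is the same.

```scala
object PrunedSchemaSketch {
  // Toy schema model; these are NOT Spark's DataType classes.
  sealed trait DataType
  case object StringType extends DataType
  final case class Struct(fields: Vector[(String, DataType)]) extends DataType

  // Merge two structs by field name, recursing into nested structs.
  def merge(left: Struct, right: Struct): Struct = {
    val mergedLeft = left.fields.map { case (name, lt) =>
      right.fields.find(_._1 == name).map(_._2) match {
        case Some(rt) => name -> mergeTypes(lt, rt)
        case None => name -> lt
      }
    }
    val leftNames = left.fields.map(_._1).toSet
    Struct(mergedLeft ++ right.fields.filterNot { case (n, _) => leftNames(n) })
  }

  def mergeTypes(l: DataType, r: DataType): DataType = (l, r) match {
    case (ls: Struct, rs: Struct) => merge(ls, rs)
    case _ if l == r => l
    case _ => throw new IllegalArgumentException(s"cannot merge $l and $r")
  }

  // Count the primitive (non-struct) fields reachable from a type.
  def countLeaves(dt: DataType): Int = dt match {
    case Struct(fields) => fields.map { case (_, t) => countLeaves(t) }.sum
    case _ => 1
  }

  val full: Struct = Struct(Vector(
    "name" -> Struct(Vector("first" -> StringType, "middle" -> StringType, "last" -> StringType)),
    "address" -> StringType))

  // Each requested field as a single-path struct, as in StructType(Array(field)).
  val first: Struct = Struct(Vector("name" -> Struct(Vector("first" -> StringType))))
  val last: Struct = Struct(Vector("name" -> Struct(Vector("last" -> StringType))))

  val prunedFirstLast: Struct = Seq(first, last).reduceLeft((l, r) => merge(l, r))
  val prunedLastFirst: Struct = Seq(last, first).reduceLeft((l, r) => merge(l, r))
}
```

Comparing the two pruned schemas for equality would distinguish them, whereas a leaf-count comparison against `full` treats them identically.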
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r142119164 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala --- @@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan => constraint match { // When the root is IsNotNull, we can push IsNotNull through the child null intolerant // expressions - case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) + case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child // null intolerant expressions. - case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) + case _ => scanNullIntolerantField(constraint).map(IsNotNull(_)) --- End diff -- > I don't believe this is covered in the current unit tests, so I will add or modify a test to cover it. I added the following test to cover this case: https://github.com/apache/spark/blob/88786d3eaf9d3a3d2c80c2235c5014074ade3dc1/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala#L62-L79
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r142117188 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- As I wrote below, this problem does exist with parquet-mr 1.8.2. Hence, I've reverted to the version with the `parquetMrCompatibility`-related logic.
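The intersection mentioned in the quoted code comment can be pictured with a toy schema model. This is only a sketch of the idea of keeping the requested fields that the file actually contains; `Struct` and `intersect` are hypothetical names for the example and do not reflect the PR's or parquet-mr's actual API.

```scala
object SchemaIntersectionSketch {
  // Toy schema model; these are NOT Parquet or Spark types.
  sealed trait DataType
  case object IntType extends DataType
  case object StringType extends DataType
  final case class Struct(fields: Vector[(String, DataType)]) extends DataType

  // Keep only requested fields that also exist in the file, recursing into
  // structs. Returns None when nothing of the requested type survives.
  def intersect(requested: DataType, file: DataType): Option[DataType] =
    (requested, file) match {
      case (Struct(req), Struct(have)) =>
        val fileTypes = have.toMap
        val kept = req.flatMap { case (name, rt) =>
          fileTypes.get(name).flatMap(ft => intersect(rt, ft)).map(name -> _)
        }
        if (kept.isEmpty) None else Some(Struct(kept))
      case (r, f) if r == f => Some(r)
      case _ => None
    }

  // The request asks for name.nickname and age, which this file does not contain.
  val requested: Struct = Struct(Vector(
    "name" -> Struct(Vector("first" -> StringType, "nickname" -> StringType)),
    "age" -> IntType))
  val file: Struct = Struct(Vector(
    "name" -> Struct(Vector("first" -> StringType, "last" -> StringType)),
    "address" -> StringType))

  val readable: Option[DataType] = intersect(requested, file)
}
```

Here `readable` retains only `name.first`, so the schema handed to the reader is never a superset of what the file provides.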
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140611282 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, --- End diff -- I suggest "sub-field". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140368054 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala --- @@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan => constraint match { // When the root is IsNotNull, we can push IsNotNull through the child null intolerant // expressions - case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) + case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child // null intolerant expressions. - case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) + case _ => scanNullIntolerantField(constraint).map(IsNotNull(_)) --- End diff -- > Incidentally, I have noticed that I need to update the code comments where I made the changes. I will push a commit with revised comments. I've pushed the commit with the revised documentation: https://github.com/apache/spark/pull/16578/commits/38cec5f05066f75d868b64d360b04edb18dcebeb. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140366890 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) --- End diff -- > Why do we need to trim the extractors which contain attributes referred from `groupingExpressions ++ aggregateExpressions`? Consider this query: ```sql select a, a.b, count(1) from r1 group by a, a.b ``` The grouping fields are `a` and `a.b`. `a` is an `Attribute`. `a.b` is a `GetStructField`. Since we need all of `a` to answer this query, it doesn't make sense to attempt to push down `a.b`. At the same time, `fieldExtractors0` includes all `GetStructField` instances. This includes `a.b`. The code you refer to above filters out the `a.b` `GetStructField` because our query requires all of `a`. If we do not filter out `a.b`, then the child (projection) of the new `Aggregate` will not contain `a` in its `output`. The query planner will barf. 
The logic for the creation of the new child projection is here: https://github.com/apache/spark/blob/00ab80c9b78c45c1a8f8c202c5bab04a62cda2ef/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala#L63-L70 This case is tested by https://github.com/apache/spark/blob/00ab80c9b78c45c1a8f8c202c5bab04a62cda2ef/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdownSuite.scala#L60-L76
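The filtering step described in this reply can be illustrated with a minimal sketch. It uses toy stand-ins (`Attr`, `GetField`) rather than Catalyst's `Attribute` and `GetStructField`, and the helper names `rootAttr` and `pushable` are hypothetical; it only shows the idea that an extractor is not pushed down when its root attribute is itself required whole.

```scala
object ExtractorFilterSketch {
  // Toy stand-ins for Catalyst expressions; these are NOT Spark's classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class GetField(child: Expr, field: String) extends Expr

  // Root attribute of a field-extraction chain, e.g. a.b.c -> a.
  @annotation.tailrec
  def rootAttr(e: Expr): Attr = e match {
    case a: Attr => a
    case GetField(child, _) => rootAttr(child)
  }

  // Keep only the extractors whose root attribute is not referenced whole
  // among the grouping and aggregate expressions.
  def pushable(exprs: Seq[Expr]): Seq[GetField] = {
    val wholeAttrs = exprs.collect { case a: Attr => a }.toSet
    exprs
      .collect { case g: GetField => g }
      .distinct
      .filterNot(g => wholeAttrs.contains(rootAttr(g)))
  }

  def main(args: Array[String]): Unit = {
    val a = Attr("a")
    val c = Attr("c")
    // group by a, a.b: `a` is needed whole, so a.b is not pushed down.
    println(pushable(Seq(a, GetField(a, "b"))))
    // group by c.d: only the nested field is needed, so it is pushed down.
    println(pushable(Seq(GetField(c, "d"))))
  }
}
```

In the `group by a, a.b` case the sketch returns no extractors, which mirrors why the child projection must keep `a` in its output.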
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140358379 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- I had to go back in time to figure this one out. I wrote this code for parquet-mr 1.7.0, and indeed this code is necessary for that version of the library as validated by a test in `ParquetSchemaPruningSuite`. However, parquet-mr version 1.8.2 no longer has this limitation/bug. Therefore, I will remove the `parquetMrCompatibility`-related logic from this file. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
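For readers unfamiliar with the intersection step mentioned in the quoted code comment, here is a minimal sketch of intersecting a requested schema with a file schema. The types `Prim` and `Group` are toy stand-ins, not parquet-mr's `PrimitiveType` and `GroupType`, and `intersect` is a hypothetical helper; the PR's `intersectParquetGroups` may differ in detail.

```scala
object IntersectSketch {
  // Toy stand-ins for Parquet schema types; these are NOT parquet-mr classes.
  sealed trait PType
  case class Prim(name: String) extends PType
  case class Group(name: String, fields: Seq[PType]) extends PType

  private def nameOf(t: PType): String = t match {
    case Prim(n) => n
    case Group(n, _) => n
  }

  // Keep only the requested fields that also exist in the file schema,
  // recursing into nested groups. Returns None when nothing survives.
  def intersect(requested: Group, file: Group): Option[Group] = {
    val fileByName = file.fields.map(f => nameOf(f) -> f).toMap
    val kept: Seq[PType] = requested.fields.flatMap { r =>
      val matched: Option[PType] = (r, fileByName.get(nameOf(r))) match {
        case (rg: Group, Some(fg: Group)) => intersect(rg, fg)
        case (p: Prim, Some(_: Prim)) => Some(p)
        case _ => None
      }
      matched
    }
    if (kept.isEmpty) None else Some(requested.copy(fields = kept))
  }

  def main(args: Array[String]): Unit = {
    val file = Group("root", Seq(Group("s", Seq(Prim("a"), Prim("b")))))
    // "c" exists only in the requested schema, so the intersection drops it.
    val requested = Group("root", Seq(Group("s", Seq(Prim("a"), Prim("c")))))
    println(intersect(requested, file))
  }
}
```

The sketch also shows the concern raised elsewhere in this thread: a field present only in the requested (Catalyst) schema is removed by the intersection.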
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140357267 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join + * and its join condition. The original [[expressions.GetStructField]] expressions are replaced + * with references to the pushed down aliases. + */ +object JoinFieldExtractionPushdown extends FieldExtractionPushdown { --- End diff -- I'm not sure I know what you mean. 
This test https://github.com/apache/spark/blob/38cec5f05066f75d868b64d360b04edb18dcebeb/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdownSuite.scala#L53-L61 checks that `JoinFieldExtractionPushdown` does not modify a join not involving nested fields.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140351256 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff -- We do use it in the vectorized reader, but it's not easy to see. It's instantiated here: https://github.com/apache/spark/blob/38cec5f05066f75d868b64d360b04edb18dcebeb/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/SpecificParquetRecordReaderBase.java#L141 via reflection. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140338751 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala --- @@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan => constraint match { // When the root is IsNotNull, we can push IsNotNull through the child null intolerant // expressions - case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) + case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child // null intolerant expressions. - case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) + case _ => scanNullIntolerantField(constraint).map(IsNotNull(_)) --- End diff -- Incidentally, I have noticed that I need to update the code comments where I made the changes. I will push a commit with revised comments. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request:
https://github.com/apache/spark/pull/16578#discussion_r140338601
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala ---
@@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan =>
     constraint match {
       // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
       // expressions
-      case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+      case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_))
       // Constraints always return true for all the inputs. That means, null will never be returned.
       // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
       // null intolerant expressions.
-      case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+      case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
--- End diff --

I think the best way to explain is through an example. Suppose we have a Parquet file with two string columns: `name.first` and `name.last`. We'll register this file as a temp view with name `people`. Now consider the following query:

```sql
select name.first from people where name.first = 'Michael'
```

The only column we need to read from the file to answer this query is `name.first`. Without the change to `QueryPlanConstraints.scala`, the optimized plan will look like:

```
Project [name#19.first AS first#30]
+- Filter (isnotnull(name#19) && (name#19.first = Michael))
   +- Relation[name#19] parquet
```

Evaluating `isnotnull(name#19)` requires the entire name struct. So both `name.first` and `name.last` will be read. With the change, the optimized plan will look like:

```
Project [name#19.first AS first#30]
+- Filter (isnotnull(name#19.first) && (name#19.first = Michael))
   +- Relation[name#19] parquet
```

Now the `isnotnull` expression requires only the `name.first` column. The `name.last` column will not be read.
I don't believe this is covered in the current unit tests, so I will add or modify a test to cover it. Note that in either case the `LogicalRelation` is shown as `Relation[name#19] parquet`, which is confusing. The string output for a logical relation lists attributes, not fields. Perhaps we should do something about that, so that in the latter case it would print `Relation[name#19.first] parquet` instead?
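The difference between the two plans above can be sketched with a toy model of the constraint scan. The expression classes and the `notNullTargets` helper below are hypothetical stand-ins, not Catalyst's API; the `fieldLevel` flag only switches between attribute-level behaviour (as in `scanNullIntolerantAttribute`) and field-level behaviour (as in the proposed `scanNullIntolerantField`).

```scala
object NotNullSketch {
  // Toy stand-ins for Catalyst expressions; these are NOT Spark's classes.
  sealed trait Expr
  case class Attr(name: String) extends Expr
  case class GetField(child: Expr, field: String) extends Expr
  case class Lit(value: String) extends Expr
  case class EqualTo(left: Expr, right: Expr) extends Expr

  // Collect the expressions for which an IsNotNull constraint is inferred.
  // fieldLevel = false stops only at whole attributes;
  // fieldLevel = true stops at the outermost nested field access.
  def notNullTargets(e: Expr, fieldLevel: Boolean): Seq[Expr] = e match {
    case a: Attr => Seq(a)
    case g: GetField if fieldLevel => Seq(g)
    case GetField(child, _) => notNullTargets(child, fieldLevel)
    case EqualTo(l, r) =>
      notNullTargets(l, fieldLevel) ++ notNullTargets(r, fieldLevel)
    case _: Lit => Seq.empty
  }

  def main(args: Array[String]): Unit = {
    // where name.first = 'Michael'
    val cond = EqualTo(GetField(Attr("name"), "first"), Lit("Michael"))
    println(notNullTargets(cond, fieldLevel = false)) // constraint on all of `name`
    println(notNullTargets(cond, fieldLevel = true))  // constraint on `name.first` only
  }
}
```

With the attribute-level scan the inferred constraint covers the whole `name` struct, which is what forces `name.last` to be read in the first plan.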
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r140226905 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    plan transformDown {
+      case op @ PhysicalOperation(projects, filters,
+        l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema,
+          dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) =>
+        val projectionFields = projects.flatMap(getFields)
+        val filterFields = filters.flatMap(getFields)
+        val requestedFields = (projectionFields ++ filterFields).distinct
+
+        // If [[requestedFields]] includes a proper field, continue. Otherwise,
+        // return [[op]]
+        if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) {
+          val prunedSchema = requestedFields
+            .map { case (field, _) => StructType(Array(field)) }
+            .reduceLeft(_ merge _)
--- End diff --

> Can the merging order of the fields affect the comparison later in line 57?

Not the merging order, no. However, the selection order can, leading to unnecessary "pruning". I will address this issue and add a test case to verify it.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139346525 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- Is the exception only thrown when we read a superset of fields for a nested column? E.g., there is a struct of two fields "a" and "b", and we try to read "c". --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139337054 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdown.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in an aggregate's grouping and + * aggregate expressions into a projection over its children. The original + * [[expressions.GetStructField]] expressions are replaced with references to the pushed down + * aliases. 
+ */ +object AggregateFieldExtractionPushdown extends FieldExtractionPushdown { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case agg @ Aggregate(groupingExpressions, aggregateExpressions, child) => +val expressions = groupingExpressions ++ aggregateExpressions +val attributes = AttributeSet(expressions.collect { case att: Attribute => att }) +val childAttributes = AttributeSet(child.expressions) +val fieldExtractors0 = + expressions +.flatMap(getFieldExtractors) +.distinct +val fieldExtractors1 = + fieldExtractors0 +.filter(_.collectFirst { case att: Attribute => att } + .filter(attributes.contains).isEmpty) --- End diff -- Why do we need to trim the extractors which contain attributes referred from `groupingExpressions ++ aggregateExpressions`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139336399 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/JoinFieldExtractionPushdown.scala --- @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeSet, NamedExpression} +import org.apache.spark.sql.catalyst.planning.PhysicalOperation +import org.apache.spark.sql.catalyst.plans.logical.{Join, LogicalPlan, Project} + +/** + * Pushes down aliases to [[expressions.GetStructField]] expressions in a projection over a join + * and its join condition. The original [[expressions.GetStructField]] expressions are replaced + * with references to the pushed down aliases. + */ +object JoinFieldExtractionPushdown extends FieldExtractionPushdown { --- End diff -- This applies generally. But for the non-nested column pruning cases, this seems a burden and making the query plan complicated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139333125 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) --- End diff -- We should add related tests for such cases. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139331293 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema --- End diff -- `clipParquetSchema` can add column paths only exist in catalyst schema into parquet schema. I think those columns are useful but looks like the `intersectParquetGroups` will remove them again? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139331198 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -63,9 +74,22 @@ private[parquet] class ParquetReadSupport extends ReadSupport[UnsafeRow] with Lo StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff -- I think the built-in Parquet reader means vectorized reader. But I think we don't use `ParquetReadSupport` in vectorized reader? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139312657 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala --- @@ -77,20 +77,21 @@ trait QueryPlanConstraints { self: LogicalPlan => constraint match { // When the root is IsNotNull, we can push IsNotNull through the child null intolerant // expressions - case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) + case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child // null intolerant expressions. - case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) + case _ => scanNullIntolerantField(constraint).map(IsNotNull(_)) --- End diff -- Previously IsNotNull constraints are Attribute-specific, why do we need to expand it to `ExtractValue`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user vkhristenko commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139312613 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) --- End diff -- @viirya @mallman If I may add here: given a schema ``` root | - a: StructType || - f1: Int || - f2: Int ``` the selections `df.select("a.f1", "a.f2")` and `df.select("a.f2", "a.f1")` will produce different `requiredSchema` values fed into `buildReader` upon some action. In the first case it will be `StructType( StructField( "a", StructType(StructField("f1") :: StructField("f2") :: Nil)) :: Nil)` and in the second `StructType( StructField( "a", StructType(StructField("f2") :: StructField("f1") :: Nil)) :: Nil)`, which means that in the second case the required schema differs from the original schema. But as long as fields f1 and f2 are split (each can be read without reading the other), this is remappable at the data source level. VK
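The remapping described in this comment can be sketched in plain Scala. This is only an illustration under stated assumptions: `Field`, `Struct`, and `reorderLike` are hypothetical stand-ins, not Spark's `StructField`/`StructType` API. The sketch assumes only that struct equality is order-sensitive and that fields can be looked up by name.

```scala
object SchemaRemapSketch {
  // Simplified stand-ins for Spark SQL's data types (hypothetical, not the real API).
  sealed trait DType
  case object IntT extends DType
  final case class Struct(fields: List[Field]) extends DType
  final case class Field(name: String, dtype: DType)

  // Reorders the fields of `requested` so that they follow the order used in
  // `original`, recursing into nested structs. Fields missing from `requested`
  // stay pruned.
  def reorderLike(original: DType, requested: DType): DType =
    (original, requested) match {
      case (Struct(origFields), Struct(reqFields)) =>
        val reqByName = reqFields.map(f => f.name -> f).toMap
        Struct(origFields.flatMap { o =>
          reqByName.get(o.name).map(r => Field(o.name, reorderLike(o.dtype, r.dtype)))
        })
      case _ => requested
    }

  // Schema of the data: a: struct<f1: int, f2: int>
  val original: DType =
    Struct(List(Field("a", Struct(List(Field("f1", IntT), Field("f2", IntT))))))
  // Schema as it would be requested by a selection of a.f2 followed by a.f1
  val requested: DType =
    Struct(List(Field("a", Struct(List(Field("f2", IntT), Field("f1", IntT))))))

  def main(args: Array[String]): Unit = {
    // Order-sensitive comparison: the two schemas differ.
    println(requested == original)
    // After remapping by name, the requested schema matches the original.
    println(reorderLike(original, requested) == original)
  }
}
```

Running `main` prints `false` and then `true`: the two schemas differ only in field order, so a by-name remapping is enough to reconcile them in this simplified model.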
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139312181 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) --- End diff -- Can the merging order of the fields affect the comparison later at line 57? For example, if we require `a.field1` and `a.field2`, is it possible that, depending on the merging order, we get either `StructType(StructField("a", StructType(StructField("field1") :: StructField("field2") :: Nil)) :: Nil)` or `StructType(StructField("a", StructType(StructField("field2") :: StructField("field1") :: Nil)) :: Nil)` as the merged schema? If we get the latter and the data schema is the former, might we conclude at line 57 that they are different?
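The concern about merge order can be illustrated with a small, self-contained model. Everything here is an assumption made for illustration: `merge` is a hypothetical simplification that keeps the left-hand field order and appends new right-hand fields, and `normalize` is one possible order-insensitive comparison, not something the patch is known to do.

```scala
object MergeOrderSketch {
  // Simplified stand-ins for Spark SQL's data types (hypothetical, not the real API).
  sealed trait DType
  case object IntT extends DType
  final case class Struct(fields: List[Field]) extends DType
  final case class Field(name: String, dtype: DType)

  // Hypothetical merge: keeps the left-hand field order and appends fields that
  // appear only on the right. Same-named struct fields are merged recursively.
  def merge(left: Struct, right: Struct): Struct = {
    val rightByName = right.fields.map(f => f.name -> f).toMap
    val leftNames = left.fields.map(_.name).toSet
    val merged = left.fields.map { l =>
      (l.dtype, rightByName.get(l.name).map(_.dtype)) match {
        case (ls: Struct, Some(rs: Struct)) => Field(l.name, merge(ls, rs))
        case _ => l
      }
    }
    Struct(merged ++ right.fields.filterNot(f => leftNames.contains(f.name)))
  }

  // Sorts fields by name at every level so that comparison ignores field order.
  def normalize(t: DType): DType = t match {
    case Struct(fs) => Struct(fs.map(f => Field(f.name, normalize(f.dtype))).sortBy(_.name))
    case other => other
  }

  // Builds the single-field schema for a request such as `a.field1`.
  def single(sub: String): Struct =
    Struct(List(Field("a", Struct(List(Field(sub, IntT))))))

  val mergedForward: Struct =
    List(single("field1"), single("field2")).reduceLeft((l, r) => merge(l, r))
  val mergedReverse: Struct =
    List(single("field2"), single("field1")).reduceLeft((l, r) => merge(l, r))

  def main(args: Array[String]): Unit = {
    // Direct comparison is sensitive to the order of the nested fields.
    println(mergedForward == mergedReverse)
    // Comparing top-level fields as sets does not help: the nested struct still differs.
    println(mergedForward.fields.toSet == mergedReverse.fields.toSet)
    // Comparing normalized schemas ignores field order at every level.
    println(normalize(mergedForward) == normalize(mergedReverse))
  }
}
```

In this model the program prints `false`, `false`, `true`: a set comparison of top-level fields removes order sensitivity only at the top level, so nested field order can still make two equivalent schemas look different.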
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139311738 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, --- End diff -- `proper field` sounds vague. Maybe something more specific, like `nested columns`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139311726 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + val parquetDataFields = dataSchema.fields.toSet + val prunedDataFields = prunedDataSchema.fields.toSet + + // If the original Parquet relation data fields are different from the + // pruned data fields, continue. 
Otherwise, return [[op]] + if (parquetDataFields != prunedDataFields) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + LogicalRelation(prunedParquetRelation, prunedRelationOutput, None) + +val projectionOverSchema = ProjectionOverSchema(prunedDataSchema) + +// Construct a new target for our projection by rewriting and +// including the original filters where available +val projectionChild = + if (filters.nonEmpty) { +val projectedFilters = filte
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r139311391 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst + * complex type extractor. For example, consider a relation with the following schema: + * + * {{{ + * root + *|-- name: struct (nullable = true) + *||-- first: string (nullable = true) + *||-- last: string (nullable = true) + *}}} + * + * Further, suppose we take the select expression `name.first`. This will parse into an + * `Alias(child, "first")`. 
Ignoring the alias, `child` matches the following pattern: + * + * {{{ + * GetStructFieldObject( + * AttributeReference("name", StructType(_), _, _), + * StructField("first", StringType, _, _)) + * }}} + * + * [[SelectedField]] converts that expression into + * + * {{{ + * StructField("name", StructType(Array(StructField("first", StringType + * }}} + * + * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the + * same name as its child (or "parent" going right to left in the select expression) and a data + * type appropriate to the complex type extractor. In our example, the name of the child expression + * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string + * field named "first". + * + * @param expr the top-level complex type extractor + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { +// If this expression is an alias, work on its child instead +val unaliased = expr match { + case Alias(child, _) => child + case expr => expr +} +selectField(unaliased, None) + } + + private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = { +expr match { + // No children. Returns a StructField with the attribute name or None if fieldOpt is None. + case AttributeReference(name, dataType, nullable, metadata) => +fieldOpt.map(field => + StructField(name, wrapStructType(dataType, field), nullable, metadata)) + // Handles case "expr0.field[n]", where "expr0" is of struct type and "expr0.field" is of + // array type. + case GetArrayItem(x @ GetStructFieldObject(child, field @ StructField(name, + dataType, nullable, metadata)), _) => +val childField = fieldOpt.map(field => StructField(name, + wrapStructType(dataType, field), nullable, metadata)).getOrElse(field) +selectField(child, Some(childField)) + // Handles case "expr0.field[n]", where "expr0.field" is of array type. 
+ case GetArrayItem(child, _) => +selectField(child, fieldOpt) + // Handles case "expr0.field.subfield", where "expr0" and "expr0.field" are of array type. + case GetArrayStructFields(child: GetArrayStructFields, + field @ StructField(name, dataType, nullable, metadata), _, _, _) => +val childField = fieldOpt.map(field => StructField(name, +wrapStructType(dataType, field), +nullable, metadata)).getOrElse(field) +selectField(child, Some(childField)) + // Handles case "expr0.field", where "expr0" is of array type. + case GetArrayStructFields(child, + field @ StructField(name, dataType, nullable, metadata), _, _, containsNull) => +val childField = + fieldOpt.map(field => StructField(name, +
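The mapping described in the Scaladoc above, from the select expression `name.first` to a single-field struct named `name`, can be sketched with a toy expression tree. The types below (`Attr`, `GetField`, `Alias`, `Field`, `Struct`) are hypothetical stand-ins for the Catalyst classes, and the sketch covers only struct access, not arrays or maps.

```scala
object SelectedFieldSketch {
  // Simplified stand-ins for Spark SQL's data types (hypothetical, not the real API).
  sealed trait DType
  case object StringT extends DType
  final case class Struct(fields: List[Field]) extends DType
  final case class Field(name: String, dtype: DType)

  // Simplified stand-ins for Catalyst expressions.
  sealed trait Expr
  final case class Attr(name: String, dtype: DType) extends Expr
  final case class GetField(child: Expr, field: Field) extends Expr
  final case class Alias(child: Expr, name: String) extends Expr

  // Walks from the outermost extractor toward the root attribute, wrapping the
  // field selected so far in a single-field struct named after each parent.
  private def selectField(expr: Expr, fieldOpt: Option[Field]): Option[Field] =
    expr match {
      // A bare attribute selects no nested field, so the result is None.
      case Attr(name, _) => fieldOpt.map(f => Field(name, Struct(List(f))))
      case GetField(child, field) =>
        val childField =
          fieldOpt.map(f => Field(field.name, Struct(List(f)))).getOrElse(field)
        selectField(child, Some(childField))
      // If this expression is an alias, work on its child instead.
      case Alias(child, _) => selectField(child, fieldOpt)
    }

  def unapply(expr: Expr): Option[Field] = selectField(expr, None)

  // name: struct<first: string, last: string>
  val first: Field = Field("first", StringT)
  val name: Attr = Attr("name", Struct(List(first, Field("last", StringT))))
  // The select expression `name.first`, including its alias
  val nameFirst: Expr = Alias(GetField(name, first), "first")

  def main(args: Array[String]): Unit = nameFirst match {
    case SelectedFieldSketch(field) => println(field)
    case _ => println("no nested field selected")
  }
}
```

Matching `nameFirst` yields a `name` field whose struct type contains only `first`, which mirrors the `StructField("name", StructType(Array(StructField("first", StringType))))` result described in the Scaladoc.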
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r100360523 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- Let's go with `GetStructFieldObject`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
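For readers unfamiliar with the pattern being named in this thread, here is a minimal sketch of an extractor object that returns the struct field itself rather than its ordinal. The `GetStructField`, `Attr`, `Field`, and `Struct` types below are simplified, hypothetical stand-ins; only the name `GetStructFieldObject` comes from the discussion.

```scala
object FieldExtractorSketch {
  // Simplified stand-ins for Spark SQL's data types (hypothetical, not the real API).
  sealed trait DType
  case object StringT extends DType
  final case class Struct(fields: List[Field]) extends DType
  final case class Field(name: String, dtype: DType)

  // Simplified stand-ins for Catalyst expressions.
  sealed trait Expr { def dtype: DType }
  final case class Attr(name: String, dtype: DType) extends Expr
  // The case class extractor of this type yields (child, ordinal).
  final case class GetStructField(child: Expr, ordinal: Int) extends Expr {
    def dtype: DType = child.dtype match {
      case Struct(fields) => fields(ordinal).dtype
      case other => sys.error(s"not a struct: $other")
    }
  }

  // A separate extractor that resolves the ordinal against the child's struct
  // type and yields (child, field) instead.
  object GetStructFieldObject {
    def unapply(e: GetStructField): Option[(Expr, Field)] = e.child.dtype match {
      case Struct(fields) => fields.lift(e.ordinal).map(f => (e.child, f))
      case _ => None
    }
  }

  val name: Attr = Attr("name", Struct(List(Field("first", StringT), Field("last", StringT))))
  val expr: Expr = GetStructField(name, 1)

  // Returns the name of the selected struct field, if the expression selects one.
  def selectedName(e: Expr): Option[String] = e match {
    case GetStructFieldObject(_, field) => Some(field.name)
    case _ => None
  }

  def main(args: Array[String]): Unit = println(selectedName(expr))
}
```

With the custom extractor, a pattern match gets the field's name and type directly; with the case class extractor alone, the caller would have to look the ordinal up in the child's schema at every match site.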
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r100232773 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- `GetStructFieldObject` or `GetStructFieldExtractor`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r100229358 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- How about `GetStructFieldObject`? Or `GetStructFieldRef`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r100229300 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[StructField]] from a Catalyst complex type + * extractor. This is like the opposite of [[ExtractValue#apply]]. + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { +// If this expression is an alias, work on its child instead +val unaliased = expr match { + case Alias(child, _) => child + case expr => expr +} +selectField(unaliased, None) + } + + /** + * Converts some chain of complex type extractors into a [[StructField]]. 
+ * + * @param expr the top-level complex type extractor + * @param fieldOpt the subfield of [[expr]], where relevent + */ + private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = +expr match { + case AttributeReference(name, _, nullable, _) => +fieldOpt.map(field => StructField(name, StructType(Array(field)), nullable)) + case GetArrayItem(GetStructField2(child, field @ StructField(name, + ArrayType(_, arrayNullable), fieldNullable, _)), _) => +val childField = fieldOpt.map(field => StructField(name, ArrayType( + StructType(Array(field)), arrayNullable), fieldNullable)).getOrElse(field) +selectField(child, Some(childField)) + case GetArrayStructFields(child, --- End diff -- I've spent some time this week developing a few different solutions to this problem; however, none of them is very easy to understand or verify. I'm going to spend some more time working on a simpler solution before posting something back.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r99174674 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[StructField]] from a Catalyst complex type + * extractor. This is like the opposite of [[ExtractValue#apply]]. + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { +// If this expression is an alias, work on its child instead +val unaliased = expr match { + case Alias(child, _) => child + case expr => expr +} +selectField(unaliased, None) + } + + /** + * Converts some chain of complex type extractors into a [[StructField]]. 
+ * + * @param expr the top-level complex type extractor + * @param fieldOpt the subfield of [[expr]], where relevent + */ + private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = +expr match { + case AttributeReference(name, _, nullable, _) => +fieldOpt.map(field => StructField(name, StructType(Array(field)), nullable)) + case GetArrayItem(GetStructField2(child, field @ StructField(name, + ArrayType(_, arrayNullable), fieldNullable, _)), _) => +val childField = fieldOpt.map(field => StructField(name, ArrayType( + StructType(Array(field)), arrayNullable), fieldNullable)).getOrElse(field) +selectField(child, Some(childField)) + case GetArrayStructFields(child, --- End diff -- I believe I have a fix for this, but I probably won't be able to post a new commit until early next week; I'm working on a proposal for the Spark Summit RFP. Cheers.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98920657 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- Agreed. I think the best name in this context is `GetStructField`, but that's already taken. I'll keep thinking about a good alternative. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. 
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98859135 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[StructField]] from a Catalyst complex type + * extractor. This is like the opposite of [[ExtractValue#apply]]. + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { +// If this expression is an alias, work on its child instead +val unaliased = expr match { + case Alias(child, _) => child + case expr => expr +} +selectField(unaliased, None) + } + + /** + * Converts some chain of complex type extractors into a [[StructField]]. 
+ * + * @param expr the top-level complex type extractor + * @param fieldOpt the subfield of [[expr]], where relevent + */ + private def selectField(expr: Expression, fieldOpt: Option[StructField]): Option[StructField] = +expr match { + case AttributeReference(name, _, nullable, _) => +fieldOpt.map(field => StructField(name, StructType(Array(field)), nullable)) + case GetArrayItem(GetStructField2(child, field @ StructField(name, + ArrayType(_, arrayNullable), fieldNullable, _)), _) => +val childField = fieldOpt.map(field => StructField(name, ArrayType( + StructType(Array(field)), arrayNullable), fieldNullable)).getOrElse(field) +selectField(child, Some(childField)) + case GetArrayStructFields(child, --- End diff -- If we have a chain of `GetArrayStructFields`, it looks like this will produce a wrong result.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98845999 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => field } +.map(field => StructType(Array(field))) +.reduceLeft(_ merge _) + val parquetDataColumnNames = dataSchema.fieldNames + val prunedDataSchema = +StructType(prunedSchema.filter(f => parquetDataColumnNames.contains(f.name))) + val parquetDataFields = dataSchema.fields.toSet + val prunedDataFields = prunedDataSchema.fields.toSet + + // If the original Parquet relation data fields are different from the + // pruned data fields, continue. 
Otherwise, return [[op]] + if (parquetDataFields != prunedDataFields) { +val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames +val newDataSchema = + StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = newDataSchema)(hadoopFsRelation.sparkSession) +val outputMap = l.output.map(att => (att.name, att)).toMap + +// We need to map the output of the original logical relation +// to the attributes of the pruned parquet schema where +// possible so that references to those attributes elsewhere in +// the query plan are not broken +val expectedOutputAttributes = + prunedParquetRelation +.schema +.toAttributes.map(att => outputMap.getOrElse(att.name, att)) +val prunedRelation = + LogicalRelation(prunedParquetRelation, Some(expectedOutputAttributes)) + +val projectionOverSchema = ProjectionOverSchema(prunedDataSchema) + +// Construct a new target for our projection by rewriting and +// including the original filters where available +val projectionChild = +
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98845200 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => field } +.map(field => StructType(Array(field))) +.reduceLeft(_ merge _) + val parquetDataColumnNames = dataSchema.fieldNames + val prunedDataSchema = +StructType(prunedSchema.filter(f => parquetDataColumnNames.contains(f.name))) + val parquetDataFields = dataSchema.fields.toSet + val prunedDataFields = prunedDataSchema.fields.toSet + + // If the original Parquet relation data fields are different from the + // pruned data fields, continue. Otherwise, return [[op]] + if (parquetDataFields != prunedDataFields) { +val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames +val newDataSchema = + StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) --- End diff -- Isn't `newDataSchema` just `prunedDataSchema`, given that `dataSchemaFieldNames` is the same as `parquetDataColumnNames`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98844948 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => field } +.map(field => StructType(Array(field))) +.reduceLeft(_ merge _) + val parquetDataColumnNames = dataSchema.fieldNames + val prunedDataSchema = +StructType(prunedSchema.filter(f => parquetDataColumnNames.contains(f.name))) + val parquetDataFields = dataSchema.fields.toSet + val prunedDataFields = prunedDataSchema.fields.toSet + + // If the original Parquet relation data fields are different from the + // pruned data fields, continue. Otherwise, return [[op]] + if (parquetDataFields != prunedDataFields) { +val dataSchemaFieldNames = hadoopFsRelation.dataSchema.fieldNames --- End diff -- I think `dataSchemaFieldNames` is just `parquetDataColumnNames` above?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98844304 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.types.{StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. 
+ */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a proper field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => field } --- End diff -- Those two `map` calls can be combined into a single `map`.
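To illustrate the suggestion about the two `map` calls, here is a minimal sketch in plain Scala. `Field` and `Schema` are hypothetical stand-ins for Spark's `StructField` and `StructType`, and the `merge` shown is a naive union written only for this example; the tuple shape of `requested` is an assumption based on the `case (field, _)` pattern in the diff.

```scala
// Hypothetical stand-ins for StructField / StructType; not Spark's classes.
final case class Field(name: String)
final case class Schema(fields: Vector[Field]) {
  // Naive merge for illustration only: union of fields, first occurrence wins.
  def merge(that: Schema): Schema = Schema((fields ++ that.fields).distinct)
}

object FusedMapDemo {
  // (field, optional attribute name): assumed shape of `requestedFields`.
  val requested: Vector[(Field, Option[String])] =
    Vector((Field("name"), None), (Field("address"), Some("address")))

  // Two passes: first drop the second tuple element, then wrap each field.
  val twoMaps: Schema = requested
    .map { case (field, _) => field }
    .map(field => Schema(Vector(field)))
    .reduceLeft(_ merge _)

  // One pass: destructure and wrap in the same function.
  val oneMap: Schema = requested
    .map { case (field, _) => Schema(Vector(field)) }
    .reduceLeft(_ merge _)

  def main(args: Array[String]): Unit = {
    println(twoMaps == oneMap)
  }
}
```

Both pipelines build the same value; the single `map` only avoids one intermediate collection and one traversal.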
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98843749 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[StructField]] from a Catalyst complex type + * extractor. This is like the opposite of [[ExtractValue#apply]]. + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { +// If this expression is an alias, work on its child instead +val unaliased = expr match { + case Alias(child, _) => child + case expr => expr +} +selectField(unaliased, None) + } + + /** + * Converts some chain of complex type extractors into a [[StructField]]. --- End diff -- It would be better to add a few example inputs and outputs to the comment.
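As a rough idea of what such input/output examples could look like, the following sketch models the alias unwrapping and the recursive walk in plain Scala. Everything here is hypothetical: `Expr`, `Attr`, `GetField`, `Alias`, `Node` and `SelectedPath` are simplified stand-ins invented for this example, not Catalyst classes, and array handling is left out entirely.

```scala
// Hypothetical, simplified model; not Catalyst's Expression or StructField.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GetField(child: Expr, fieldName: String) extends Expr
final case class Alias(child: Expr, alias: String) extends Expr

// A nested field path: name.first is Node("name", Some(Node("first", None))).
final case class Node(name: String, child: Option[Node])

object SelectedPath {
  def unapply(expr: Expr): Option[Node] = {
    // If the expression is an alias, work on its child instead.
    val unaliased = expr match {
      case Alias(child, _) => child
      case other => other
    }
    select(unaliased, None)
  }

  // Walks from the outermost extractor down to the root attribute,
  // wrapping the path collected so far at each step.
  @scala.annotation.tailrec
  private def select(expr: Expr, inner: Option[Node]): Option[Node] = expr match {
    case Attr(name) => inner.map(n => Node(name, Some(n)))
    case GetField(child, fieldName) => select(child, Some(Node(fieldName, inner)))
    case Alias(_, _) => None
  }
}

object SelectedPathDemo {
  def render(n: Node): String = n.child.fold(n.name)(c => s"${n.name}.${render(c)}")

  def describe(e: Expr): String = e match {
    case SelectedPath(node) => render(node)
    case _ => "<no nested field selected>"
  }

  def main(args: Array[String]): Unit = {
    println(describe(Alias(GetField(Attr("name"), "first"), "f")))
    println(describe(Attr("address")))
  }
}
```

In this model an aliased `name.first` yields the path `name.first`, while a bare attribute such as `address` yields no match, mirroring the `fieldOpt.map(...)` in the `AttributeReference` case of the diff.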
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98833772 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- But we can have a better name.
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98833717 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- Oh, never mind. I thought `GetStructField` was another Scala extractor. Actually, it is a case class.
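The distinction discussed in this thread, between the extractor the compiler generates for a case class and a hand-written extractor object, can be sketched in plain Scala. The types below are hypothetical stand-ins, not Spark's; `GetFieldWithSchema` is an invented name used only here. The one assumption carried over from the diff's doc comment is that the case class stores a field ordinal rather than the field itself.

```scala
// Hypothetical stand-ins for Catalyst's types; none of these are Spark classes.
final case class Field(name: String, dataType: String)
sealed trait Expr
final case class Attr(name: String, fields: Vector[Field]) extends Expr
// Like the case class under discussion, this stores the ordinal, not the field.
final case class GetField(child: Attr, ordinal: Int) extends Expr

// Hand-written extractor: resolves the ordinal against the child's fields and
// yields (child, field) instead of (child, ordinal).
object GetFieldWithSchema {
  def unapply(expr: Expr): Option[(Attr, Field)] = expr match {
    case GetField(child, ordinal) => child.fields.lift(ordinal).map(f => (child, f))
    case _ => None
  }
}

object ExtractorDemo {
  val name: Attr = Attr("name", Vector(Field("first", "string"), Field("last", "string")))
  val expr: Expr = GetField(name, 1)

  // The compiler-generated extractor exposes the ordinal.
  def viaCaseClass(e: Expr): Option[Int] = e match {
    case GetField(_, ordinal) => Some(ordinal)
    case _ => None
  }

  // The hand-written extractor exposes the field itself.
  def viaCustom(e: Expr): Option[String] = e match {
    case GetFieldWithSchema(_, field) => Some(field.name)
    case _ => None
  }

  def main(args: Array[String]): Unit = {
    println(viaCaseClass(expr))
    println(viaCustom(expr))
  }
}
```

Because a case class's companion already defines `unapply`, a second extractor with a different result shape needs its own object, which is why the naming question comes up at all.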
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98819150 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- What do you mean by combining it with the existing case class extractor?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98809992 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- Or combine it with the `GetStructField` case class extractor?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r98809770 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/GetStructField2.scala --- @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions.{Expression, GetStructField} +import org.apache.spark.sql.types.StructField + +/** + * A Scala extractor that extracts the child expression and struct field from a [[GetStructField]]. + * This is in contrast to the [[GetStructField]] case class extractor which returns the field + * ordinal instead of the field itself. + */ +private[planning] object GetStructField2 { --- End diff -- Can we have a better name for this?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/16578#discussion_r96048060 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/AggregateFieldExtractionPushdownSuite.scala --- @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.catalyst.optimizer + +import org.apache.spark.sql.catalyst.dsl.expressions._ +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.aggregate.Count +import org.apache.spark.sql.catalyst.plans.PlanTest +import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.rules.RuleExecutor +import org.apache.spark.sql.types._ + +class AggregateFieldExtractionPushdownSuite extends PlanTest { + private val testRelation = +LocalRelation( + StructField("a", StructType( +StructField("a1", IntegerType) :: Nil)), + StructField("b", IntegerType), + StructField("c", StructType( +StructField("c1", IntegerType) :: Nil))) + + object Optimizer extends RuleExecutor[LogicalPlan] { +val batches = + Batch("Aggregate Field Extraction Pushdown", Once, +AggregateFieldExtractionPushdown) :: Nil + } + + test("basic aggregate field extraction pushdown") { +val originalQuery = + testRelation +.select('a) +.groupBy('a getField "a1")('a getField "a1" as 'a1, Count('*)) --- End diff -- Replace it with `Count("*")`?
[GitHub] spark pull request #16578: [SPARK-4502][SQL] Parquet nested column pruning
GitHub user mallman opened a pull request: https://github.com/apache/spark/pull/16578

[SPARK-4502][SQL] Parquet nested column pruning

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support finer-grained schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

### Implementation

There are three main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema.
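To make the pruning idea concrete, here is a minimal, self-contained sketch. It is not the `ParquetSchemaPruning` rule from this patch: it assumes a hypothetical toy schema model (`FieldType`, `Leaf`, `Struct`) in place of Spark's `StructType`, and it takes the required fields as plain dotted paths rather than gathering them from a query plan. The names `prune`, `leafColumns` and `SchemaPruningSketch` are likewise made up for illustration.

```scala
// Hypothetical, simplified schema model. These are NOT Spark's classes.
sealed trait FieldType
case object Leaf extends FieldType
final case class Struct(fields: List[(String, FieldType)]) extends FieldType

object SchemaPruningSketch {
  /** Keep only the parts of `schema` reachable through the requested field paths. */
  def prune(schema: Struct, paths: Seq[List[String]]): Struct = {
    val kept: List[(String, FieldType)] = schema.fields.flatMap { case (name, tpe) =>
      // Paths that start at this field, with the leading name stripped.
      val tails = paths.collect { case head :: tail if head == name => tail }
      if (tails.isEmpty) None // field is never referenced: drop it
      else tpe match {
        // Some path ends at this struct, so the whole struct is required.
        case s: Struct if tails.exists(_.isEmpty) => Some(name -> s)
        // Otherwise keep only the referenced children.
        case s: Struct => Some(name -> prune(s, tails))
        case Leaf => Some(name -> Leaf)
      }
    }
    Struct(kept)
  }

  /** Dotted names of the leaf fields, i.e. the physical columns that would be read. */
  def leafColumns(schema: Struct, prefix: String = ""): List[String] =
    schema.fields.flatMap {
      case (name, s: Struct) => leafColumns(s, prefix + name + ".")
      case (name, Leaf) => List(prefix + name)
    }

  // The `contacts` schema from the example above.
  val contacts: Struct = Struct(List(
    "name" -> Struct(List("first" -> Leaf, "last" -> Leaf)),
    "address" -> Leaf))
}
```

Under these assumptions, `leafColumns(prune(contacts, Seq(List("name", "first"))))` yields only `name.first`, which mirrors the read behavior described for `select name.first from contacts`.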
`ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

Third, we introduce two new catalyst query transformations, `AggregateFieldExtractionPushdown` and `JoinFieldExtractionPushdown`, to support schema pruning in aggregation and join query plans. These rules extract field references in aggregations and joins respectively, push down aliases to those references and replace them with references to the pushed down aliases. They use a new `SelectedField` extractor that transforms a catalyst complex type extractor (the "selected field") into a corresponding `StructField`.

### Performance

The performance difference in executing queries with this patch compared to master is related to the depth of the table schema and the query itself. At VideoAmp, one of our biggest tables stores OpenRTB bid requests we receive from our exchange partners. Our bid request table's schema closely follows the OpenRTB bid request object schema. Additionally, when we bid we save our response along with the request in the same table. We store these two objects as two top-level fields in our table. Therefore, all bid request and response data are contained within nested fields.

For the purposes of measuring the performance impact of this patch, we ran some queries on our bid request table with the un-patched and patched master. We measured query execution time and the amount of data read from the underlying parquet files. I'll focus on a couple of benchmarks. (All benchmarks were run on an AWS EC2 cluster with four c3.8xl workers.) The first query I'll highlight is

```SQL
select count(request.device.ip) from event.bid_request where ds=20161128 and h=0
```

(Hopefully it's obvious what this query means.) On the un-patched master, this query ran in 2.7 minutes and read 34.3 GB of data. On the patched master, this query ran in 4 seconds and read 987.3 MB of data.
We also ran a reporting-oriented query benchmark. I won't reproduce the query here, but it reads a larger subset of the bid request fields and joins against another table with a deeply nested schema. In addition to a join, we perform several aggregations in this query. On the un-patched master, this query ran in 3.4 minutes and read 34.6 GB of data. On the patched master, this query ran in 59 seconds and read 2.6 GB of data.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. This patch introduces over two dozen new unit tests and has been running on a production Spark 1.5 cluster at VideoAmp for about a year. In that time, one