[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21320 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r212476709

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
    @@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
     class ParquetSchemaPruningSuite
         extends QueryTest
         with ParquetTest
    +    with SchemaPruningTest
    --- End diff --

    Never mind. This commit just adds it back.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r212476400

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
    @@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
     class ParquetSchemaPruningSuite
         extends QueryTest
         with ParquetTest
    +    with SchemaPruningTest
    --- End diff --

    In my local environment, all of them still pass. Am I wrong?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r212476268

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
    @@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
     class ParquetSchemaPruningSuite
         extends QueryTest
         with ParquetTest
    +    with SchemaPruningTest
    --- End diff --

    Why was this removed?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r212414888

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
    @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
       override def start(): Unit = {
         var i = 0
    -    while (i < currentRow.numFields) {
    +    while (i < fieldConverters.length) {
           fieldConverters(i).updater.start()
           currentRow.setNullAt(i)
           i += 1
         }
    +    while (i < currentRow.numFields) {
    --- End diff --

    My suggestion is to remove the changes to this file for now. We can handle the ignored cases in follow-up PRs.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r212396370

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
    @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
       override def start(): Unit = {
         var i = 0
    -    while (i < currentRow.numFields) {
    +    while (i < fieldConverters.length) {
           fieldConverters(i).updater.start()
           currentRow.setNullAt(i)
           i += 1
         }
    +    while (i < currentRow.numFields) {
    --- End diff --

    These changes are related to my fix for the ignored unit test. If I apply my fix but keep the master version of this file, 24 unit tests fail. If I apply my fix along with this file's diff, all tests pass, including the test that is currently ignored.

    I'm not sure I can develop a unit test for the current commit that should pass but will fail without this file's changes. I haven't spent any time thinking about it, and I really need to work on other things right now.

    If you want, I will back out this change. However, I will re-incorporate it in a follow-on PR. Thanks.
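For readers following the `start()` change discussed in this part of the thread, here is a minimal sketch of the two-loop idea. The hunk is cut off right after the second `while`, so the body of that loop is an assumption: the sketch supposes it only nulls out the remaining slots. `SketchRow`, `SketchUpdater` and `RowConverterSketch` are hypothetical stand-ins, not Spark classes.

```scala
// Hypothetical stand-ins for Spark's mutable row and per-field updaters.
final class SketchRow(val numFields: Int) {
  private val nulls = Array.fill(numFields)(false)
  def setNullAt(i: Int): Unit = { nulls(i) = true }
  def isNullAt(i: Int): Boolean = nulls(i)
}

final class SketchUpdater {
  var started = false
  def start(): Unit = { started = true }
}

object RowConverterSketch {
  // Assumes updaters.length <= row.numFields, i.e. converters exist for
  // only a prefix of the row's fields.
  def start(row: SketchRow, updaters: Array[SketchUpdater]): Unit = {
    var i = 0
    // Fields that have a converter: start it and reset the slot.
    while (i < updaters.length) {
      updaters(i).start()
      row.setNullAt(i)
      i += 1
    }
    // Assumed body of the second loop: fields without a converter are
    // only reset, so no stale value survives from the previous row.
    while (i < row.numFields) {
      row.setNullAt(i)
      i += 1
    }
  }
}
```

Under that assumption, a single loop bounded by `numFields` would index past the end of the converter array whenever the row has more fields than converters, which may be why the two changes go together.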
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r212388958

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
    @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
       override def start(): Unit = {
         var i = 0
    -    while (i < currentRow.numFields) {
    +    while (i < fieldConverters.length) {
           fieldConverters(i).updater.start()
           currentRow.setNullAt(i)
           i += 1
         }
    +    while (i < currentRow.numFields) {
    --- End diff --

    I'll get back to you on this shortly. Thanks.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r212194825

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala ---
    @@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
       override def start(): Unit = {
         var i = 0
    -    while (i < currentRow.numFields) {
    +    while (i < fieldConverters.length) {
           fieldConverters(i).updater.start()
           currentRow.setNullAt(i)
           i += 1
         }
    +    while (i < currentRow.numFields) {
    --- End diff --

    @mallman It sounds like the changes in this file are not needed. Could you point out which test cases would fail without them?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205340696

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
    +
    +/**
    + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
    + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
    + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
    + * SQL column, and a nested Parquet column corresponds to a [[StructField]].
    + */
    +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
    +  override def apply(plan: LogicalPlan): LogicalPlan =
    +    if (SQLConf.get.nestedSchemaPruningEnabled) {
    +      apply0(plan)
    +    } else {
    +      plan
    +    }
    +
    +  private def apply0(plan: LogicalPlan): LogicalPlan =
    +    plan transformDown {
    +      case op @ PhysicalOperation(projects, filters,
    --- End diff --

    Also honestly this logic looks convoluted.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user ajacques commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205329769

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst
    + * complex type extractor. For example, consider a relation with the following schema:
    + *
    + * {{{
    + * root
    + *  |-- name: struct (nullable = true)
    + *  |    |-- first: string (nullable = true)
    + *  |    |-- last: string (nullable = true)
    + * }}}
    + *
    + * Further, suppose we take the select expression `name.first`. This will parse into an
    + * `Alias(child, "first")`. Ignoring the alias, `child` matches the following pattern:
    + *
    + * {{{
    + * GetStructFieldObject(
    + *   AttributeReference("name", StructType(_), _, _),
    + *   StructField("first", StringType, _, _))
    + * }}}
    + *
    + * [[SelectedField]] converts that expression into
    + *
    + * {{{
    + * StructField("name", StructType(Array(StructField("first", StringType))))
    + * }}}
    + *
    + * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the
    + * same name as its child (or "parent" going right to left in the select expression) and a data
    + * type appropriate to the complex type extractor. In our example, the name of the child expression
    + * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string
    + * field named "first".
    + *
    + * @param expr the top-level complex type extractor
    + */
    +object SelectedField {
    +  def unapply(expr: Expression): Option[StructField] = {
    --- End diff --

    ```
    Error:(61, 12) constructor cannot be instantiated to expected type;
     found   : org.apache.spark.sql.catalyst.expressions.Alias
     required: org.apache.spark.sql.catalyst.expressions.ExtractValue
          case Alias(child, _) => child
    ```

    Alias takes: `Alias(child: Expression, name: String)`
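The Scaladoc quoted in this comment describes turning a chain of field extractions into a nested, single-path `StructField`. A minimal sketch of that idea follows, using a tiny hypothetical model instead of Catalyst: `MiniType`, `MiniField`, `MiniExpr` and `SelectedFieldSketch` are illustrative names only, and the sketch handles struct fields but not arrays or maps.

```scala
// Hypothetical, simplified stand-ins for Catalyst types (not Spark's API).
sealed trait MiniType
case object MiniString extends MiniType
final case class MiniStruct(fields: List[MiniField]) extends MiniType
final case class MiniField(name: String, dataType: MiniType)

sealed trait MiniExpr
final case class MiniAttr(name: String) extends MiniExpr
final case class MiniGetField(child: MiniExpr, field: MiniField) extends MiniExpr
final case class MiniAlias(child: MiniExpr, name: String) extends MiniExpr

object SelectedFieldSketch {
  // Returns the root field pruned down to the single selected path, or None
  // when the expression is not a field extraction over an attribute.
  def unapply(expr: MiniExpr): Option[MiniField] = {
    // Strip the alias while the value is still typed as the general
    // expression type, then match on the extractor.
    val unaliased = expr match {
      case MiniAlias(child, _) => child
      case other => other
    }
    unaliased match {
      case g: MiniGetField => Some(wrap(g.child, g.field))
      case _ => None
    }
  }

  // Walks toward the root, wrapping the selection in one single-field
  // struct per level.
  private def wrap(expr: MiniExpr, selected: MiniField): MiniField = expr match {
    case MiniAttr(name) => MiniField(name, MiniStruct(List(selected)))
    case MiniGetField(child, MiniField(name, _)) =>
      wrap(child, MiniField(name, MiniStruct(List(selected))))
    case MiniAlias(child, _) => wrap(child, selected)
  }
}
```

In this sketch the alias is matched on a value typed as the general expression trait. The compiler error quoted above is the kind Scala reports when a constructor pattern is applied to a value whose static type cannot be that class.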
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user ajacques commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205329633

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A Scala extractor that projects an expression over a given schema. Data types,
    + * field indexes and field counts of complex type extractors and attributes
    + * are adjusted to fit the schema. All other expressions are left as-is. This
    + * class is motivated by columnar nested schema pruning.
    + */
    +case class ProjectionOverSchema(schema: StructType) {
    --- End diff --

    We can move this to `sql.execution` if we move all three classes: `ProjectionOverSchema`, `GetStructFieldObject`, and `SelectedField`. Is there a difference between the `catalyst.planning` and `execution` packages?
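The Scaladoc in this diff says that field indexes and field counts of extractors are "adjusted to fit the schema". A small sketch of what that adjustment could look like for a single struct level, under the assumption that fields are re-resolved by name against the pruned schema; `PrunedStruct`, `FieldExtractor` and `OrdinalProjectionSketch` are hypothetical names, not the PR's classes.

```scala
// Hypothetical, simplified model: a struct is an ordered list of field names,
// and an extractor remembers the field's position and the struct's width.
final case class PrunedStruct(fieldNames: Vector[String])
final case class FieldExtractor(fieldName: String, ordinal: Int, numFields: Int)

object OrdinalProjectionSketch {
  // Re-resolves an extractor against a (possibly pruned) struct by name.
  // Returns None when the pruned struct no longer contains the field.
  def project(extractor: FieldExtractor, pruned: PrunedStruct): Option[FieldExtractor] = {
    val idx = pruned.fieldNames.indexOf(extractor.fieldName)
    if (idx < 0) None
    else Some(extractor.copy(ordinal = idx, numFields = pruned.fieldNames.length))
  }
}
```

For example, an extractor for `middle` at ordinal 1 of a three-field struct becomes ordinal 0 of a one-field struct once `first` and `last` are pruned away.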
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205063684

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A Scala extractor that projects an expression over a given schema. Data types,
    + * field indexes and field counts of complex type extractors and attributes
    + * are adjusted to fit the schema. All other expressions are left as-is. This
    + * class is motivated by columnar nested schema pruning.
    + */
    +case class ProjectionOverSchema(schema: StructType) {
    --- End diff --

    can we move it to `catalyst`?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205022974

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.scalatest.BeforeAndAfterAll
    +import org.scalatest.exceptions.TestFailedException
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.expressions.NamedExpression
    +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    +import org.apache.spark.sql.types._
    +
    +// scalastyle:off line.size.limit
    +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
    +  // The test schema as a tree string, i.e. `schema.treeString`
    +  // root
    +  //  |-- col1: string (nullable = false)
    +  //  |-- col2: struct (nullable = true)
    +  //  |    |-- field1: integer (nullable = true)
    +  //  |    |-- field2: array (nullable = true)
    +  //  |    |    |-- element: integer (containsNull = false)
    +  //  |    |-- field3: array (nullable = false)
    +  //  |    |    |-- element: struct (containsNull = true)
    +  //  |    |    |    |-- subfield1: integer (nullable = true)
    +  //  |    |    |    |-- subfield2: integer (nullable = true)
    +  //  |    |    |    |-- subfield3: array (nullable = true)
    +  //  |    |    |    |    |-- element: integer (containsNull = true)
    +  //  |    |-- field4: map (nullable = true)
    +  //  |    |    |-- key: string
    +  //  |    |    |-- value: struct (valueContainsNull = false)
    +  //  |    |    |    |-- subfield1: integer (nullable = true)
    +  //  |    |    |    |-- subfield2: array (nullable = true)
    +  //  |    |    |    |    |-- element: integer (containsNull = false)
    +  //  |    |-- field5: array (nullable = false)
    +  //  |    |    |-- element: struct (containsNull = true)
    +  //  |    |    |    |-- subfield1: struct (nullable = false)
    +  //  |    |    |    |    |-- subsubfield1: integer (nullable = true)
    +  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
    +  //  |    |    |    |-- subfield2: struct (nullable = true)
    +  //  |    |    |    |    |-- subsubfield1: struct (nullable = true)
    +  //  |    |    |    |    |    |-- subsubsubfield1: string (nullable = true)
    +  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
    +  //  |    |-- field6: struct (nullable = true)
    +  //  |    |    |-- subfield1: string (nullable = false)
    +  //  |    |    |-- subfield2: string (nullable = true)
    +  //  |    |-- field7: struct (nullable = true)
    +  //  |    |    |-- subfield1: struct (nullable = true)
    +  //  |    |    |    |-- subsubfield1: integer (nullable = true)
    +  //  |    |    |    |-- subsubfield2: integer (nullable = true)
    +  //  |    |-- field8: map (nullable = true)
    +  //  |    |    |-- key: string
    +  //  |    |    |-- value: array (valueContainsNull = false)
    +  //  |    |    |    |-- element: struct (containsNull = true)
    +  //  |    |    |    |    |-- subfield1: integer (nullable = true)
    +  //  |    |    |    |    |-- subfield2: array (nullable = true)
    +  //  |    |    |    |    |    |-- element: integer (containsNull = false)
    +  //  |    |-- field9: map (nullable = true)
    +  //  |    |    |-- key: string
    +  //  |    |    |-- value: integer (valueContainsNull = false)
    +  //  |-- col3: array (nullable = false)
    +  //  |    |-- element: struct (containsNull = false)
    +  //  |    |    |-- field1: struct (nullable = true)
    +  //  |    |    |    |-- subfield1: integer (nullable = false)
    +  //  |    |    |    |-- subfield2: integer (nullable = true)
    +  //  |    |    |-- field2: map (nullable = true)
    +  //  |    |    |    |-- key: string
    +  //  |    |    |    |-- value: integer
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205022799

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import java.io.File
    +
    +import org.apache.spark.sql.{QueryTest, Row}
    +import org.apache.spark.sql.execution.FileSchemaPruningTest
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ParquetSchemaPruningSuite
    +    extends QueryTest
    +    with ParquetTest
    +    with FileSchemaPruningTest
    +    with SharedSQLContext {
    +  case class FullName(first: String, middle: String, last: String)
    +  case class Contact(name: FullName, address: String, pets: Int, friends: Array[FullName] = Array(),
    +    relatives: Map[String, FullName] = Map())
    +
    +  val contacts =
    +    Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) ::
    +    Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil
    +
    +  case class Name(first: String, last: String)
    +  case class BriefContact(name: Name, address: String)
    +
    +  val briefContacts =
    +    BriefContact(Name("Janet", "Jones"), "567 Maple Drive") ::
    +    BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil
    +
    +  case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int,
    +    friends: Array[FullName] = Array(), relatives: Map[String, FullName] = Map(), p: Int)
    +
    +  case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int)
    +
    +  val contactsWithDataPartitionColumn =
    +    contacts.map { case Contact(name, address, pets, friends, relatives) =>
    +      ContactWithDataPartitionColumn(name, address, pets, friends, relatives, 1) }
    +  val briefContactsWithDataPartitionColumn =
    +    briefContacts.map { case BriefContact(name: Name, address: String) =>
    +      BriefContactWithDataPartitionColumn(name, address, 2) }
    +
    +  testSchemaPruning("select a single complex field") {
    +    val query = sql("select name.middle from contacts")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>>")
    +    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
    +  }
    +
    +  testSchemaPruning("select a single complex field and the partition column") {
    +    val query = sql("select name.middle, p from contacts")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>>")
    +    checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
    +  }
    +
    +  ignore("partial schema intersection - select missing subfield") {
    +    val query = sql("select name.middle, address from contacts where p=2")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>,address:string>")
    +    checkAnswer(query,
    +      Row(null, "567 Maple Drive") ::
    +      Row(null, "6242 Ash Street") :: Nil)
    +  }
    +
    +  testSchemaPruning("no unnecessary schema pruning") {
    +    val query =
    +      sql("select name.last, name.middle, name.first, relatives[''].last, " +
    +        "relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " +
    +        "friends[0].first, pets, address from contacts where p=2")
    +    // We've selected every field in the schema. Therefore, no schema pruning should be performed.
    +    // We check this by asserting that the scanned schema of the query is identical to the schema
    +    // of the contacts relation, even though the fields are selected in different orders.
    +    checkScanSchemata(query,
    +      "struct<name:struct<first:string,middle:string,last:string>,address:string,pets:int," +
    +      "friends:array<struct<first:string,middle:string,last:string>>," +
    +      "relatives:map<string,struct<first:string,middle:string,last:string>>>")
    +    checkAnswer(query,
    +      Row("Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") ::
    +      Row("Jones", null, "Jim", null, null, null, null, null, null, null, "6242
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205022895

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
    +
    +/**
    + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
    + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
    + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
    + * SQL column, and a nested Parquet column corresponds to a [[StructField]].
    + */
    +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
    +  override def apply(plan: LogicalPlan): LogicalPlan =
    +    if (SQLConf.get.nestedSchemaPruningEnabled) {
    +      apply0(plan)
    +    } else {
    +      plan
    +    }
    +
    +  private def apply0(plan: LogicalPlan): LogicalPlan =
    +    plan transformDown {
    +      case op @ PhysicalOperation(projects, filters,
    +          l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
    +            dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) =>
    +        val projectionRootFields = projects.flatMap(getRootFields)
    +        val filterRootFields = filters.flatMap(getRootFields)
    +        val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct
    +
    +        // If [[requestedRootFields]] includes a nested field, continue. Otherwise,
    +        // return [[op]]
    +        if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) {
    +          val prunedSchema = requestedRootFields
    +            .map { case RootField(field, _) => StructType(Array(field)) }
    +            .reduceLeft(_ merge _)
    +          val dataSchemaFieldNames = dataSchema.fieldNames.toSet
    +          val prunedDataSchema =
    +            StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
    +
    +          // If the data schema is different from the pruned data schema, continue. Otherwise,
    +          // return [[op]]. We effect this comparison by counting the number of "leaf" fields in
    +          // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
    +          // in [[dataSchema]].
    +          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
    --- End diff --

    No comment.
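The code comment on the line under review says the two schemas are compared by counting "leaf" fields. The body of `countLeaves` is not shown in this hunk, so the following is only a sketch of one plausible definition over a hypothetical mini schema model (`LeafType` and its subclasses are not Spark's `DataType` hierarchy).

```scala
// Hypothetical, simplified schema model (not Spark's DataType hierarchy).
sealed trait LeafType
case object LeafInt extends LeafType
case object LeafString extends LeafType
final case class LeafArray(element: LeafType) extends LeafType
final case class LeafMap(key: LeafType, value: LeafType) extends LeafType
final case class LeafStruct(fields: List[(String, LeafType)]) extends LeafType

object CountLeavesSketch {
  // Counts the primitive "leaf" fields reachable from a type.
  def countLeaves(t: LeafType): Int = t match {
    case LeafArray(e) => countLeaves(e)
    case LeafMap(k, v) => countLeaves(k) + countLeaves(v)
    case LeafStruct(fs) => fs.map { case (_, ft) => countLeaves(ft) }.sum
    case _ => 1
  }

  // Assuming the pruned schema is a subset of the data schema, pruning
  // changes anything only if it removes at least one leaf.
  def shouldPrune(data: LeafStruct, pruned: LeafStruct): Boolean =
    countLeaves(data) > countLeaves(pruned)
}
```

The comparison is only meaningful under the subset assumption stated in the original comment. Two unrelated schemas could have equal leaf counts and still differ.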
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205021712

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.scalatest.BeforeAndAfterAll
    +import org.scalatest.exceptions.TestFailedException
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.expressions.NamedExpression
    +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    +import org.apache.spark.sql.types._
    +
    +// scalastyle:off line.size.limit
    --- End diff --

    No comment.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205021469

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
    +
    +/**
    + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
    + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
    + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
    + * SQL column, and a nested Parquet column corresponds to a [[StructField]].
    + */
    +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
    +  override def apply(plan: LogicalPlan): LogicalPlan =
    +    if (SQLConf.get.nestedSchemaPruningEnabled) {
    +      apply0(plan)
    +    } else {
    +      plan
    +    }
    +
    +  private def apply0(plan: LogicalPlan): LogicalPlan =
    +    plan transformDown {
    +      case op @ PhysicalOperation(projects, filters,
    +          l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
    +            dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) =>
    +        val projectionRootFields = projects.flatMap(getRootFields)
    +        val filterRootFields = filters.flatMap(getRootFields)
    +        val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct
    +
    +        // If [[requestedRootFields]] includes a nested field, continue. Otherwise,
    +        // return [[op]]
    +        if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) {
    +          val prunedSchema = requestedRootFields
    +            .map { case RootField(field, _) => StructType(Array(field)) }
    +            .reduceLeft(_ merge _)
    +          val dataSchemaFieldNames = dataSchema.fieldNames.toSet
    +          val prunedDataSchema =
    +            StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
    +
    +          // If the data schema is different from the pruned data schema, continue. Otherwise,
    +          // return [[op]]. We effect this comparison by counting the number of "leaf" fields in
    +          // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
    +          // in [[dataSchema]].
    +          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
    +            val prunedParquetRelation =
    +              hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
    +
    +            // We need to replace the expression ids of the pruned relation output attributes
    +            // with the expression ids of the original relation output attributes so that
    +            // references to the original relation's output are not broken
    +            val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap
    +            val prunedRelationOutput =
    +              prunedParquetRelation
    +                .schema
    +                .toAttributes
    +                .map {
    +                  case att if outputIdMap.contains(att.name) =>
    +                    att.withExprId(outputIdMap(att.name))
    +                  case att => att
    +                }
    +            val prunedRelation =
    +              l.copy(relation =
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205021282

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala ---
    @@ -0,0 +1,62 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.apache.spark.sql.catalyst.expressions._
    +import org.apache.spark.sql.types._
    +
    +/**
    + * A Scala extractor that projects an expression over a given schema. Data types,
    + * field indexes and field counts of complex type extractors and attributes
    + * are adjusted to fit the schema. All other expressions are left as-is. This
    + * class is motivated by columnar nested schema pruning.
    + */
    +case class ProjectionOverSchema(schema: StructType) {
    --- End diff --

    Okay. So...
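For readers unfamiliar with the term, the "Scala extractor" in the Scaladoc quoted above is a value with an `unapply` method that can be used in pattern position. Below is a minimal sketch of an instance-level extractor, assuming toy expression types (`Expr`, `FieldAt`, `Literal`) and a hypothetical `ProjectOver` class. None of these are Spark classes, and this is not the PR's implementation; it only illustrates how an extractor parameterized by a pruned schema can re-index field accesses and leave everything else alone.

```scala
// Hypothetical stand-ins for Catalyst expressions; these are NOT Spark's classes.
sealed trait Expr
final case class FieldAt(name: String, ordinal: Int) extends Expr // struct-field access by position
final case class Literal(value: Int) extends Expr

// An instance-level extractor: the pruned schema is a constructor argument,
// and `unapply` rewrites field ordinals so they fit that schema.
final case class ProjectOver(prunedFieldNames: Seq[String]) {
  def unapply(expr: Expr): Option[Expr] = expr match {
    case FieldAt(name, _) if prunedFieldNames.contains(name) =>
      Some(FieldAt(name, prunedFieldNames.indexOf(name)))
    case _ => None // no match; the caller keeps the expression as-is
  }
}

object ProjectOverDemo {
  // `projectOver` is a stable identifier, so it can be used directly as a pattern.
  def rewrite(expr: Expr, projectOver: ProjectOver): Expr = expr match {
    case projectOver(projected) => projected
    case other => other
  }

  def main(args: Array[String]): Unit = {
    val projectOver = ProjectOver(Seq("middle"))
    println(rewrite(FieldAt("middle", 1), projectOver)) // prints FieldAt(middle,0)
    println(rewrite(Literal(7), projectOver))           // prints Literal(7)
  }
}
```

Making the extractor an instance rather than a singleton object is what lets the target schema vary per query while the call site stays an ordinary `match`.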
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205020970

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala ---
    @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
             StructType.fromString(schemaString)
           }
    -    val parquetRequestedSchema =
    +    val clippedParquetSchema =
           ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema)
    +    val parquetRequestedSchema = if (parquetMrCompatibility) {
    +      // Parquet-mr will throw an exception if we try to read a superset of the file's schema.
    --- End diff --

    This change is not part of this PR.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r205021140

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import java.io.File
    +
    +import org.apache.spark.sql.{QueryTest, Row}
    +import org.apache.spark.sql.execution.FileSchemaPruningTest
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ParquetSchemaPruningSuite
    +    extends QueryTest
    +    with ParquetTest
    +    with FileSchemaPruningTest
    +    with SharedSQLContext {
    --- End diff --

    No comment.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204607379 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,388 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r204289193

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala ---
    @@ -0,0 +1,153 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
    +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField}
    +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
    +import org.apache.spark.sql.catalyst.rules.Rule
    +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
    +
    +/**
    + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
    + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
    + * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
    + * SQL column, and a nested Parquet column corresponds to a [[StructField]].
    + */
    +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
    +  override def apply(plan: LogicalPlan): LogicalPlan =
    +    if (SQLConf.get.nestedSchemaPruningEnabled) {
    +      apply0(plan)
    +    } else {
    +      plan
    +    }
    +
    +  private def apply0(plan: LogicalPlan): LogicalPlan =
    +    plan transformDown {
    +      case op @ PhysicalOperation(projects, filters,
    +          l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
    +            dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) =>
    +        val projectionRootFields = projects.flatMap(getRootFields)
    +        val filterRootFields = filters.flatMap(getRootFields)
    +        val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct
    +
    +        // If [[requestedRootFields]] includes a nested field, continue. Otherwise,
    +        // return [[op]]
    +        if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) {
    +          val prunedSchema = requestedRootFields
    +            .map { case RootField(field, _) => StructType(Array(field)) }
    +            .reduceLeft(_ merge _)
    +          val dataSchemaFieldNames = dataSchema.fieldNames.toSet
    +          val prunedDataSchema =
    +            StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name)))
    +
    +          // If the data schema is different from the pruned data schema, continue. Otherwise,
    +          // return [[op]]. We effect this comparison by counting the number of "leaf" fields in
    +          // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
    +          // in [[dataSchema]].
    +          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
    --- End diff --

    Can we make some private functions for these? Looks hard to follow.
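The leaf-counting comparison described in the quoted code comment could be factored into small private helpers along the following lines. This is only a sketch: `DType` and its subclasses are hypothetical stand-ins for Spark's `DataType`, `ArrayType`, `MapType` and `StructType`, `isStrictlyPruned` is an invented name, and the body of the PR's own `countLeaves` is not shown in the quoted diff, so the recursion here is an assumption about its shape.

```scala
// Hypothetical stand-ins for Spark SQL's DataType hierarchy; NOT Spark's classes.
sealed trait DType
case object IntT extends DType
case object StringT extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType
final case class StructT(fields: Seq[(String, DType)]) extends DType

object LeafCount {
  // Counts the atomic ("leaf") fields reachable from a type.
  def countLeaves(dt: DType): Int = dt match {
    case ArrayT(element)  => countLeaves(element)
    case MapT(key, value) => countLeaves(key) + countLeaves(value)
    case StructT(fields)  => fields.map { case (_, t) => countLeaves(t) }.sum
    case _                => 1
  }

  // Assuming `pruned` only contains fields that also occur in `full`,
  // a smaller leaf count means something was actually pruned.
  def isStrictlyPruned(full: StructT, pruned: StructT): Boolean =
    countLeaves(full) > countLeaves(pruned)

  def main(args: Array[String]): Unit = {
    val fullName = StructT(Seq("first" -> StringT, "middle" -> StringT, "last" -> StringT))
    val full = StructT(Seq("name" -> fullName, "address" -> StringT, "pets" -> IntT))
    val pruned = StructT(Seq("name" -> StructT(Seq("middle" -> StringT))))
    println(countLeaves(full))              // prints 5
    println(countLeaves(pruned))            // prints 1
    println(isStrictlyPruned(full, pruned)) // prints true
  }
}
```

Note that the comparison is only meaningful under the subset assumption stated in the original comment; two unrelated schemas can have the same leaf count.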
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r204288547

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import java.io.File
    +
    +import org.apache.spark.sql.{QueryTest, Row}
    +import org.apache.spark.sql.execution.FileSchemaPruningTest
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ParquetSchemaPruningSuite
    +    extends QueryTest
    +    with ParquetTest
    +    with FileSchemaPruningTest
    +    with SharedSQLContext {
    +  case class FullName(first: String, middle: String, last: String)
    +  case class Contact(name: FullName, address: String, pets: Int, friends: Array[FullName] = Array(),
    +    relatives: Map[String, FullName] = Map())
    +
    +  val contacts =
    +    Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) ::
    +    Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil
    +
    +  case class Name(first: String, last: String)
    +  case class BriefContact(name: Name, address: String)
    +
    +  val briefContacts =
    +    BriefContact(Name("Janet", "Jones"), "567 Maple Drive") ::
    +    BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil
    +
    +  case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int,
    +    friends: Array[FullName] = Array(), relatives: Map[String, FullName] = Map(), p: Int)
    +
    +  case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int)
    +
    +  val contactsWithDataPartitionColumn =
    +    contacts.map { case Contact(name, address, pets, friends, relatives) =>
    +      ContactWithDataPartitionColumn(name, address, pets, friends, relatives, 1) }
    +  val briefContactsWithDataPartitionColumn =
    +    briefContacts.map { case BriefContact(name: Name, address: String) =>
    +      BriefContactWithDataPartitionColumn(name, address, 2) }
    +
    +  testSchemaPruning("select a single complex field") {
    +    val query = sql("select name.middle from contacts")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>>")
    +    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
    +  }
    +
    +  testSchemaPruning("select a single complex field and the partition column") {
    +    val query = sql("select name.middle, p from contacts")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>>")
    +    checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
    +  }
    +
    +  ignore("partial schema intersection - select missing subfield") {
    +    val query = sql("select name.middle, address from contacts where p=2")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>,address:string>")
    +    checkAnswer(query,
    +      Row(null, "567 Maple Drive") ::
    +      Row(null, "6242 Ash Street") :: Nil)
    +  }
    +
    +  testSchemaPruning("no unnecessary schema pruning") {
    +    val query =
    +      sql("select name.last, name.middle, name.first, relatives[''].last, " +
    +        "relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " +
    +        "friends[0].first, pets, address from contacts where p=2")
    +    // We've selected every field in the schema. Therefore, no schema pruning should be performed.
    +    // We check this by asserting that the scanned schema of the query is identical to the schema
    +    // of the contacts relation, even though the fields are selected in different orders.
    +    checkScanSchemata(query,
    +      "struct<name:struct<first:string,middle:string,last:string>,address:string,pets:int," +
    +      "friends:array<struct<first:string,middle:string,last:string>>," +
    +      "relatives:map<string,struct<first:string,middle:string,last:string>>>")
    +    checkAnswer(query,
    +      Row("Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") ::
    +      Row("Jones", null, "Jim", null, null, null, null, null, null, null,
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r204288381

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala ---
    @@ -0,0 +1,388 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.scalatest.BeforeAndAfterAll
    +import org.scalatest.exceptions.TestFailedException
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.expressions.NamedExpression
    +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    +import org.apache.spark.sql.types._
    +
    +// scalastyle:off line.size.limit
    --- End diff --

    tiny nit: I think we don't really have to disable the length limit.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204288233 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionRootFields = projects.flatMap(getRootFields) +val filterRootFields = filters.flatMap(getRootFields) +val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct + +// If [[requestedRootFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) { + val prunedSchema = requestedRootFields +.map { case RootField(field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation =
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r204209033

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1298,8 +1298,18 @@ object SQLConf {
             "issues. Turn on this config to insert a local sort before actually doing repartition " +
             "to generate consistent repartition results. The performance of repartition() may go " +
             "down since we insert extra local sort before it.")
    +    .booleanConf
    +    .createWithDefault(true)
    +
    +  val NESTED_SCHEMA_PRUNING_ENABLED =
    +    buildConf("spark.sql.nestedSchemaPruning.enabled")
    +      .internal()
    +      .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
    +        "satisfying a query. This optimization allows columnar file format readers to avoid " +
    +        "reading unnecessary nested column data. Currently Parquet is the only data source that " +
    +        "implements this optimization.")
           .booleanConf
    -      .createWithDefault(true)
    +      .createWithDefault(false)
    --- End diff --

    I'm against enabling this feature by default with a known failing test case. For example, https://github.com/apache/spark/pull/21320/files#diff-0c6c7481232e9637b91c179f1005426aR71.
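With the default left at `false`, a user who wants the optimization has to opt in explicitly. A minimal sketch of doing so from application code follows. It assumes a Spark build that contains this PR is on the classpath; the object name and `appName` are made up for illustration, and only the configuration key comes from the diff above.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: opts in to nested schema pruning for a single session.
object NestedPruningOptIn {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("nested-schema-pruning-opt-in") // hypothetical application name
      .config("spark.sql.nestedSchemaPruning.enabled", "true")
      .getOrCreate()

    // Echo the effective setting so the opt-in is visible in the driver output.
    println(spark.conf.get("spark.sql.nestedSchemaPruning.enabled"))

    spark.stop()
  }
}
```

Because the flag is a SQL conf, it can also be toggled per session at runtime with `spark.conf.set`, which is convenient when comparing query plans with and without pruning.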
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204208518 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r204206252

    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
    @@ -0,0 +1,156 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.parquet
    +
    +import java.io.File
    +
    +import org.apache.spark.sql.{QueryTest, Row}
    +import org.apache.spark.sql.execution.FileSchemaPruningTest
    +import org.apache.spark.sql.internal.SQLConf
    +import org.apache.spark.sql.test.SharedSQLContext
    +
    +class ParquetSchemaPruningSuite
    +    extends QueryTest
    +    with ParquetTest
    +    with FileSchemaPruningTest
    +    with SharedSQLContext {
    +  case class FullName(first: String, middle: String, last: String)
    +  case class Contact(name: FullName, address: String, pets: Int, friends: Array[FullName] = Array(),
    +    relatives: Map[String, FullName] = Map())
    +
    +  val contacts =
    +    Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) ::
    +    Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil
    +
    +  case class Name(first: String, last: String)
    +  case class BriefContact(name: Name, address: String)
    +
    +  val briefContacts =
    +    BriefContact(Name("Janet", "Jones"), "567 Maple Drive") ::
    +    BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil
    +
    +  case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int,
    +    friends: Array[FullName] = Array(), relatives: Map[String, FullName] = Map(), p: Int)
    +
    +  case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int)
    +
    +  val contactsWithDataPartitionColumn =
    +    contacts.map { case Contact(name, address, pets, friends, relatives) =>
    +      ContactWithDataPartitionColumn(name, address, pets, friends, relatives, 1) }
    +  val briefContactsWithDataPartitionColumn =
    +    briefContacts.map { case BriefContact(name: Name, address: String) =>
    +      BriefContactWithDataPartitionColumn(name, address, 2) }
    +
    +  testSchemaPruning("select a single complex field") {
    +    val query = sql("select name.middle from contacts")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>>")
    +    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
    +  }
    +
    +  testSchemaPruning("select a single complex field and the partition column") {
    +    val query = sql("select name.middle, p from contacts")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>>")
    +    checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
    +  }
    +
    +  testSchemaPruning("partial schema intersection - select missing subfield") {
    +    val query = sql("select name.middle, address from contacts where p=2")
    +    checkScanSchemata(query, "struct<name:struct<middle:string>,address:string>")
    +    checkAnswer(query,
    +      Row(null, "567 Maple Drive") ::
    +      Row(null, "6242 Ash Street") :: Nil)
    +  }
    +
    +  testSchemaPruning("no unnecessary schema pruning") {
    +    val query =
    +      sql("select name.last, name.middle, name.first, relatives[''].last, " +
    +        "relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " +
    +        "friends[0].first, pets, address from contacts where p=2")
    +    // We've selected every field in the schema. Therefore, no schema pruning should be performed.
    +    // We check this by asserting that the scanned schema of the query is identical to the schema
    +    // of the contacts relation, even though the fields are selected in different orders.
    +    checkScanSchemata(query,
    +      "struct<name:struct<first:string,middle:string,last:string>,address:string,pets:int," +
    +      "friends:array<struct<first:string,middle:string,last:string>>," +
    +      "relatives:map<string,struct<first:string,middle:string,last:string>>>")
    +    checkAnswer(query,
    +      Row("Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") ::
    +      Row("Jones", null, "Jim", null, null, null, null, null, null,
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204206072 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.FileSchemaPruningTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetSchemaPruningSuite +extends QueryTest +with ParquetTest +with FileSchemaPruningTest +with SharedSQLContext { + case class FullName(first: String, middle: String, last: String) + case class Contact(name: FullName, address: String, pets: Int, friends: Array[FullName] = Array(), +relatives: Map[String, FullName] = Map()) + + val contacts = +Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) :: +Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil + + case class Name(first: String, last: String) + case class BriefContact(name: Name, address: String) + + val briefContacts = +BriefContact(Name("Janet", "Jones"), "567 Maple Drive") :: +BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil + + case class ContactWithDataPartitionColumn(name: FullName, address: String, pets: Int, +friends: Array[FullName] = Array(), relatives: Map[String, FullName] = Map(), p: Int) + + case class BriefContactWithDataPartitionColumn(name: Name, address: String, p: Int) + + val contactsWithDataPartitionColumn = +contacts.map { case Contact(name, address, pets, friends, relatives) => + ContactWithDataPartitionColumn(name, address, pets, friends, relatives, 1) } + val briefContactsWithDataPartitionColumn = +briefContacts.map { case BriefContact(name: Name, address: String) => + BriefContactWithDataPartitionColumn(name, address, 2) } + + testSchemaPruning("select a single complex field") { +val query = sql("select name.middle from contacts") +checkScanSchemata(query, "struct>") +checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil) + } + + testSchemaPruning("select a single complex field and the partition column") { +val query = 
sql("select name.middle, p from contacts") +checkScanSchemata(query, "struct>") +checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil) + } + + testSchemaPruning("partial schema intersection - select missing subfield") { +val query = sql("select name.middle, address from contacts where p=2") +checkScanSchemata(query, "struct,address:string>") +checkAnswer(query, + Row(null, "567 Maple Drive") :: + Row(null, "6242 Ash Street") :: Nil) + } + + testSchemaPruning("no unnecessary schema pruning") { +val query = + sql("select name.last, name.middle, name.first, relatives[''].last, " + +"relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " + +"friends[0].first, pets, address from contacts where p=2") +// We've selected every field in the schema. Therefore, no schema pruning should be performed. +// We check this by asserting that the scanned schema of the query is identical to the schema +// of the contacts relation, even though the fields are selected in different orders. +checkScanSchemata(query, + "struct,address:string,pets:int," + + "friends:array>," + + "relatives:map>>") +checkAnswer(query, + Row("Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") :: + Row("Jones", null, "Jim", null, null, null, null, null, null,
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r204205744 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. --- End diff -- What kind of exception does it throw? I thought Parquet has supported this so far. Do you mean it doesn't support clipping a nested schema between Parquet's and Catalyst's? In which case does it not work?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r203934633 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -288,6 +310,27 @@ private[parquet] object ParquetReadSupport { } } + /** + * Computes the structural intersection between two Parquet group types. + */ + private def intersectParquetGroups( + groupType1: GroupType, groupType2: GroupType): Option[GroupType] = { +val fields = + groupType1.getFields.asScala +.filter(field => groupType2.containsField(field.getName)) +.flatMap { + case field1: GroupType => --- End diff -- `field1` -> `field`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
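For readers following the `intersectParquetGroups` discussion, here is a minimal sketch of what a "structural intersection" of two group types could look like. It is an illustration only: `PType`, `Primitive`, `Group` and `GroupIntersection` are hypothetical stand-ins invented for this sketch, not the parquet-mr classes, and the rule for a group-versus-primitive name clash (drop the field) is an assumption.

```scala
// Toy stand-ins for Parquet types; these are NOT the parquet-mr classes.
sealed trait PType { def name: String }
final case class Primitive(name: String, typeName: String) extends PType
final case class Group(name: String, fields: List[PType]) extends PType {
  def field(n: String): Option[PType] = fields.find(_.name == n)
}

object GroupIntersection {
  // Decide what survives of one field of g1, given the other group g2.
  private def keep(f: PType, g2: Group): Option[PType] =
    (f, g2.field(f.name)) match {
      case (sub1: Group, Some(sub2: Group)) => intersect(sub1, sub2) // recurse into nested groups
      case (_: Group, Some(_))              => None                  // assumed: group vs. primitive is dropped
      case (p, Some(_))                     => Some(p)               // primitive present in both: keep
      case (_, None)                        => None                  // missing from g2: drop
    }

  // Keep only the fields of g1 whose names also occur in g2.
  // Returns None when no field survives.
  def intersect(g1: Group, g2: Group): Option[Group] = {
    val kept = g1.fields.flatMap(f => keep(f, g2))
    if (kept.isEmpty) None else Some(g1.copy(fields = kept))
  }
}
```

With a requested schema `root { name { first, middle }, address }` and a file schema `root { name { first, last }, address }`, this sketch yields `root { name { first }, address }`: the missing `middle` subfield is dropped rather than requested from the file.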
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r203934281 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -47,16 +47,25 @@ import org.apache.spark.sql.types._ * * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. + * + * @param parquetMrCompatibility support reading with parquet-mr or Spark's built-in Parquet reader */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], +parquetMrCompatibility: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ + /** + * Construct a [[ParquetReadSupport]] with [[convertTz]] set to [[None]] and + * [[parquetMrCompatibility]] set to [[false]]. + * + * We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only + * used in the vectorized reader, where we get the convertTz value directly, and the value here + * is ignored. Further, we set [[parquetMrCompatibility]] to [[false]] as this constructor is only + * called by the Spark reader. --- End diff -- re: https://github.com/apache/spark/pull/21320/files/cb858f202e49d69f2044681e37f982dc10676296#r199631341 Actually, it doesn't look clear to me either. What does the flag indicate? Do you mean the normal Parquet reader vs. the vectorized Parquet reader?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r203933423 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,387 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r203933307 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala --- @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import java.io.File + +import org.apache.spark.sql.{QueryTest, Row} +import org.apache.spark.sql.execution.FileSchemaPruningTest +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSQLContext + +class ParquetSchemaPruningSuite +extends QueryTest +with ParquetTest +with FileSchemaPruningTest +with SharedSQLContext { --- End diff --

Can we just simply:

```scala
override def beforeEach(): Unit = {
  super.beforeEach()
  spark.conf.set(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key, "true")
}

override def afterEach(): Unit = {
  try {
    spark.conf.unset(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key)
  } finally {
    super.afterEach()
  }
}
```

without the complicated hierarchy?
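The "set the flag, then always unset it" shape of that suggestion can be illustrated without ScalaTest or Spark. The sketch below is only an analogy: `ConfToggle`, its in-memory `conf` map and the key name are all made up for this example and are not Spark's configuration API.

```scala
import scala.collection.mutable

object ConfToggle {
  // Hypothetical stand-in for a session configuration; not Spark's RuntimeConfig.
  val conf: mutable.Map[String, String] = mutable.Map.empty

  // Made-up key, used only in this sketch.
  val PruningKey = "example.nestedSchemaPruning.enabled"

  // Enable the flag, run the body, and remove the flag again even if the body throws.
  def withPruningEnabled[A](body: => A): A = {
    conf(PruningKey) = "true"
    try body
    finally conf.remove(PruningKey)
  }
}
```

The `finally` clause plays the same role as the `try`/`finally` in the suggested `afterEach`: cleanup runs whether or not the test body fails, so one failing test does not leak the setting into the next.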
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r203931755 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and field counts of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. This + * class is motivated by columnar nested schema pruning. + */ +case class ProjectionOverSchema(schema: StructType) { --- End diff -- This still looks weird that we place this under `catalyst` since we currently only use it under `execution`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r203931060 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and field counts of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. This + * class is motivated by columnar nested schema pruning. 
+ */ +case class ProjectionOverSchema(schema: StructType) { + private val fieldNames = schema.fieldNames.toSet + + def unapply(expr: Expression): Option[Expression] = getProjection(expr) + + private def getProjection(expr: Expression): Option[Expression] = +expr match { + case a @ AttributeReference(name, _, _, _) if (fieldNames.contains(name)) => +Some(a.copy(dataType = schema(name).dataType)(a.exprId, a.qualifier)) + case GetArrayItem(child, arrayItemOrdinal) => +getProjection(child).map { + case projection => +GetArrayItem(projection, arrayItemOrdinal) --- End diff --

nit:

```scala
getProjection(child).map { projection =>
  GetArrayItem(projection, arrayItemOrdinal)
}
```
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r203931030 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala --- @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that projects an expression over a given schema. Data types, + * field indexes and field counts of complex type extractors and attributes + * are adjusted to fit the schema. All other expressions are left as-is. This + * class is motivated by columnar nested schema pruning. 
+ */ +case class ProjectionOverSchema(schema: StructType) { + private val fieldNames = schema.fieldNames.toSet + + def unapply(expr: Expression): Option[Expression] = getProjection(expr) + + private def getProjection(expr: Expression): Option[Expression] = +expr match { + case a @ AttributeReference(name, _, _, _) if (fieldNames.contains(name)) => +Some(a.copy(dataType = schema(name).dataType)(a.exprId, a.qualifier)) + case GetArrayItem(child, arrayItemOrdinal) => +getProjection(child).map { + case projection => +GetArrayItem(projection, arrayItemOrdinal) +} + case GetArrayStructFields(child, StructField(name, _, _, _), _, numFields, containsNull) => +getProjection(child).map(p => (p, p.dataType)).map { + case (projection, ArrayType(projSchema @ StructType(_), _)) => +GetArrayStructFields(projection, + projSchema(name), projSchema.fieldIndex(name), projSchema.size, containsNull) +} + case GetMapValue(child, key) => +getProjection(child).map { + case projection => --- End diff --

nit:

```scala
getProjection(child).map { projection =>
  GetMapValue(projection, key)
}
```
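The two nits on `getProjection(child).map { ... }` are purely stylistic: a pattern-matching function with a single catch-all `case` behaves the same as a plain function literal. A small stand-alone sketch, using a hypothetical `lookup` in place of `getProjection` and string results in place of Catalyst expressions:

```scala
object MapStyle {
  // Hypothetical stand-in for getProjection: succeeds only for non-empty names.
  def lookup(name: String): Option[String] =
    if (name.nonEmpty) Some(name) else None

  // Style used in the diff: a pattern-matching function with one catch-all case.
  def verbose(name: String): Option[String] =
    lookup(name).map {
      case projection =>
        s"item($projection)"
    }

  // Style suggested in the review: a plain function literal.
  def concise(name: String): Option[String] =
    lookup(name).map { projection =>
      s"item($projection)"
    }
}
```

Both forms return the same `Option` for every input, so the shorter one loses nothing. The `case` form only earns its keep when the argument is actually destructured, as in the `GetArrayStructFields` branch of the diff.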
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r201863353 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionRootFields = projects.flatMap(getRootFields) +val filterRootFields = filters.flatMap(getRootFields) +val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct + +// If [[requestedRootFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) { + val prunedSchema = requestedRootFields +.map { case RootField(field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet --- End diff -- Yes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
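The `requestedRootFields.map { ... StructType(Array(field)) }.reduceLeft(_ merge _)` step in the diff builds one pruned schema out of many single-field schemas. The following is a rough sketch of that idea on a toy type model. `Ty`, `StructTy` and `SchemaMerge` are hypothetical names for this example, and the merge rule here (recurse into structs, otherwise keep the left type) is an assumption rather than Spark's `StructType.merge` semantics.

```scala
// Toy type model; not Spark's DataType hierarchy.
sealed trait Ty
case object StrTy extends Ty
case object IntTy extends Ty
final case class StructTy(fields: List[(String, Ty)]) extends Ty

object SchemaMerge {
  // Merge b into a: fields new to a are appended; fields present in both are
  // merged recursively when both sides are structs, otherwise a's type is kept.
  def merge(a: StructTy, b: StructTy): StructTy = {
    val merged = b.fields.foldLeft(a.fields) { case (acc, (name, ty)) =>
      acc.indexWhere(_._1 == name) match {
        case -1 => acc :+ (name -> ty)
        case i =>
          val combined: Ty = (acc(i)._2, ty) match {
            case (l: StructTy, r: StructTy) => merge(l, r)
            case (l, _)                     => l
          }
          acc.updated(i, name -> combined)
      }
    }
    StructTy(merged)
  }

  // Wrap each requested root field in its own single-field struct, then fold
  // them together. Assumes rootFields is non-empty (reduceLeft throws otherwise).
  def prune(rootFields: List[(String, Ty)]): StructTy =
    rootFields.map(f => StructTy(List(f))).reduceLeft((l, r) => merge(l, r))
}
```

For example, requesting `name.first`, `address` and `name.last` gives three single-field structs that merge into `struct<name: struct<first, last>, address>`, with `name` appearing only once.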
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r201863463 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst + * complex type extractor. For example, consider a relation with the following schema: + * + * {{{ + * root + *|-- name: struct (nullable = true) + *||-- first: string (nullable = true) + *||-- last: string (nullable = true) + *}}} + * + * Further, suppose we take the select expression `name.first`. This will parse into an + * `Alias(child, "first")`. 
Ignoring the alias, `child` matches the following pattern: + * + * {{{ + * GetStructFieldObject( + * AttributeReference("name", StructType(_), _, _), + * StructField("first", StringType, _, _)) + * }}} + * + * [[SelectedField]] converts that expression into + * + * {{{ + * StructField("name", StructType(Array(StructField("first", StringType + * }}} + * + * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the + * same name as its child (or "parent" going right to left in the select expression) and a data + * type appropriate to the complex type extractor. In our example, the name of the child expression + * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string + * field named "first". + * + * @param expr the top-level complex type extractor + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { --- End diff -- The code does not compile with that change. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
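The scaladoc quoted above describes turning a select expression such as `name.first` into a root field that contains only the selected subfield. A toy version of that mapping might look like the sketch below. `DType`, `Field`, `Expr`, `Attr`, `GetField` and `SelectedFieldSketch` are hypothetical simplifications for this example, and only struct access is modelled (no arrays or maps).

```scala
// Toy schema and expression model; not Catalyst's classes.
sealed trait DType
case object StringT extends DType
final case class StructT(fields: List[Field]) extends DType
final case class Field(name: String, dataType: DType)

sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GetField(child: Expr, field: Field) extends Expr

object SelectedFieldSketch {
  // Matches nested field access only; a bare attribute selects no nested field.
  def unapply(expr: Expr): Option[Field] = expr match {
    case GetField(child, field) => Some(wrap(child, field))
    case _                      => None
  }

  // Wrap `field` in one single-field struct per enclosing extractor,
  // working outward until the root attribute is reached.
  private def wrap(expr: Expr, field: Field): Field = expr match {
    case Attr(name)              => Field(name, StructT(List(field)))
    case GetField(child, parent) => wrap(child, Field(parent.name, StructT(List(field))))
  }
}
```

Applied to `GetField(Attr("name"), Field("first", StringT))`, the extractor yields `Field("name", StructT(List(Field("first", StringT))))`, which mirrors the `name` / `first` example in the scaladoc.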
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r201863251 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff -- @gatorsmile @rdblue @mswit-databricks What is your position on this? I don't know that the parquet spec provides a definitive answer on this question. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199648692 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { -parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { - case ((parquetFieldType, catalystField), ordinal) => -// Converted field value should be set to the `ordinal`-th cell of `currentRow` -newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) +parquetType.getFields.asScala.map { + case parquetField => +val fieldIndex = catalystType.fieldIndex(parquetField.getName) --- End diff -- I dropped into the `sql/console` and attempted to write a parquet file with duplicate column names. It didn't work. Transcript below. ``` scala> import org.apache.spark.sql._ import org.apache.spark.sql._ scala> val sameColumnNames = StructType(StructField("a", IntegerType) :: StructField("a", StringType) :: Nil) sameColumnNames: org.apache.spark.sql.types.StructType = StructType(StructField(a,IntegerType,true), StructField(a,StringType,true)) scala> val rowRDD = sqlContext.sparkContext.parallelize(Row(1, "one") :: Row(2, "two") :: Nil, 1) rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = ParallelCollectionRDD[0] at parallelize at :51 scala> val df = sqlContext.createDataFrame(rowRDD, sameColumnNames) 18/07/02 16:31:33 INFO SharedState: Setting hive.metastore.warehouse.dir ('null') to the value of spark.sql.warehouse.dir ('file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse'). 18/07/02 16:31:33 INFO SharedState: Warehouse path is 'file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse'. 
18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@7b13b737{/SQL,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@3c9fb104{/SQL/json,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@3d5cadbe{/SQL/execution,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@73732e26{/SQL/execution/json,null,AVAILABLE,@Spark} 18/07/02 16:31:33 INFO ContextHandler: Started o.e.j.s.ServletContextHandler@72a13c4a{/static/sql,null,AVAILABLE,@Spark} 18/07/02 16:31:34 INFO StateStoreCoordinatorRef: Registered StateStoreCoordinator endpoint df: org.apache.spark.sql.DataFrame = [a: int, a: string] scala> df.write.parquet("sameColumnNames.parquet") org.apache.spark.sql.AnalysisException: Found duplicate column(s) when inserting into file:/Volumes/VideoAmpCS/msa/workspace/spark-public/sameColumnNames.parquet: `a`; at org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85) at org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:42) at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:64) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104) at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102) at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199643803 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff --

I believe the failure occurs because the requested schema and the file schema, while having columns with identical names and types, list those columns in a different order. Only one test in `ParquetFilterSuite` fails, namely "Filter applied on merged Parquet schema with new column should work", and it appears to be the only one in which the column order changes. These are the file and requested schemas for that test:

```
Parquet file schema:
message spark_schema {
  required int32 c;
  optional binary b (UTF8);
}

Parquet requested schema:
message spark_schema {
  optional binary b (UTF8);
  required int32 c;
}
```

I would say the Spark reader expects identical column order, whereas the parquet-mr reader accepts a different column order as long as the column names are identical (or compatible). That's my supposition at least.
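To make the supposition concrete, here is a minimal sketch in plain Scala with no Spark or Parquet dependencies. `Column`, `mismatchesByPosition`, and `mismatchesByName` are hypothetical names invented for this illustration; they model the two matching strategies and are not the actual code paths of either reader.

```scala
// Hypothetical model of a flat schema: an ordered list of named, typed columns.
final case class Column(name: String, tpe: String)

object ColumnOrderSketch {
  // The two schemas from the failing test, reduced to name and physical type.
  val fileSchema: Seq[Column] = Seq(Column("c", "int32"), Column("b", "binary"))
  val requestedSchema: Seq[Column] = Seq(Column("b", "binary"), Column("c", "int32"))

  // Positional matching: the i-th requested column is paired with the i-th file column.
  // Returns the requested columns whose positional partner differs.
  def mismatchesByPosition(file: Seq[Column], requested: Seq[Column]): Seq[Column] =
    requested.zip(file).collect { case (req, f) if req != f => req }

  // Name-based matching: each requested column is looked up by name in the file schema.
  // Returns the requested columns with no identical column of that name.
  def mismatchesByName(file: Seq[Column], requested: Seq[Column]): Seq[Column] = {
    val byName = file.map(c => c.name -> c).toMap
    requested.filterNot(req => byName.get(req.name).contains(req))
  }
}
```

Under positional matching both requested columns mismatch; under name-based matching neither does, which would be consistent with one reader failing and the other succeeding on the same pair of schemas.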
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199631341 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -47,16 +47,25 @@ import org.apache.spark.sql.types._ * * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. + * + * @param parquetMrCompatibility support reading with parquet-mr or Spark's built-in Parquet reader */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], +parquetMrCompatibility: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ + /** + * Construct a [[ParquetReadSupport]] with [[convertTz]] set to [[None]] and + * [[parquetMrCompatibility]] set to [[false]]. + * + * We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only + * used in the vectorized reader, where we get the convertTz value directly, and the value here + * is ignored. Further, we set [[parquetMrCompatibility]] to [[false]] as this constructor is only + * called by the Spark reader. --- End diff -- I don't understand your confusion. I think the comment makes it very clear why we need to set that parameter to false. How can I make it better? Or can you be more specific about what is unclear to you? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199516297 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala --- @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.types._ + +/** + * A Scala extractor that builds a [[org.apache.spark.sql.types.StructField]] from a Catalyst + * complex type extractor. For example, consider a relation with the following schema: + * + * {{{ + * root + *|-- name: struct (nullable = true) + *||-- first: string (nullable = true) + *||-- last: string (nullable = true) + *}}} + * + * Further, suppose we take the select expression `name.first`. This will parse into an + * `Alias(child, "first")`. 
Ignoring the alias, `child` matches the following pattern: + * + * {{{ + * GetStructFieldObject( + * AttributeReference("name", StructType(_), _, _), + * StructField("first", StringType, _, _)) + * }}} + * + * [[SelectedField]] converts that expression into + * + * {{{ + * StructField("name", StructType(Array(StructField("first", StringType)))) + * }}} + * + * by mapping each complex type extractor to a [[org.apache.spark.sql.types.StructField]] with the + * same name as its child (or "parent" going right to left in the select expression) and a data + * type appropriate to the complex type extractor. In our example, the name of the child expression + * is "name" and its data type is a [[org.apache.spark.sql.types.StructType]] with a single string + * field named "first". + * + * @param expr the top-level complex type extractor + */ +object SelectedField { + def unapply(expr: Expression): Option[StructField] = { --- End diff --

nit: `Expression` -> `ExtractValue`?
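The mapping described in the quoted Scaladoc can be sketched with a toy model. Everything below is a hypothetical simplification written for illustration: `Attr`, `GetField`, and the tiny `DataType` hierarchy are stand-ins, not the Catalyst classes, and only struct field access is modelled (no arrays or maps).

```scala
// Hypothetical, simplified stand-ins for Catalyst types; not the real classes.
sealed trait DataType
case object StringType extends DataType
final case class StructType(fields: Seq[StructField]) extends DataType
final case class StructField(name: String, dataType: DataType)

sealed trait Expr
final case class Attr(name: String) extends Expr                        // e.g. `name`
final case class GetField(child: Expr, field: StructField) extends Expr // e.g. `name.first`

object SelectedFieldSketch {
  // Wrap the selected leaf in one single-field struct per enclosing extractor,
  // working from the leaf outward to the root attribute.
  def selected(expr: Expr): Option[StructField] = {
    def loop(e: Expr, inner: StructField): StructField = e match {
      case Attr(name)         => StructField(name, StructType(Seq(inner)))
      case GetField(child, f) => loop(child, StructField(f.name, StructType(Seq(inner))))
    }
    expr match {
      case GetField(child, leaf) => Some(loop(child, leaf))
      case _: Attr               => None // a bare attribute selects no nested field
    }
  }
}
```

For `name.first` this yields a field named "name" whose type is a struct containing only "first", mirroring the example in the Scaladoc.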
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199538123 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionRootFields = projects.flatMap(getRootFields) +val filterRootFields = filters.flatMap(getRootFields) +val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct + +// If [[requestedRootFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) { + val prunedSchema = requestedRootFields +.map { case RootField(field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet --- End diff --

`dataSchema` may also contain partition columns (see the doc of `HadoopFsRelation`). Is this rule prepared for that case?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199516625 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1288,8 +1288,18 @@ object SQLConf { "issues. Turn on this config to insert a local sort before actually doing repartition " + "to generate consistent repartition results. The performance of repartition() may go " + "down since we insert extra local sort before it.") +.booleanConf +.createWithDefault(true) + + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data. Currently Parquet is the only data source that " + +"implements this optimization.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) --- End diff -- +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mswit-databricks commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199415439 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff -- CC @michal-databricks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199389588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala --- @@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { -parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map { - case ((parquetFieldType, catalystField), ordinal) => -// Converted field value should be set to the `ordinal`-th cell of `currentRow` -newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) +parquetType.getFields.asScala.map { + case parquetField => +val fieldIndex = catalystType.fieldIndex(parquetField.getName) --- End diff --

Can the name be used as an identifier? Could you double-check whether we can save a Parquet file with duplicate column names? [Note: earlier versions of Spark did not check for duplicate names, so I guess they might have generated files with duplicate column names.]
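The concern can be illustrated without Spark. The sketch below is an assumption-laden toy: `fieldNames` is a hypothetical stand-in for a schema's field names, and the lookup tables are plain Scala maps rather than anything from `StructType`. It only shows why a name stops being a usable identifier once duplicates exist.

```scala
object DuplicateNameSketch {
  // Hypothetical stand-in for a schema: field names in declaration order.
  val fieldNames: Seq[String] = Seq("a", "a", "b")

  // Collect every ordinal at which each name occurs.
  val ordinalsByName: Map[String, Seq[Int]] =
    fieldNames.zipWithIndex.groupBy(_._1).map { case (name, pairs) => name -> pairs.map(_._2) }

  // Names that map to more than one ordinal cannot serve as unique identifiers.
  val ambiguousNames: Set[String] =
    ordinalsByName.collect { case (name, ords) if ords.size > 1 => name }.toSet

  // A plain name -> index map silently keeps a single ordinal per name,
  // so one of the two "a" columns becomes unreachable by name.
  val naiveIndex: Map[String, Int] = fieldNames.zipWithIndex.toMap
}
```

With positional pairing (the old `zip` approach) both "a" columns stay addressable; with a name-keyed lookup one of them is shadowed, which is why it matters whether such files can exist.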
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199368095 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -47,16 +47,25 @@ import org.apache.spark.sql.types._ * * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. + * + * @param parquetMrCompatibility support reading with parquet-mr or Spark's built-in Parquet reader */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], +parquetMrCompatibility: Boolean) extends ReadSupport[UnsafeRow] with Logging { private var catalystRequestedSchema: StructType = _ + /** + * Construct a [[ParquetReadSupport]] with [[convertTz]] set to [[None]] and + * [[parquetMrCompatibility]] set to [[false]]. + * + * We need a zero-arg constructor for SpecificParquetRecordReaderBase. But that is only + * used in the vectorized reader, where we get the convertTz value directly, and the value here + * is ignored. Further, we set [[parquetMrCompatibility]] to [[false]] as this constructor is only + * called by the Spark reader. --- End diff -- Based on this comment, we have no idea why this is set to false. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199364935 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { --- End diff -- This is only for nested schema pruning? Turn this off when `spark.sql.nestedSchemaPruning.enabled` is off? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199389252 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) StructType.fromString(schemaString) } -val parquetRequestedSchema = +val clippedParquetSchema = ParquetReadSupport.clipParquetSchema(context.getFileSchema, catalystRequestedSchema) +val parquetRequestedSchema = if (parquetMrCompatibility) { + // Parquet-mr will throw an exception if we try to read a superset of the file's schema. + // Therefore, we intersect our clipped schema with the underlying file's schema + ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, context.getFileSchema) +.map(intersectionGroup => + new MessageType(intersectionGroup.getName, intersectionGroup.getFields)) +.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE) +} else { + // Spark's built-in Parquet reader will throw an exception in some cases if the requested + // schema is not the same as the clipped schema --- End diff -- cc @rdblue @mswit-databricks Do you know the root cause? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199356283 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -47,16 +47,25 @@ import org.apache.spark.sql.types._ * * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. + * + * @param parquetMrCompatibility support reading with parquet-mr or Spark's built-in Parquet reader */ -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone]) +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone], +parquetMrCompatibility: Boolean) --- End diff --

```Scala
private[parquet] class ParquetReadSupport(
    val convertTz: Option[TimeZone],
    parquetMrCompatibility: Boolean)
  extends ReadSupport[UnsafeRow] with Logging {
```
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199354841 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala --- @@ -417,11 +417,12 @@ class ParquetFileFormat } else { logDebug(s"Falling back to parquet-mr") // ParquetRecordReader returns UnsafeRow +val readSupport = new ParquetReadSupport(convertTz, true) --- End diff -- > val readSupport = new ParquetReadSupport(convertTz, parquetMrCompatibility = true) --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r199365004 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala --- @@ -47,16 +47,25 @@ import org.apache.spark.sql.types._ * * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]] * to [[prepareForRead()]], but use a private `var` for simplicity. + * + * @param parquetMrCompatibility support reading with parquet-mr or Spark's built-in Parquet reader --- End diff -- The description is not clear. Could you make it better? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r197646687 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala --- @@ -99,27 +100,28 @@ trait ConstraintHelper { } /** - * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions - * of constraints. + * Infer the Attribute and ExtractValue-specific IsNotNull constraints from the null intolerant + * child expressions of constraints. */ private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] = constraint match { // When the root is IsNotNull, we can push IsNotNull through the child null intolerant // expressions - case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) + case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child // null intolerant expressions. - case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) + case _ => scanNullIntolerantField(constraint).map(IsNotNull(_)) } /** - * Recursively explores the expressions which are null intolerant and returns all attributes - * in these expressions. + * Recursively explores the expressions which are null intolerant and returns all attributes and + * complex type extractors in these expressions. */ - private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match { + private def scanNullIntolerantField(expr: Expression): Seq[Expression] = expr match { +case ev: ExtractValue => Seq(ev) --- End diff -- Because of this change, we also need to change the code in basicPhysicalOperators.scala. I do not think this is the right solution. More importantly, the changes in basicPhysicalOperators.scala might break the others. 
We need a separate PR for these changes. Please remove the changes made in basicPhysicalOperators.scala and QueryPlanConstraints.scala.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r197629698 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -301,7 +301,6 @@ case class FileSourceScanExec( } getOrElse { withOptPartitionCount } - --- End diff -- Remove this line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user DaimonPl commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r195704995 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1288,8 +1288,18 @@ object SQLConf { "issues. Turn on this config to insert a local sort before actually doing repartition " + "to generate consistent repartition results. The performance of repartition() may go " + "down since we insert extra local sort before it.") +.booleanConf +.createWithDefault(true) + + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data. Currently Parquet is the only data source that " + +"implements this optimization.") .booleanConf - .createWithDefault(true) + .createWithDefault(false) --- End diff --

How about enabling it by default? There should be enough time to find any unexpected problems with 2.4.0. Plus, nested column pruning would then be enabled for all the other automated tests.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user jainaks commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r194049288 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionRootFields = projects.flatMap(getRootFields) +val filterRootFields = filters.flatMap(getRootFields) +val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct + +// If [[requestedRootFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) { + val prunedSchema = requestedRootFields +.map { case RootField(field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) --- End diff --

This check also filters out fields that have the same name as a field in the schema but differ in case.
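A small sketch of the behaviour being pointed out, in plain Scala. The field names and both helper functions are hypothetical and operate on strings rather than `StructField`s; the case-insensitive variant is just one possible alternative for comparison, not a proposal for how the rule should resolve names.

```scala
import java.util.Locale

object CaseSensitivitySketch {
  // Hypothetical field names; the real rule operates on StructType fields.
  val dataSchemaFieldNames: Seq[String] = Seq("Name", "address")
  val prunedFieldNames: Seq[String] = Seq("name", "address")

  // Exact-match filter, analogous to `dataSchemaFieldNames.contains(f.name)` in the diff.
  def filterExact(pruned: Seq[String], data: Seq[String]): Seq[String] = {
    val names = data.toSet
    pruned.filter(names.contains)
  }

  // Case-insensitive variant: compare lower-cased names instead.
  def filterIgnoringCase(pruned: Seq[String], data: Seq[String]): Seq[String] = {
    val names = data.map(_.toLowerCase(Locale.ROOT)).toSet
    pruned.filter(n => names.contains(n.toLowerCase(Locale.ROOT)))
  }
}
```

With the exact-match filter, a query that refers to `name` loses that field when the data schema spells it `Name`; the case-insensitive variant keeps it.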
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190494243 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala --- @@ -0,0 +1,432 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.planning + +import org.scalatest.BeforeAndAfterAll +import org.scalatest.exceptions.TestFailedException + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.dsl.plans._ +import org.apache.spark.sql.catalyst.expressions.NamedExpression +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation +import org.apache.spark.sql.types._ + +// scalastyle:off line.size.limit +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll { + // The test schema as a tree string, i.e. 
`schema.treeString` + // root + // |-- col1: string (nullable = false) + // |-- col2: struct (nullable = true) + // ||-- field1: integer (nullable = true) + // ||-- field2: array (nullable = true) + // |||-- element: integer (containsNull = false) + // ||-- field3: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: integer (nullable = true) + // ||||-- subfield3: array (nullable = true) + // |||||-- element: integer (containsNull = true) + // ||-- field4: map (nullable = true) + // |||-- key: string + // |||-- value: struct (valueContainsNull = false) + // ||||-- subfield1: integer (nullable = true) + // ||||-- subfield2: array (nullable = true) + // |||||-- element: integer (containsNull = false) + // ||-- field5: array (nullable = false) + // |||-- element: struct (containsNull = true) + // ||||-- subfield1: struct (nullable = false) + // |||||-- subsubfield1: integer (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||||-- subfield2: struct (nullable = true) + // |||||-- subsubfield1: struct (nullable = true) + // ||||||-- subsubsubfield1: string (nullable = true) + // |||||-- subsubfield2: integer (nullable = true) + // ||-- field6: struct (nullable = true) + // |||-- subfield1: string (nullable = false) + // |||-- subfield2: string (nullable = true) + // ||-- field7: struct (nullable = true) + // |||-- subfield1: struct (nullable = true) + // ||||-- subsubfield1: integer (nullable = true) + // ||||-- subsubfield2: integer (nullable = true) + // ||-- field8: map (nullable = true) + // |||-- key: string + // |||-- value: array (valueContainsNull = false) + // ||||-- element: struct (containsNull = true) + // |||||-- subfield1: integer (nullable = true) + // |||||-- subfield2: array (nullable = true) + // ||||||-- element: integer (containsNull = false) + // ||-- field9: map (nullable = true) + // |||-- key: string + // |||-- value: integer 
(valueContainsNull = false) + // |-- col3: array (nullable = false) + // ||-- element: struct (containsNull = false) + // |||-- field1: struct (nullable = true) + // ||||-- subfield1: integer (nullable = false) + // ||||-- subfield2: integer (nullable = true) + // |||-- field2: map (nullable = true) + // ||||-- key: string + // ||||-- value: integer
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190493689 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation = prunedParquetRelation, output = prunedRelationOutput) + +
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190492424 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation = prunedParquetRelation, output = prunedRelationOutput) + +
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190491050 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala --- @@ -879,6 +879,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("select function over nested data") { --- End diff -- I did some forensics to understand the origin of and reason for this test. It was part of the first commit of my original PR almost two years ago. Even then, this test passed without the rest of the commit. So I can't say why I added this test, except perhaps that I felt it was useful code coverage. In any case, if you don't think it's a valuable contribution in this PR, I'd rather just remove it entirely.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190486220 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1256,8 +1256,18 @@ object SQLConf { "issues. Turn on this config to insert a local sort before actually doing repartition " + "to generate consistent repartition results. The performance of repartition() may go " + "down since we insert extra local sort before it.") +.booleanConf +.createWithDefault(true) + + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data. Currently Parquet is the only data source that " + --- End diff -- ORC should be able to support this capability as well, but this PR does not address that.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190485768 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -286,7 +286,19 @@ case class FileSourceScanExec( } getOrElse { metadata } -withOptPartitionCount +val withOptColumnCount = relation.fileFormat match { + case columnar: ColumnarFileFormat => +SparkSession + .getActiveSession + .map { sparkSession => +val columnCount = columnar.columnCountForSchema(sparkSession, requiredSchema) +withOptPartitionCount + ("ColumnCount" -> columnCount.toString) --- End diff -- Replied above.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190485713 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ColumnarFileFormat.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType + +/** + * An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when + * debugging a physical query plan. + */ +private[sql] trait ColumnarFileFormat { --- End diff -- I'm okay with adding this as a separate PR as requested, but I wouldn't want to ship this PR in a release without being able to see the physical column count associated with a query in a query plan. That is invaluable functionality in validating that physical column pruning is occurring as expected.
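To make the "physical column count" idea concrete, here is a rough plain-Scala sketch of counting the leaf fields of a nested schema, under the assumption that each primitive leaf corresponds to one physical column. The `DType` hierarchy and this `countLeaves` are hypothetical simplifications written for illustration; the bodies of the PR's own `countLeaves` and `columnCountForSchema` are not shown in this thread and may differ.

```scala
// Hypothetical, simplified model of a nested schema; not Spark's DataType hierarchy.
sealed trait DType
case object IntT extends DType
case object StringT extends DType
final case class ArrayT(element: DType) extends DType
final case class MapT(key: DType, value: DType) extends DType
final case class StructT(fields: Seq[(String, DType)]) extends DType

object LeafCountSketch {
  // Counts primitive leaves, assuming one physical column per leaf.
  def countLeaves(dt: DType): Int = dt match {
    case ArrayT(element)  => countLeaves(element)
    case MapT(key, value) => countLeaves(key) + countLeaves(value)
    case StructT(fields)  => fields.map { case (_, t) => countLeaves(t) }.sum
    case _                => 1
  }

  def main(args: Array[String]): Unit = {
    val full = StructT(Seq(
      "id" -> IntT,
      "name" -> StructT(Seq("first" -> StringT, "middle" -> StringT, "last" -> StringT)),
      "friends" -> ArrayT(StructT(Seq("first" -> StringT, "last" -> StringT)))))
    // A schema pruned down to the single nested field name.first.
    val pruned = StructT(Seq("name" -> StructT(Seq("first" -> StringT))))
    println(s"full=${countLeaves(full)} pruned=${countLeaves(pruned)}")
  }
}
```

In this example the full schema has six leaves and the pruned schema has one, so a column count reported in the plan would make it visible whether pruning took effect.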
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190484386 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala --- @@ -99,27 +100,28 @@ trait ConstraintHelper { } /** - * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions - * of constraints. + * Infer the Attribute and ExtractValue-specific IsNotNull constraints from the null intolerant + * child expressions of constraints. */ private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] = constraint match { // When the root is IsNotNull, we can push IsNotNull through the child null intolerant // expressions - case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_)) + case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_)) // Constraints always return true for all the inputs. That means, null will never be returned. // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child // null intolerant expressions. - case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_)) + case _ => scanNullIntolerantField(constraint).map(IsNotNull(_)) } /** - * Recursively explores the expressions which are null intolerant and returns all attributes - * in these expressions. + * Recursively explores the expressions which are null intolerant and returns all attributes and + * complex type extractors in these expressions. */ - private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match { + private def scanNullIntolerantField(expr: Expression): Seq[Expression] = expr match { +case ev: ExtractValue => Seq(ev) --- End diff -- I agree adding more direct and independent test coverage for this change is a good idea. However, omitting this change will weaken the capabilities of this PR. 
It would also imply the removal of the failing test case in `ParquetSchemaPruningSuite`, which would imply two follow-on PRs. The first would be to add this specific change plus the right test coverage. The next would be to restore the test case removed from `ParquetSchemaPruningSuite`. Let me suggest an alternative. As this change is a valuable enhancement for this PR, let me try adding an appropriate test case in `InferFiltersFromConstraintsSuite` as part of this PR. That will eliminate the requirement for two more follow-on PRs.
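As a rough illustration of what the `ExtractValue` case in the diff changes, the following plain-Scala sketch infers `IsNotNull` constraints over a miniature expression tree. All types here (`Expr`, `Attr`, `GetField`, `Add`, `GreaterThan`, `Lit`, `IsNotNull`) are hypothetical stand-ins and not Catalyst classes; `GetField` plays the role of an `ExtractValue`, and `Add` and `GreaterThan` are assumed to be null intolerant.

```scala
object InferIsNotNullSketch {
  // Hypothetical miniature expression tree; not Catalyst's Expression classes.
  sealed trait Expr
  final case class Attr(name: String) extends Expr
  // Stands in for an ExtractValue such as a struct field access.
  final case class GetField(child: Expr, field: String) extends Expr
  // Assumed null-intolerant operators: a null input yields a null output.
  final case class Add(left: Expr, right: Expr) extends Expr
  final case class GreaterThan(left: Expr, right: Expr) extends Expr
  final case class Lit(value: Int) extends Expr
  final case class IsNotNull(child: Expr) extends Expr

  // Collects attributes and field extractors reachable through null-intolerant operators.
  def scanNullIntolerantField(e: Expr): Seq[Expr] = e match {
    case g: GetField       => Seq(g) // keep the extractor itself, not just its root attribute
    case a: Attr           => Seq(a)
    case Add(l, r)         => scanNullIntolerantField(l) ++ scanNullIntolerantField(r)
    case GreaterThan(l, r) => scanNullIntolerantField(l) ++ scanNullIntolerantField(r)
    case _                 => Seq.empty
  }

  // Same two-case shape as inferIsNotNullConstraints in the diff.
  def inferIsNotNull(constraint: Expr): Seq[Expr] = constraint match {
    case IsNotNull(child) => scanNullIntolerantField(child).map(IsNotNull(_))
    case other            => scanNullIntolerantField(other).map(IsNotNull(_))
  }

  def main(args: Array[String]): Unit = {
    // contact.id + x > 0
    val constraint = GreaterThan(Add(GetField(Attr("contact"), "id"), Attr("x")), Lit(0))
    inferIsNotNull(constraint).foreach(println)
  }
}
```

For the constraint `contact.id + x > 0`, the sketch infers a not-null constraint on the nested field `contact.id` as well as on `x`, which is the kind of inference a dedicated test in `InferFiltersFromConstraintsSuite` would need to cover.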
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user mallman commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r190479026 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -162,7 +162,9 @@ case class FilterExec(condition: Expression, child: SparkPlan) val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length) val generated = otherPreds.map { c => val nullChecks = c.references.map { r => -val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)} +val idx = notNullPreds.indexWhere { n => + n.asInstanceOf[IsNotNull].child.references.contains(r) --- End diff -- Yes. IIRC a community member identified a bug and @viirya contributed a fix and unit test. See https://github.com/VideoAmp/spark-public/commit/8b5661cefc3aa12cc79d2af162b0d84ab2afa39f.
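One way to see why the lookup in the diff changes is the small plain-Scala sketch below. It assumes that, once `IsNotNull` predicates can wrap a nested field extractor, the predicate's child is no longer the attribute itself. The types and the two helper functions are hypothetical; plain equality is used here as a stand-in for `semanticEquals`, whose real semantics are richer.

```scala
object NotNullMatchSketch {
  // Hypothetical miniature expressions; not Catalyst classes.
  sealed trait Expr { def references: Set[Attr] }
  final case class Attr(name: String) extends Expr {
    def references: Set[Attr] = Set(this)
  }
  final case class GetField(child: Expr, field: String) extends Expr {
    def references: Set[Attr] = child.references
  }
  final case class IsNotNull(child: Expr) extends Expr {
    def references: Set[Attr] = child.references
  }

  // Equality-based lookup, standing in for the replaced `semanticEquals` check.
  def indexByEquality(notNullPreds: Seq[IsNotNull], r: Attr): Int =
    notNullPreds.indexWhere(p => p.child == r)

  // Reference-based lookup, following the updated line in the diff.
  def indexByReference(notNullPreds: Seq[IsNotNull], r: Attr): Int =
    notNullPreds.indexWhere(p => p.child.references.contains(r))

  def main(args: Array[String]): Unit = {
    val contact = Attr("contact")
    val preds = Seq(IsNotNull(GetField(contact, "id")))
    println(indexByEquality(preds, contact))
    println(indexByReference(preds, contact))
  }
}
```

Under these assumptions the equality-based lookup finds no predicate for `contact`, while the reference-based lookup finds the predicate on `contact.id`.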
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user dongjoon-hyun commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r189961165 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1256,8 +1256,18 @@ object SQLConf { "issues. Turn on this config to insert a local sort before actually doing repartition " + "to generate consistent repartition results. The performance of repartition() may go " + "down since we insert extra local sort before it.") +.booleanConf +.createWithDefault(true) + + val NESTED_SCHEMA_PRUNING_ENABLED = +buildConf("spark.sql.nestedSchemaPruning.enabled") + .internal() + .doc("Prune nested fields from a logical relation's output which are unnecessary in " + +"satisfying a query. This optimization allows columnar file format readers to avoid " + +"reading unnecessary nested column data. Currently Parquet is the only data source that " + --- End diff -- Thank you for pinging me, @gatorsmile. Let me check it.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r189493854 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation = prunedParquetRelation, output = prunedRelationOutput) +
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r189493986 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression} +import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, ProjectionOverSchema, SelectedField} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation} +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType} + +/** + * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a + * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the + * Parquet format. 
In Spark SQL, a root-level Parquet column corresponds to a + * SQL column, and a nested Parquet column corresponds to a [[StructField]]. + */ +private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] { + override def apply(plan: LogicalPlan): LogicalPlan = +if (SQLConf.get.nestedSchemaPruningEnabled) { + apply0(plan) +} else { + plan +} + + private def apply0(plan: LogicalPlan): LogicalPlan = +plan transformDown { + case op @ PhysicalOperation(projects, filters, + l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, partitionSchema, +dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) => +val projectionFields = projects.flatMap(getFields) +val filterFields = filters.flatMap(getFields) +val requestedFields = (projectionFields ++ filterFields).distinct + +// If [[requestedFields]] includes a nested field, continue. Otherwise, +// return [[op]] +if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) { + val prunedSchema = requestedFields +.map { case (field, _) => StructType(Array(field)) } +.reduceLeft(_ merge _) + val dataSchemaFieldNames = dataSchema.fieldNames.toSet + val prunedDataSchema = +StructType(prunedSchema.filter(f => dataSchemaFieldNames.contains(f.name))) + + // If the data schema is different from the pruned data schema, continue. Otherwise, + // return [[op]]. We effect this comparison by counting the number of "leaf" fields in + // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields + // in [[dataSchema]]. 
+ if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) { +val prunedParquetRelation = + hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession) + +// We need to replace the expression ids of the pruned relation output attributes +// with the expression ids of the original relation output attributes so that +// references to the original relation's output are not broken +val outputIdMap = l.output.map(att => (att.name, att.exprId)).toMap +val prunedRelationOutput = + prunedParquetRelation +.schema +.toAttributes +.map { + case att if outputIdMap.contains(att.name) => +att.withExprId(outputIdMap(att.name)) + case att => att +} +val prunedRelation = + l.copy(relation = prunedParquetRelation, output = prunedRelationOutput) +
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r189491559 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala --- @@ -879,6 +879,15 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("select function over nested data") { --- End diff -- Without this PR, this test can still pass, right? Could you submit a separate PR for this test coverage improvement? We really welcome PRs that improve test coverage.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r189491063 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ColumnarFileFormat.scala --- @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources + +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.types.StructType + +/** + * An optional mix-in for columnar [[FileFormat]]s. This trait provides some helpful metadata when + * debugging a physical query plan. + */ +private[sql] trait ColumnarFileFormat { --- End diff -- Can we do this in a separate PR? No need to block this PR due to the discussion about this implementation.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21320#discussion_r189489061 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -162,7 +162,9 @@ case class FilterExec(condition: Expression, child: SparkPlan) val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length) val generated = otherPreds.map { c => val nullChecks = c.references.map { r => -val idx = notNullPreds.indexWhere { n => n.asInstanceOf[IsNotNull].child.semanticEquals(r)} +val idx = notNullPreds.indexWhere { n => + n.asInstanceOf[IsNotNull].child.references.contains(r) --- End diff -- Is this change related?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r189489577

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala ---
    @@ -99,27 +100,28 @@ trait ConstraintHelper {
       }

       /**
    -   * Infer the Attribute-specific IsNotNull constraints from the null intolerant child expressions
    -   * of constraints.
    +   * Infer the Attribute and ExtractValue-specific IsNotNull constraints from the null intolerant
    +   * child expressions of constraints.
        */
       private def inferIsNotNullConstraints(constraint: Expression): Seq[Expression] =
         constraint match {
           // When the root is IsNotNull, we can push IsNotNull through the child null intolerant
           // expressions
    -      case IsNotNull(expr) => scanNullIntolerantAttribute(expr).map(IsNotNull(_))
    +      case IsNotNull(expr) => scanNullIntolerantField(expr).map(IsNotNull(_))
           // Constraints always return true for all the inputs. That means, null will never be returned.
           // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull through the child
           // null intolerant expressions.
    -      case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
    +      case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
         }

       /**
    -   * Recursively explores the expressions which are null intolerant and returns all attributes
    -   * in these expressions.
    +   * Recursively explores the expressions which are null intolerant and returns all attributes and
    +   * complex type extractors in these expressions.
        */
    -  private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = expr match {
    +  private def scanNullIntolerantField(expr: Expression): Seq[Expression] = expr match {
    +    case ev: ExtractValue => Seq(ev)
    --- End diff --

    For this improvement, can we do it in a separate PR? The corresponding unit test cases are needed in `InferFiltersFromConstraintsSuite` instead of `ParquetSchemaPruningSuite`.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r189491217

    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala ---
    @@ -286,7 +286,19 @@ case class FileSourceScanExec(
         } getOrElse {
           metadata
         }
    -    withOptPartitionCount
    +    val withOptColumnCount = relation.fileFormat match {
    +      case columnar: ColumnarFileFormat =>
    +        SparkSession
    +          .getActiveSession
    +          .map { sparkSession =>
    +            val columnCount = columnar.columnCountForSchema(sparkSession, requiredSchema)
    +            withOptPartitionCount + ("ColumnCount" -> columnCount.toString)
    --- End diff --

    This needs to be in a separate PR as I suggested above. BTW, we could easily lose this metadata if this change does not have a test case.
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r189479383

    --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
    @@ -1256,8 +1256,18 @@ object SQLConf {
             "issues. Turn on this config to insert a local sort before actually doing repartition " +
             "to generate consistent repartition results. The performance of repartition() may go " +
             "down since we insert extra local sort before it.")
    +    .booleanConf
    +    .createWithDefault(true)
    +
    +  val NESTED_SCHEMA_PRUNING_ENABLED =
    +    buildConf("spark.sql.nestedSchemaPruning.enabled")
    +      .internal()
    +      .doc("Prune nested fields from a logical relation's output which are unnecessary in " +
    +        "satisfying a query. This optimization allows columnar file format readers to avoid " +
    +        "reading unnecessary nested column data. Currently Parquet is the only data source that " +
    --- End diff --

    How about ORC? cc @dongjoon-hyun Do you know whether it is also doable in the latest ORC version?
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
Github user gatorsmile commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21320#discussion_r189492534

    --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala ---
    @@ -0,0 +1,432 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements. See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License. You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.catalyst.planning
    +
    +import org.scalatest.BeforeAndAfterAll
    +import org.scalatest.exceptions.TestFailedException
    +
    +import org.apache.spark.SparkFunSuite
    +import org.apache.spark.sql.catalyst.dsl.plans._
    +import org.apache.spark.sql.catalyst.expressions.NamedExpression
    +import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    +import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
    +import org.apache.spark.sql.types._
    +
    +// scalastyle:off line.size.limit
    +class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
    +  // The test schema as a tree string, i.e. `schema.treeString`
    +  // root
    +  //  |-- col1: string (nullable = false)
    +  //  |-- col2: struct (nullable = true)
    +  //  |    |-- field1: integer (nullable = true)
    +  //  |    |-- field2: array (nullable = true)
    +  //  |    |    |-- element: integer (containsNull = false)
    +  //  |    |-- field3: array (nullable = false)
    +  //  |    |    |-- element: struct (containsNull = true)
    +  //  |    |    |    |-- subfield1: integer (nullable = true)
    +  //  |    |    |    |-- subfield2: integer (nullable = true)
    +  //  |    |    |    |-- subfield3: array (nullable = true)
    +  //  |    |    |    |    |-- element: integer (containsNull = true)
    +  //  |    |-- field4: map (nullable = true)
    +  //  |    |    |-- key: string
    +  //  |    |    |-- value: struct (valueContainsNull = false)
    +  //  |    |    |    |-- subfield1: integer (nullable = true)
    +  //  |    |    |    |-- subfield2: array (nullable = true)
    +  //  |    |    |    |    |-- element: integer (containsNull = false)
    +  //  |    |-- field5: array (nullable = false)
    +  //  |    |    |-- element: struct (containsNull = true)
    +  //  |    |    |    |-- subfield1: struct (nullable = false)
    +  //  |    |    |    |    |-- subsubfield1: integer (nullable = true)
    +  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
    +  //  |    |    |    |-- subfield2: struct (nullable = true)
    +  //  |    |    |    |    |-- subsubfield1: struct (nullable = true)
    +  //  |    |    |    |    |    |-- subsubsubfield1: string (nullable = true)
    +  //  |    |    |    |    |-- subsubfield2: integer (nullable = true)
    +  //  |    |-- field6: struct (nullable = true)
    +  //  |    |    |-- subfield1: string (nullable = false)
    +  //  |    |    |-- subfield2: string (nullable = true)
    +  //  |    |-- field7: struct (nullable = true)
    +  //  |    |    |-- subfield1: struct (nullable = true)
    +  //  |    |    |    |-- subsubfield1: integer (nullable = true)
    +  //  |    |    |    |-- subsubfield2: integer (nullable = true)
    +  //  |    |-- field8: map (nullable = true)
    +  //  |    |    |-- key: string
    +  //  |    |    |-- value: array (valueContainsNull = false)
    +  //  |    |    |    |-- element: struct (containsNull = true)
    +  //  |    |    |    |    |-- subfield1: integer (nullable = true)
    +  //  |    |    |    |    |-- subfield2: array (nullable = true)
    +  //  |    |    |    |    |    |-- element: integer (containsNull = false)
    +  //  |    |-- field9: map (nullable = true)
    +  //  |    |    |-- key: string
    +  //  |    |    |-- value: integer (valueContainsNull = false)
    +  //  |-- col3: array (nullable = false)
    +  //  |    |-- element: struct (containsNull = false)
    +  //  |    |    |-- field1: struct (nullable = true)
    +  //  |    |    |    |-- subfield1: integer (nullable = false)
    +  //  |    |    |    |-- subfield2: integer (nullable = true)
    +  //  |    |    |-- field2: map (nullable = true)
    +  //  |    |    |    |-- key: string
    +  //  |    |    |    |-- value: integer
[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...
GitHub user mallman opened a pull request:

    https://github.com/apache/spark/pull/21320

    [SPARK-4502][SQL] Parquet nested column pruning - foundation

    (Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

    _N.B. This is a restart of PR #16578 which includes everything in that PR except the aggregation and join schema pruning rules. Relevant review comments from that PR should be considered incorporated by reference. Please avoid duplication in review by reviewing that PR first. The summary below is an edited copy of the summary of the previous PR._

    ## What changes were proposed in this pull request?

    One of the hallmarks of a column-oriented data storage format is the ability to read data from a subset of columns, efficiently skipping reads from other columns. Spark has long had support for pruning unneeded top-level schema fields from the scan of a parquet file. For example, consider a table, `contacts`, backed by parquet with the following Spark SQL schema:

    ```
    root
     |-- name: struct
     |    |-- first: string
     |    |-- last: string
     |-- address: string
    ```

    Parquet stores this table's data in three physical columns: `name.first`, `name.last` and `address`. To answer the query

    ```SQL
    select address from contacts
    ```

    Spark will read only from the `address` column of parquet data. However, to answer the query

    ```SQL
    select name.first from contacts
    ```

    Spark will read `name.first` and `name.last` from parquet.

    This PR modifies Spark SQL to support finer-grained schema pruning. With this patch, Spark reads only the `name.first` column to answer the previous query.

    ### Implementation

    There are two main components of this patch. First, there is a `ParquetSchemaPruning` optimizer rule for gathering the required schema fields of a `PhysicalOperation` over a parquet file, constructing a new schema based on those required fields and rewriting the plan in terms of that pruned schema. The pruned schema fields are pushed down to the parquet requested read schema. `ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for rewriting a catalyst expression in terms of a pruned schema.

    Second, the `ParquetRowConverter` has been patched to ensure the ordinals of the parquet columns read are correct for the pruned schema. `ParquetReadSupport` has been patched to address a compatibility mismatch between Spark's built-in vectorized reader and the parquet-mr library's reader.

    ### Limitation

    Among the complex Spark SQL data types, this patch supports parquet column pruning of nested sequences of struct fields only.

    ## How was this patch tested?

    Care has been taken to ensure correctness and prevent regressions. A more advanced version of this patch incorporating optimizations for rewriting queries involving aggregations and joins has been running on a production Spark cluster at VideoAmp for several years. In that time, one bug was found and fixed early on, and we added a regression test for that bug.

    We forward-ported this patch to Spark master in June 2016 and have been running this patch against Spark 2.x branches on ad-hoc clusters since then.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/VideoAmp/spark-public spark-4502-parquet_column_pruning-foundation

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #21320

commit 9e301b37bb5863ef5fad8ec15f1d8f3d4b5e6a6f
Author: Michael Allman
Date: 2016-06-24T17:21:24Z

    [SPARK-4502][SQL] Parquet nested column pruning
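The pruning idea in the summary above can be illustrated with a small, self-contained sketch. It is not the code from this PR and does not use Spark's classes: `FieldType`, `StructType`, `prune` and `leafColumns` below are hypothetical stand-ins written only for this example, assuming a schema made of strings and nested structs (the only nesting the PR says it supports). It shows how the `contacts` schema maps to three physical columns, and how pruning to the required field `name.first` leaves a single column to read.

```scala
// Toy schema model for illustration only. These types are hypothetical and are
// NOT Spark's org.apache.spark.sql.types classes.
sealed trait FieldType
case object StringType extends FieldType
final case class StructType(fields: List[(String, FieldType)]) extends FieldType

object NestedPruningSketch {
  /** Leaf (physical) column paths of a schema, e.g. "name.first". */
  def leafColumns(schema: StructType, prefix: String = ""): List[String] =
    schema.fields.flatMap {
      case (name, nested: StructType) => leafColumns(nested, prefix + name + ".")
      case (name, _)                  => List(prefix + name)
    }

  /** Keep only the fields needed to reach the required paths (each path is a list of names). */
  def prune(schema: StructType, required: Set[List[String]]): StructType =
    StructType(schema.fields.flatMap { case (name, fieldType) =>
      // Paths that start at this field, with the leading name dropped.
      val tails = required.collect { case head :: tail if head == name => tail }
      if (tails.isEmpty) None // no required path goes through this field
      else fieldType match {
        // Recurse into a struct unless the struct itself is required as a whole.
        case nested: StructType if !tails.contains(Nil) => Some(name -> prune(nested, tails))
        case other                                      => Some(name -> other)
      }
    })

  // The `contacts` schema from the PR summary.
  val contacts: StructType = StructType(List(
    "name" -> StructType(List("first" -> StringType, "last" -> StringType)),
    "address" -> StringType))

  def main(args: Array[String]): Unit = {
    // Fields required by `select name.first from contacts`.
    val pruned = prune(contacts, Set(List("name", "first")))
    println(leafColumns(contacts)) // List(name.first, name.last, address)
    println(leafColumns(pruned))   // List(name.first)
  }
}
```

In this toy model a required path that ends at a struct keeps the whole struct, which corresponds to the pre-patch behaviour for top-level fields. The real rule also has to rewrite the query plan against the pruned schema, which this sketch does not attempt.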