Github user HyukjinKwon commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21889#discussion_r209527485

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.scalactic.Equality
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class ParquetSchemaPruningSuite
+    extends QueryTest
+    with ParquetTest
+    with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  private val contacts =
+    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+      relatives = Map("brother" -> johnDoe)) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  private val briefContacts =
+    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map(),
+    p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
+
+  private val contactsWithDataPartitionColumn =
+    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+  private val briefContactsWithDataPartitionColumn =
+    briefContacts.map { case BriefContact(id, name, address) =>
+      BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    conf.setConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED, true)
+  }
+
+  override def afterEach(): Unit = {
+    try {
+      conf.unsetConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED)
+    } finally {
+      super.afterEach()
+    }
+  }
+
+  testSchemaPruning("select a single complex field") {
+    val query = sql("select name.middle from contacts")
+    checkScan(query, "struct<name:struct<middle:string>>")
+    checkAnswer(query.orderBy("id"), Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+    val query = sql("select name.middle, name from contacts")
+    checkScan(query, "struct<name:struct<first:string,middle:string,last:string>>")
+    checkAnswer(query.orderBy("id"),
+      Row("X.", Row("Jane", "X.", "Doe")) ::
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, Row("Janet", null, "Jones")) ::
+      Row(null, Row("Jim", null, "Jones")) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+    val query = sql("select friends.middle, friends from contacts where p=1")
+    checkScan(query,
+      "struct<friends:array<struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row(Array("Z."), Array(Row("Susan", "Z.", "Smith"))) ::
+      Row(Array.empty[String], Array.empty[Row]) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field from a map entry and its parent map entry") {
+    val query =
+      sql("select relatives[\"brother\"].middle, relatives[\"brother\"] from contacts where p=1")
+    checkScan(query,
+      "struct<relatives:map<string,struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, null) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field and the partition column") {
+    val query = sql("select name.middle, p from contacts")
+    checkScan(query, "struct<name:struct<middle:string>>")
+    checkAnswer(query.orderBy("id"),
+      Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
+  }
+
+  ignore("partial schema intersection - select missing subfield") {
+    val query = sql("select name.middle, address from contacts where p=2")
+    checkScan(query, "struct<name:struct<middle:string>,address:string>")
+    checkAnswer(query.orderBy("id"),
+      Row(null, "567 Maple Drive") ::
+      Row(null, "6242 Ash Street") :: Nil)
+  }
+
+  testSchemaPruning("no unnecessary schema pruning") {
+    val query =
+      sql("select id, name.last, name.middle, name.first, relatives[''].last, " +
+        "relatives[''].middle, relatives[''].first, friends[0].last, friends[0].middle, " +
+        "friends[0].first, pets, address from contacts where p=2")
+    // We've selected every field in the schema. Therefore, no schema pruning should be performed.
+    // We check this by asserting that the scanned schema of the query is identical to the schema
+    // of the contacts relation, even though the fields are selected in different orders.
+    checkScan(query,
+      "struct<id:int,name:struct<first:string,middle:string,last:string>,address:string,pets:int," +
+      "friends:array<struct<first:string,middle:string,last:string>>," +
+      "relatives:map<string,struct<first:string,middle:string,last:string>>>")
+    checkAnswer(query.orderBy("id"),
+      Row(2, "Jones", null, "Janet", null, null, null, null, null, null, null, "567 Maple Drive") ::
+      Row(3, "Jones", null, "Jim", null, null, null, null, null, null, null, "6242 Ash Street") ::
+      Nil)
+  }
+
+  testSchemaPruning("empty schema intersection") {
+    val query = sql("select name.middle from contacts where p=2")
+    checkScan(query, "struct<name:struct<middle:string>>")
+    checkAnswer(query.orderBy("id"),
+      Row(null) :: Row(null) :: Nil)
+  }
+
+  private def testSchemaPruning(testName: String)(testThunk: => Unit) {
+    // Set the reader conf inside each test body: wrapping the test() registration
+    // in withSQLConf would restore the conf before the test bodies actually run.
+    test(s"Spark vectorized reader - without partition data column - $testName") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        withContacts(testThunk)
+      }
+    }
+    test(s"Spark vectorized reader - with partition data column - $testName") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true") {
+        withContactsWithDataPartitionColumn(testThunk)
+      }
+    }
+
+    test(s"Native Parquet reader - without partition data column - $testName") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        withContacts(testThunk)
+      }
+    }
+    test(s"Native Parquet reader - with partition data column - $testName") {
+      withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+        withContactsWithDataPartitionColumn(testThunk)
+      }
+    }
+  }
+
+  private def withContacts(testThunk: => Unit) {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      makeParquetFile(contacts, new File(path + "/contacts/p=1"))
+      makeParquetFile(briefContacts, new File(path + "/contacts/p=2"))
+
+      spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts")
+
+      testThunk
+    }
+  }
+
+  private def withContactsWithDataPartitionColumn(testThunk: => Unit) {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      makeParquetFile(contactsWithDataPartitionColumn, new File(path + "/contacts/p=1"))
+      makeParquetFile(briefContactsWithDataPartitionColumn, new File(path + "/contacts/p=2"))
+
+      spark.read.parquet(path + "/contacts").createOrReplaceTempView("contacts")
+
+      testThunk
+    }
+  }
+
+  private val schemaEquality = new Equality[StructType] {
+    override def areEqual(a: StructType, b: Any): Boolean =
+      b match {
+        case otherType: StructType => a sameType otherType
--- End diff --

nit: avoid infix notation per https://github.com/databricks/scala-style-guide#infix
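For reference, a minimal sketch of the dot-notation form the style guide recommends. The enclosing object and the `case _ => false` fallback are illustrative assumptions here, since the quoted diff is cut off at the flagged line:

    import org.scalactic.Equality

    import org.apache.spark.sql.types.StructType

    object SchemaEqualityExample {
      // Dot notation rather than the infix `a sameType otherType`; the style
      // guide reserves infix call syntax for symbolic, operator-like methods.
      val schemaEquality: Equality[StructType] = new Equality[StructType] {
        override def areEqual(a: StructType, b: Any): Boolean =
          b match {
            case otherType: StructType => a.sameType(otherType)
            case _ => false
          }
      }
    }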
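The quoted diff also stops before the suite's checkScan helper, but given the imports above (FileSourceScanExec, CatalystSqlParser) a helper along these lines could consume that Equality inside the suite. This is a hypothetical reconstruction, not the PR's actual code:

    // Hypothetical sketch of a checkScan-style helper: collect the required
    // (pushed-down) schema from each Parquet scan in the physical plan and
    // compare it, modulo nullability, against the expected catalog string.
    private def checkScan(df: DataFrame, expectedSchemaCatalogStrings: String*): Unit = {
      val scannedSchemata = df.queryExecution.executedPlan.collect {
        case scan: FileSourceScanExec => scan.requiredSchema
      }
      assert(scannedSchemata.size === expectedSchemaCatalogStrings.size)
      scannedSchemata.zip(expectedSchemaCatalogStrings).foreach {
        case (scanned, expectedCatalogString) =>
          val expected = CatalystSqlParser.parseDataType(expectedCatalogString)
          // Compare via the suite's custom Equality, which ignores nullability.
          implicit val equality: Equality[StructType] = schemaEquality
          assert(scanned === expected.asInstanceOf[StructType])
      }
    }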