[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/21320


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212476709
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
 class ParquetSchemaPruningSuite
 extends QueryTest
 with ParquetTest
+with SchemaPruningTest
--- End diff --

nvm. This commit just adds it back


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212476400
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
 class ParquetSchemaPruningSuite
 extends QueryTest
 with ParquetTest
+with SchemaPruningTest
--- End diff --

In my local, all of them still can pass. Am I wrong?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212476268
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -31,6 +32,7 @@ import org.apache.spark.sql.types.StructType
 class ParquetSchemaPruningSuite
 extends QueryTest
 with ParquetTest
+with SchemaPruningTest
--- End diff --

Why this is removed?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212414888
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
 
   override def start(): Unit = {
 var i = 0
-while (i < currentRow.numFields) {
+while (i < fieldConverters.length) {
   fieldConverters(i).updater.start()
   currentRow.setNullAt(i)
   i += 1
 }
+while (i < currentRow.numFields) {
--- End diff --

My suggestion is to remove the changes from this file at first. We can 
handle the ignored cases in the follow-up PRs. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212396370
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
 
   override def start(): Unit = {
 var i = 0
-while (i < currentRow.numFields) {
+while (i < fieldConverters.length) {
   fieldConverters(i).updater.start()
   currentRow.setNullAt(i)
   i += 1
 }
+while (i < currentRow.numFields) {
--- End diff --

These changes are related to my fix for the ignored unit test. If I apply 
my fix but keep the master version of this file, 24 unit tests fail. If I apply 
my fix along with this file diff then all tests pass, including the test that 
is currently ignored.

I'm not sure I can develop a unit test for this current commit that should 
pass but will fail without this file's changes. I haven't spent any time 
thinking about it, and I really need to work on other things right now.

If you want I will back out this change. However, I will re-incorporate it 
in a follow-on PR.

Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212388958
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
 
   override def start(): Unit = {
 var i = 0
-while (i < currentRow.numFields) {
+while (i < fieldConverters.length) {
   fieldConverters(i).updater.start()
   currentRow.setNullAt(i)
   i += 1
 }
+while (i < currentRow.numFields) {
--- End diff --

I'll get back to you on this shortly. Thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r212194825
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -202,11 +204,15 @@ private[parquet] class ParquetRowConverter(
 
   override def start(): Unit = {
 var i = 0
-while (i < currentRow.numFields) {
+while (i < fieldConverters.length) {
   fieldConverters(i).updater.start()
   currentRow.setNullAt(i)
   i += 1
 }
+while (i < currentRow.numFields) {
--- End diff --

@mallman It sounds like the changes in this file are not needed. Could you 
help me point out which test cases will fail?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-26 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205340696
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
--- End diff --

Also honestly this logic looks convoluted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread ajacques
Github user ajacques commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205329769
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a 
[[org.apache.spark.sql.types.StructField]] from a Catalyst
+ * complex type extractor. For example, consider a relation with the 
following schema:
+ *
+ *   {{{
+ *   root
+ *|-- name: struct (nullable = true)
+ *||-- first: string (nullable = true)
+ *||-- last: string (nullable = true)
+ *}}}
+ *
+ * Further, suppose we take the select expression `name.first`. This will 
parse into an
+ * `Alias(child, "first")`. Ignoring the alias, `child` matches the 
following pattern:
+ *
+ *   {{{
+ *   GetStructFieldObject(
+ * AttributeReference("name", StructType(_), _, _),
+ * StructField("first", StringType, _, _))
+ *   }}}
+ *
+ * [[SelectedField]] converts that expression into
+ *
+ *   {{{
+ *   StructField("name", StructType(Array(StructField("first", 
StringType
+ *   }}}
+ *
+ * by mapping each complex type extractor to a 
[[org.apache.spark.sql.types.StructField]] with the
+ * same name as its child (or "parent" going right to left in the select 
expression) and a data
+ * type appropriate to the complex type extractor. In our example, the 
name of the child expression
+ * is "name" and its data type is a 
[[org.apache.spark.sql.types.StructType]] with a single string
+ * field named "first".
+ *
+ * @param expr the top-level complex type extractor
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
--- End diff --

```
Error:(61, 12) constructor cannot be instantiated to expected type;
 found   : org.apache.spark.sql.catalyst.expressions.Alias
 required: org.apache.spark.sql.catalyst.expressions.ExtractValue
  case Alias(child, _) => child
```

Alias takes: `Alias(child: Expression, name: String)`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread ajacques
Github user ajacques commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205329633
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data 
types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. 
This
+ * class is motivated by columnar nested schema pruning.
+ */
+case class ProjectionOverSchema(schema: StructType) {
--- End diff --

We can move this to `sql.execution` if we move all three classes: 
`ProjectionOverSchema`, `GetStructFieldObject`, and `SelectedField`. Is there a 
difference in the catalyst.planning vs the execution packages?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205063684
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data 
types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. 
This
+ * class is motivated by columnar nested schema pruning.
+ */
+case class ProjectionOverSchema(schema: StructType) {
--- End diff --

can we move it to `catalyst`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205022974
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  ||-- field1: integer (nullable = true)
+  //  ||-- field2: array (nullable = true)
+  //  |||-- element: integer (containsNull = false)
+  //  ||-- field3: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  ||||-- subfield3: array (nullable = true)
+  //  |||||-- element: integer (containsNull = true)
+  //  ||-- field4: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: struct (valueContainsNull = false)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: array (nullable = true)
+  //  |||||-- element: integer (containsNull = false)
+  //  ||-- field5: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: struct (nullable = false)
+  //  |||||-- subsubfield1: integer (nullable = true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||||-- subfield2: struct (nullable = true)
+  //  |||||-- subsubfield1: struct (nullable = true)
+  //  ||||||-- subsubsubfield1: string (nullable = 
true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field6: struct (nullable = true)
+  //  |||-- subfield1: string (nullable = false)
+  //  |||-- subfield2: string (nullable = true)
+  //  ||-- field7: struct (nullable = true)
+  //  |||-- subfield1: struct (nullable = true)
+  //  ||||-- subsubfield1: integer (nullable = true)
+  //  ||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field8: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: array (valueContainsNull = false)
+  //  ||||-- element: struct (containsNull = true)
+  //  |||||-- subfield1: integer (nullable = true)
+  //  |||||-- subfield2: array (nullable = true)
+  //  ||||||-- element: integer (containsNull = false)
+  //  ||-- field9: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  ||-- element: struct (containsNull = false)
+  //  |||-- field1: struct (nullable = true)
+  //  ||||-- subfield1: integer (nullable = false)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  |||-- field2: map (nullable = true)
+  //  ||||-- key: string
+  //  ||||-- value: integer 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205022799
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with FileSchemaPruningTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(name: FullName, address: String, pets: Int, friends: 
Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val contacts =
+Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) ::
+Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(name: Name, address: String)
+
+  val briefContacts =
+BriefContact(Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(name: FullName, address: 
String, pets: Int,
+friends: Array[FullName] = Array(), relatives: Map[String, FullName] = 
Map(), p: Int)
+
+  case class BriefContactWithDataPartitionColumn(name: Name, address: 
String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+contacts.map { case Contact(name, address, pets, friends, relatives) =>
+  ContactWithDataPartitionColumn(name, address, pets, friends, 
relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+briefContacts.map { case BriefContact(name: Name, address: String) =>
+  BriefContactWithDataPartitionColumn(name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+val query = sql("select name.middle from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: 
Nil)
+  }
+
+  testSchemaPruning("select a single complex field and the partition 
column") {
+val query = sql("select name.middle, p from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: 
Row(null, 2) :: Nil)
+  }
+
+  ignore("partial schema intersection - select missing subfield") {
+val query = sql("select name.middle, address from contacts where p=2")
+checkScanSchemata(query, 
"struct,address:string>")
+checkAnswer(query,
+  Row(null, "567 Maple Drive") ::
+  Row(null, "6242 Ash Street") :: Nil)
+  }
+
+  testSchemaPruning("no unnecessary schema pruning") {
+val query =
+  sql("select name.last, name.middle, name.first, relatives[''].last, 
" +
+"relatives[''].middle, relatives[''].first, friends[0].last, 
friends[0].middle, " +
+"friends[0].first, pets, address from contacts where p=2")
+// We've selected every field in the schema. Therefore, no schema 
pruning should be performed.
+// We check this by asserting that the scanned schema of the query is 
identical to the schema
+// of the contacts relation, even though the fields are selected in 
different orders.
+checkScanSchemata(query,
+  
"struct,address:string,pets:int,"
 +
+  "friends:array>," +
+  
"relatives:map>>")
+checkAnswer(query,
+  Row("Jones", null, "Janet", null, null, null, null, null, null, 
null, "567 Maple Drive") ::
+  Row("Jones", null, "Jim", null, null, null, null, null, null, null, 
"6242 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205022895
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If [[requestedRootFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedSchema = requestedRootFields
+.map { case RootField(field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
--- End diff --

No comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205021712
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
--- End diff --

No comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205021469
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If [[requestedRootFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedSchema = requestedRootFields
+.map { case RootField(field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205021282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data 
types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. 
This
+ * class is motivated by columnar nested schema pruning.
+ */
+case class ProjectionOverSchema(schema: StructType) {
--- End diff --

Okay. So...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205020970
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
--- End diff --

This change is not part of this PR.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-25 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r205021140
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with FileSchemaPruningTest
+with SharedSQLContext {
--- End diff --

No comment.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204607379
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  ||-- field1: integer (nullable = true)
+  //  ||-- field2: array (nullable = true)
+  //  |||-- element: integer (containsNull = false)
+  //  ||-- field3: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  ||||-- subfield3: array (nullable = true)
+  //  |||||-- element: integer (containsNull = true)
+  //  ||-- field4: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: struct (valueContainsNull = false)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: array (nullable = true)
+  //  |||||-- element: integer (containsNull = false)
+  //  ||-- field5: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: struct (nullable = false)
+  //  |||||-- subsubfield1: integer (nullable = true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||||-- subfield2: struct (nullable = true)
+  //  |||||-- subsubfield1: struct (nullable = true)
+  //  ||||||-- subsubsubfield1: string (nullable = 
true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field6: struct (nullable = true)
+  //  |||-- subfield1: string (nullable = false)
+  //  |||-- subfield2: string (nullable = true)
+  //  ||-- field7: struct (nullable = true)
+  //  |||-- subfield1: struct (nullable = true)
+  //  ||||-- subsubfield1: integer (nullable = true)
+  //  ||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field8: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: array (valueContainsNull = false)
+  //  ||||-- element: struct (containsNull = true)
+  //  |||||-- subfield1: integer (nullable = true)
+  //  |||||-- subfield2: array (nullable = true)
+  //  ||||||-- element: integer (containsNull = false)
+  //  ||-- field9: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  ||-- element: struct (containsNull = false)
+  //  |||-- field1: struct (nullable = true)
+  //  ||||-- subfield1: integer (nullable = false)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  |||-- field2: map (nullable = true)
+  //  ||||-- key: string
+  //  ||||-- value: integer 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204289193
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If [[requestedRootFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedSchema = requestedRootFields
+.map { case RootField(field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
--- End diff --

Can we make some private functions for these? Looks hard to follow.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204288547
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with FileSchemaPruningTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(name: FullName, address: String, pets: Int, friends: 
Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val contacts =
+Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) ::
+Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(name: Name, address: String)
+
+  val briefContacts =
+BriefContact(Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(name: FullName, address: 
String, pets: Int,
+friends: Array[FullName] = Array(), relatives: Map[String, FullName] = 
Map(), p: Int)
+
+  case class BriefContactWithDataPartitionColumn(name: Name, address: 
String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+contacts.map { case Contact(name, address, pets, friends, relatives) =>
+  ContactWithDataPartitionColumn(name, address, pets, friends, 
relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+briefContacts.map { case BriefContact(name: Name, address: String) =>
+  BriefContactWithDataPartitionColumn(name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+val query = sql("select name.middle from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: 
Nil)
+  }
+
+  testSchemaPruning("select a single complex field and the partition 
column") {
+val query = sql("select name.middle, p from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: 
Row(null, 2) :: Nil)
+  }
+
+  ignore("partial schema intersection - select missing subfield") {
+val query = sql("select name.middle, address from contacts where p=2")
+checkScanSchemata(query, 
"struct,address:string>")
+checkAnswer(query,
+  Row(null, "567 Maple Drive") ::
+  Row(null, "6242 Ash Street") :: Nil)
+  }
+
+  testSchemaPruning("no unnecessary schema pruning") {
+val query =
+  sql("select name.last, name.middle, name.first, relatives[''].last, 
" +
+"relatives[''].middle, relatives[''].first, friends[0].last, 
friends[0].middle, " +
+"friends[0].first, pets, address from contacts where p=2")
+// We've selected every field in the schema. Therefore, no schema 
pruning should be performed.
+// We check this by asserting that the scanned schema of the query is 
identical to the schema
+// of the contacts relation, even though the fields are selected in 
different orders.
+checkScanSchemata(query,
+  
"struct,address:string,pets:int,"
 +
+  "friends:array>," +
+  
"relatives:map>>")
+checkAnswer(query,
+  Row("Jones", null, "Janet", null, null, null, null, null, null, 
null, "567 Maple Drive") ::
+  Row("Jones", null, "Jim", null, null, null, null, null, null, null, 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204288381
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
--- End diff --

tiny nit: I think we don't really have to disable the length limit.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-22 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204288233
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If [[requestedRootFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedSchema = requestedRootFields
+.map { case RootField(field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204209033
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1298,8 +1298,18 @@ object SQLConf {
 "issues. Turn on this config to insert a local sort before 
actually doing repartition " +
 "to generate consistent repartition results. The performance of 
repartition() may go " +
 "down since we insert extra local sort before it.")
+.booleanConf
+.createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data. Currently Parquet is the 
only data source that " +
+"implements this optimization.")
   .booleanConf
-  .createWithDefault(true)
+  .createWithDefault(false)
--- End diff --

I'm against enabling this feature by default with a known failing test 
case. For example, 
https://github.com/apache/spark/pull/21320/files#diff-0c6c7481232e9637b91c179f1005426aR71.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204208518
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  ||-- field1: integer (nullable = true)
+  //  ||-- field2: array (nullable = true)
+  //  |||-- element: integer (containsNull = false)
+  //  ||-- field3: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  ||||-- subfield3: array (nullable = true)
+  //  |||||-- element: integer (containsNull = true)
+  //  ||-- field4: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: struct (valueContainsNull = false)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: array (nullable = true)
+  //  |||||-- element: integer (containsNull = false)
+  //  ||-- field5: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: struct (nullable = false)
+  //  |||||-- subsubfield1: integer (nullable = true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||||-- subfield2: struct (nullable = true)
+  //  |||||-- subsubfield1: struct (nullable = true)
+  //  ||||||-- subsubsubfield1: string (nullable = 
true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field6: struct (nullable = true)
+  //  |||-- subfield1: string (nullable = false)
+  //  |||-- subfield2: string (nullable = true)
+  //  ||-- field7: struct (nullable = true)
+  //  |||-- subfield1: struct (nullable = true)
+  //  ||||-- subsubfield1: integer (nullable = true)
+  //  ||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field8: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: array (valueContainsNull = false)
+  //  ||||-- element: struct (containsNull = true)
+  //  |||||-- subfield1: integer (nullable = true)
+  //  |||||-- subfield2: array (nullable = true)
+  //  ||||||-- element: integer (containsNull = false)
+  //  ||-- field9: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  ||-- element: struct (containsNull = false)
+  //  |||-- field1: struct (nullable = true)
+  //  ||||-- subfield1: integer (nullable = false)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  |||-- field2: map (nullable = true)
+  //  ||||-- key: string
+  //  ||||-- value: integer 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-21 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204206252
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with FileSchemaPruningTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(name: FullName, address: String, pets: Int, friends: 
Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val contacts =
+Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) ::
+Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(name: Name, address: String)
+
+  val briefContacts =
+BriefContact(Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(name: FullName, address: 
String, pets: Int,
+friends: Array[FullName] = Array(), relatives: Map[String, FullName] = 
Map(), p: Int)
+
+  case class BriefContactWithDataPartitionColumn(name: Name, address: 
String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+contacts.map { case Contact(name, address, pets, friends, relatives) =>
+  ContactWithDataPartitionColumn(name, address, pets, friends, 
relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+briefContacts.map { case BriefContact(name: Name, address: String) =>
+  BriefContactWithDataPartitionColumn(name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+val query = sql("select name.middle from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: 
Nil)
+  }
+
+  testSchemaPruning("select a single complex field and the partition 
column") {
+val query = sql("select name.middle, p from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: 
Row(null, 2) :: Nil)
+  }
+
+  testSchemaPruning("partial schema intersection - select missing 
subfield") {
+val query = sql("select name.middle, address from contacts where p=2")
+checkScanSchemata(query, 
"struct,address:string>")
+checkAnswer(query,
+  Row(null, "567 Maple Drive") ::
+  Row(null, "6242 Ash Street") :: Nil)
+  }
+
+  testSchemaPruning("no unnecessary schema pruning") {
+val query =
+  sql("select name.last, name.middle, name.first, relatives[''].last, 
" +
+"relatives[''].middle, relatives[''].first, friends[0].last, 
friends[0].middle, " +
+"friends[0].first, pets, address from contacts where p=2")
+// We've selected every field in the schema. Therefore, no schema 
pruning should be performed.
+// We check this by asserting that the scanned schema of the query is 
identical to the schema
+// of the contacts relation, even though the fields are selected in 
different orders.
+checkScanSchemata(query,
+  
"struct,address:string,pets:int,"
 +
+  "friends:array>," +
+  
"relatives:map>>")
+checkAnswer(query,
+  Row("Jones", null, "Janet", null, null, null, null, null, null, 
null, "567 Maple Drive") ::
+  Row("Jones", null, "Jim", null, null, null, null, null, null, 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-21 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204206072
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with FileSchemaPruningTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(name: FullName, address: String, pets: Int, friends: 
Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val contacts =
+Contact(FullName("Jane", "X.", "Doe"), "123 Main Street", 1) ::
+Contact(FullName("John", "Y.", "Doe"), "321 Wall Street", 3) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(name: Name, address: String)
+
+  val briefContacts =
+BriefContact(Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(name: FullName, address: 
String, pets: Int,
+friends: Array[FullName] = Array(), relatives: Map[String, FullName] = 
Map(), p: Int)
+
+  case class BriefContactWithDataPartitionColumn(name: Name, address: 
String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+contacts.map { case Contact(name, address, pets, friends, relatives) =>
+  ContactWithDataPartitionColumn(name, address, pets, friends, 
relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+briefContacts.map { case BriefContact(name: Name, address: String) =>
+  BriefContactWithDataPartitionColumn(name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+val query = sql("select name.middle from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: 
Nil)
+  }
+
+  testSchemaPruning("select a single complex field and the partition 
column") {
+val query = sql("select name.middle, p from contacts")
+checkScanSchemata(query, "struct>")
+checkAnswer(query, Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: 
Row(null, 2) :: Nil)
+  }
+
+  testSchemaPruning("partial schema intersection - select missing 
subfield") {
+val query = sql("select name.middle, address from contacts where p=2")
+checkScanSchemata(query, 
"struct,address:string>")
+checkAnswer(query,
+  Row(null, "567 Maple Drive") ::
+  Row(null, "6242 Ash Street") :: Nil)
+  }
+
+  testSchemaPruning("no unnecessary schema pruning") {
+val query =
+  sql("select name.last, name.middle, name.first, relatives[''].last, 
" +
+"relatives[''].middle, relatives[''].first, friends[0].last, 
friends[0].middle, " +
+"friends[0].first, pets, address from contacts where p=2")
+// We've selected every field in the schema. Therefore, no schema 
pruning should be performed.
+// We check this by asserting that the scanned schema of the query is 
identical to the schema
+// of the contacts relation, even though the fields are selected in 
different orders.
+checkScanSchemata(query,
+  
"struct,address:string,pets:int,"
 +
+  "friends:array>," +
+  
"relatives:map>>")
+checkAnswer(query,
+  Row("Jones", null, "Janet", null, null, null, null, null, null, 
null, "567 Maple Drive") ::
+  Row("Jones", null, "Jim", null, null, null, null, null, null, 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-21 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r204205744
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
--- End diff --

What kind of exception does it throw? I thought Parquet supports it so far. 
Do you mean it doesn't support nested clipping schema between Parquet and 
Catalyst's? Which case does it now work?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r203934633
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -288,6 +310,27 @@ private[parquet] object ParquetReadSupport {
 }
   }
 
+  /**
+   * Computes the structural intersection between two Parquet group types.
+   */
+  private def intersectParquetGroups(
+  groupType1: GroupType, groupType2: GroupType): Option[GroupType] = {
+val fields =
+  groupType1.getFields.asScala
+.filter(field => groupType2.containsField(field.getName))
+.flatMap {
+  case field1: GroupType =>
--- End diff --

`field1` -> `field`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r203934281
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -47,16 +47,25 @@ import org.apache.spark.sql.types._
  *
  * Due to this reason, we no longer rely on [[ReadContext]] to pass 
requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
+ *
+ * @param parquetMrCompatibility support reading with parquet-mr or 
Spark's built-in Parquet reader
  */
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+parquetMrCompatibility: Boolean)
 extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
+  /**
+   * Construct a [[ParquetReadSupport]] with [[convertTz]] set to [[None]] 
and
+   * [[parquetMrCompatibility]] set to [[false]].
+   *
+   * We need a zero-arg constructor for SpecificParquetRecordReaderBase.  
But that is only
+   * used in the vectorized reader, where we get the convertTz value 
directly, and the value here
+   * is ignored. Further, we set [[parquetMrCompatibility]] to [[false]] 
as this constructor is only
+   * called by the Spark reader.
--- End diff --

re: 
https://github.com/apache/spark/pull/21320/files/cb858f202e49d69f2044681e37f982dc10676296#r199631341
 actually, it doesn't looks clear to me too. What does the flag indicate? you 
mean normal parquet reader vs vectorized parquet reader?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r203933423
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  ||-- field1: integer (nullable = true)
+  //  ||-- field2: array (nullable = true)
+  //  |||-- element: integer (containsNull = false)
+  //  ||-- field3: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  ||||-- subfield3: array (nullable = true)
+  //  |||||-- element: integer (containsNull = true)
+  //  ||-- field4: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: struct (valueContainsNull = false)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: array (nullable = true)
+  //  |||||-- element: integer (containsNull = false)
+  //  ||-- field5: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: struct (nullable = false)
+  //  |||||-- subsubfield1: integer (nullable = true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||||-- subfield2: struct (nullable = true)
+  //  |||||-- subsubfield1: struct (nullable = true)
+  //  ||||||-- subsubsubfield1: string (nullable = 
true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field6: struct (nullable = true)
+  //  |||-- subfield1: string (nullable = false)
+  //  |||-- subfield2: string (nullable = true)
+  //  ||-- field7: struct (nullable = true)
+  //  |||-- subfield1: struct (nullable = true)
+  //  ||||-- subsubfield1: integer (nullable = true)
+  //  ||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field8: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: array (valueContainsNull = false)
+  //  ||||-- element: struct (containsNull = true)
+  //  |||||-- subfield1: integer (nullable = true)
+  //  |||||-- subfield2: array (nullable = true)
+  //  ||||||-- element: integer (containsNull = false)
+  //  ||-- field9: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  ||-- element: struct (containsNull = false)
+  //  |||-- field1: struct (nullable = true)
+  //  ||||-- subfield1: integer (nullable = false)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  |||-- field2: map (nullable = true)
+  //  ||||-- key: string
+  //  ||||-- value: integer 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r203933307
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with FileSchemaPruningTest
+with SharedSQLContext {
--- End diff --

Can we just simply:

```scala
  override def beforeEach(): Unit = {
super.beforeEach()
spark.conf.set(SQLConf. NESTED_SCHEMA_PRUNING_ENABLED.key, "true")
  }

  override def afterEach(): Unit = {
try {
  spark.conf.unset(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key)
} finally {
  super.afterEach()
}
  }
```

without the complicated hierarchy?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r203931755
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data 
types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. 
This
+ * class is motivated by columnar nested schema pruning.
+ */
+case class ProjectionOverSchema(schema: StructType) {
--- End diff --

This still looks weird that we place this under `catalyst` since we 
currently only use it under `execution`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r203931060
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data 
types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. 
This
+ * class is motivated by columnar nested schema pruning.
+ */
+case class ProjectionOverSchema(schema: StructType) {
+  private val fieldNames = schema.fieldNames.toSet
+
+  def unapply(expr: Expression): Option[Expression] = getProjection(expr)
+
+  private def getProjection(expr: Expression): Option[Expression] =
+expr match {
+  case a @ AttributeReference(name, _, _, _) if 
(fieldNames.contains(name)) =>
+Some(a.copy(dataType = schema(name).dataType)(a.exprId, 
a.qualifier))
+  case GetArrayItem(child, arrayItemOrdinal) =>
+getProjection(child).map {
+  case projection =>
+GetArrayItem(projection, arrayItemOrdinal)
--- End diff --

nit:

```
getProjection(child).map { projection => GetArrayItem(projection, 
arrayItemOrdinal) }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-19 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r203931030
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data 
types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. 
This
+ * class is motivated by columnar nested schema pruning.
+ */
+case class ProjectionOverSchema(schema: StructType) {
+  private val fieldNames = schema.fieldNames.toSet
+
+  def unapply(expr: Expression): Option[Expression] = getProjection(expr)
+
+  private def getProjection(expr: Expression): Option[Expression] =
+expr match {
+  case a @ AttributeReference(name, _, _, _) if 
(fieldNames.contains(name)) =>
+Some(a.copy(dataType = schema(name).dataType)(a.exprId, 
a.qualifier))
+  case GetArrayItem(child, arrayItemOrdinal) =>
+getProjection(child).map {
+  case projection =>
+GetArrayItem(projection, arrayItemOrdinal)
+}
+  case GetArrayStructFields(child, StructField(name, _, _, _), _, 
numFields, containsNull) =>
+getProjection(child).map(p => (p, p.dataType)).map {
+  case (projection, ArrayType(projSchema @ StructType(_), _)) =>
+GetArrayStructFields(projection,
+  projSchema(name), projSchema.fieldIndex(name), 
projSchema.size, containsNull)
+}
+  case GetMapValue(child, key) =>
+getProjection(child).map {
+  case projection =>
--- End diff --

nit:

```scala
getProjection(child).map { projection => GetMapValue(projection, key) }
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-11 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r201863353
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If [[requestedRootFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedSchema = requestedRootFields
+.map { case RootField(field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
--- End diff --

Yes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-11 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r201863463
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a 
[[org.apache.spark.sql.types.StructField]] from a Catalyst
+ * complex type extractor. For example, consider a relation with the 
following schema:
+ *
+ *   {{{
+ *   root
+ *|-- name: struct (nullable = true)
+ *||-- first: string (nullable = true)
+ *||-- last: string (nullable = true)
+ *}}}
+ *
+ * Further, suppose we take the select expression `name.first`. This will 
parse into an
+ * `Alias(child, "first")`. Ignoring the alias, `child` matches the 
following pattern:
+ *
+ *   {{{
+ *   GetStructFieldObject(
+ * AttributeReference("name", StructType(_), _, _),
+ * StructField("first", StringType, _, _))
+ *   }}}
+ *
+ * [[SelectedField]] converts that expression into
+ *
+ *   {{{
+ *   StructField("name", StructType(Array(StructField("first", 
StringType
+ *   }}}
+ *
+ * by mapping each complex type extractor to a 
[[org.apache.spark.sql.types.StructField]] with the
+ * same name as its child (or "parent" going right to left in the select 
expression) and a data
+ * type appropriate to the complex type extractor. In our example, the 
name of the child expression
+ * is "name" and its data type is a 
[[org.apache.spark.sql.types.StructType]] with a single string
+ * field named "first".
+ *
+ * @param expr the top-level complex type extractor
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
--- End diff --

The code does not compile with that change.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-11 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r201863251
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
+  ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, 
context.getFileSchema)
+.map(intersectionGroup =>
+  new MessageType(intersectionGroup.getName, 
intersectionGroup.getFields))
+.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+} else {
+  // Spark's built-in Parquet reader will throw an exception in some 
cases if the requested
+  // schema is not the same as the clipped schema
--- End diff --

@gatorsmile @rdblue @mswit-databricks What is your position on this? I 
don't know that the parquet spec provides a definitive answer on this question.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199648692
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with 
HasParentContainerUpdater] = {
-parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
-  case ((parquetFieldType, catalystField), ordinal) =>
-// Converted field value should be set to the `ordinal`-th cell of 
`currentRow`
-newConverter(parquetFieldType, catalystField.dataType, new 
RowUpdater(currentRow, ordinal))
+parquetType.getFields.asScala.map {
+  case parquetField =>
+val fieldIndex = catalystType.fieldIndex(parquetField.getName)
--- End diff --

I dropped into the `sql/console` and attempted to write a parquet file with 
duplicate column names. It didn't work. Transcript below.

```
scala> import org.apache.spark.sql._
import org.apache.spark.sql._

scala> val sameColumnNames = StructType(StructField("a", IntegerType) :: 
StructField("a", StringType) :: Nil)
sameColumnNames: org.apache.spark.sql.types.StructType = 
StructType(StructField(a,IntegerType,true), StructField(a,StringType,true))

scala> val rowRDD = sqlContext.sparkContext.parallelize(Row(1, "one") :: 
Row(2, "two") :: Nil, 1)
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = 
ParallelCollectionRDD[0] at parallelize at :51

scala> val df = sqlContext.createDataFrame(rowRDD, sameColumnNames)
18/07/02 16:31:33 INFO SharedState: Setting hive.metastore.warehouse.dir 
('null') to the value of spark.sql.warehouse.dir 
('file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse').
18/07/02 16:31:33 INFO SharedState: Warehouse path is 
'file:/Volumes/VideoAmpCS/msa/workspace/spark-public/spark-warehouse'.
18/07/02 16:31:33 INFO ContextHandler: Started 
o.e.j.s.ServletContextHandler@7b13b737{/SQL,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started 
o.e.j.s.ServletContextHandler@3c9fb104{/SQL/json,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started 
o.e.j.s.ServletContextHandler@3d5cadbe{/SQL/execution,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started 
o.e.j.s.ServletContextHandler@73732e26{/SQL/execution/json,null,AVAILABLE,@Spark}
18/07/02 16:31:33 INFO ContextHandler: Started 
o.e.j.s.ServletContextHandler@72a13c4a{/static/sql,null,AVAILABLE,@Spark}
18/07/02 16:31:34 INFO StateStoreCoordinatorRef: Registered 
StateStoreCoordinator endpoint
df: org.apache.spark.sql.DataFrame = [a: int, a: string]

scala> df.write.parquet("sameColumnNames.parquet")
org.apache.spark.sql.AnalysisException: Found duplicate column(s) when 
inserting into 
file:/Volumes/VideoAmpCS/msa/workspace/spark-public/sameColumnNames.parquet: 
`a`;
  at 
org.apache.spark.sql.util.SchemaUtils$.checkColumnNameDuplication(SchemaUtils.scala:85)
  at 
org.apache.spark.sql.util.SchemaUtils$.checkSchemaColumnNameDuplication(SchemaUtils.scala:42)
  at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:64)
  at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at 
org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
  at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at 
org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at 
org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at 
org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
  at 
org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:662)
  at 
org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at 
org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199643803
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
+  ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, 
context.getFileSchema)
+.map(intersectionGroup =>
+  new MessageType(intersectionGroup.getName, 
intersectionGroup.getFields))
+.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+} else {
+  // Spark's built-in Parquet reader will throw an exception in some 
cases if the requested
+  // schema is not the same as the clipped schema
--- End diff --

I believe the failure occurs because the requested schema and file 
schema—while having columns with identical names and types—have columns in 
different order. Of the one test that fails in the `ParquetFilterSuite`, namely 
"Filter applied on merged Parquet schema with new column should work", it 
appears to be the only one for which the order of the columns is changed. These 
are the file and requested schema for that test:

```
Parquet file schema:
message spark_schema {
  required int32 c;
  optional binary b (UTF8);
}

Parquet requested schema:
message spark_schema {
  optional binary b (UTF8);
  required int32 c;
}
```

I would say the Spark reader expects identical column order, whereas the 
parquet-mr reader accepts different column order but identical (or compatible) 
column names. That's my supposition at least.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-02 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199631341
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -47,16 +47,25 @@ import org.apache.spark.sql.types._
  *
  * Due to this reason, we no longer rely on [[ReadContext]] to pass 
requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
+ *
+ * @param parquetMrCompatibility support reading with parquet-mr or 
Spark's built-in Parquet reader
  */
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+parquetMrCompatibility: Boolean)
 extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
+  /**
+   * Construct a [[ParquetReadSupport]] with [[convertTz]] set to [[None]] 
and
+   * [[parquetMrCompatibility]] set to [[false]].
+   *
+   * We need a zero-arg constructor for SpecificParquetRecordReaderBase.  
But that is only
+   * used in the vectorized reader, where we get the convertTz value 
directly, and the value here
+   * is ignored. Further, we set [[parquetMrCompatibility]] to [[false]] 
as this constructor is only
+   * called by the Spark reader.
--- End diff --

I don't understand your confusion. I think the comment makes it very clear 
why we need to set that parameter to false. How can I make it better? Or can 
you be more specific about what is unclear to you?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199516297
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/SelectedField.scala
 ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a 
[[org.apache.spark.sql.types.StructField]] from a Catalyst
+ * complex type extractor. For example, consider a relation with the 
following schema:
+ *
+ *   {{{
+ *   root
+ *|-- name: struct (nullable = true)
+ *||-- first: string (nullable = true)
+ *||-- last: string (nullable = true)
+ *}}}
+ *
+ * Further, suppose we take the select expression `name.first`. This will 
parse into an
+ * `Alias(child, "first")`. Ignoring the alias, `child` matches the 
following pattern:
+ *
+ *   {{{
+ *   GetStructFieldObject(
+ * AttributeReference("name", StructType(_), _, _),
+ * StructField("first", StringType, _, _))
+ *   }}}
+ *
+ * [[SelectedField]] converts that expression into
+ *
+ *   {{{
+ *   StructField("name", StructType(Array(StructField("first", 
StringType
+ *   }}}
+ *
+ * by mapping each complex type extractor to a 
[[org.apache.spark.sql.types.StructField]] with the
+ * same name as its child (or "parent" going right to left in the select 
expression) and a data
+ * type appropriate to the complex type extractor. In our example, the 
name of the child expression
+ * is "name" and its data type is a 
[[org.apache.spark.sql.types.StructType]] with a single string
+ * field named "first".
+ *
+ * @param expr the top-level complex type extractor
+ */
+object SelectedField {
+  def unapply(expr: Expression): Option[StructField] = {
--- End diff --

nit: `Expression` -> `ExtractValue`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199538123
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If [[requestedRootFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedSchema = requestedRootFields
+.map { case RootField(field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
--- End diff --

dataSchema may also contains partition columns(see the doc of 
`HadoopFsRelation`), is this rule prepared for this case?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-02 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199516625
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1288,8 +1288,18 @@ object SQLConf {
 "issues. Turn on this config to insert a local sort before 
actually doing repartition " +
 "to generate consistent repartition results. The performance of 
repartition() may go " +
 "down since we insert extra local sort before it.")
+.booleanConf
+.createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data. Currently Parquet is the 
only data source that " +
+"implements this optimization.")
   .booleanConf
-  .createWithDefault(true)
+  .createWithDefault(false)
--- End diff --

+1


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-02 Thread mswit-databricks
Github user mswit-databricks commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199415439
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
+  ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, 
context.getFileSchema)
+.map(intersectionGroup =>
+  new MessageType(intersectionGroup.getName, 
intersectionGroup.getFields))
+.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+} else {
+  // Spark's built-in Parquet reader will throw an exception in some 
cases if the requested
+  // schema is not the same as the clipped schema
--- End diff --

CC @michal-databricks 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199389588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 ---
@@ -182,18 +182,20 @@ private[parquet] class ParquetRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with 
HasParentContainerUpdater] = {
-parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
-  case ((parquetFieldType, catalystField), ordinal) =>
-// Converted field value should be set to the `ordinal`-th cell of 
`currentRow`
-newConverter(parquetFieldType, catalystField.dataType, new 
RowUpdater(currentRow, ordinal))
+parquetType.getFields.asScala.map {
+  case parquetField =>
+val fieldIndex = catalystType.fieldIndex(parquetField.getName)
--- End diff --

The name can be used as the identifiers? Could you double check whether we 
can save the a parquet file with duplicate column names? [Note: the previous 
version of Spark does not check name duplication. Thus, I guess the previous 
version of Spark might generate the file with duplicate column names]


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199368095
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -47,16 +47,25 @@ import org.apache.spark.sql.types._
  *
  * Due to this reason, we no longer rely on [[ReadContext]] to pass 
requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
+ *
+ * @param parquetMrCompatibility support reading with parquet-mr or 
Spark's built-in Parquet reader
  */
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+parquetMrCompatibility: Boolean)
 extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
+  /**
+   * Construct a [[ParquetReadSupport]] with [[convertTz]] set to [[None]] 
and
+   * [[parquetMrCompatibility]] set to [[false]].
+   *
+   * We need a zero-arg constructor for SpecificParquetRecordReaderBase.  
But that is only
+   * used in the vectorized reader, where we get the convertTz value 
directly, and the value here
+   * is ignored. Further, we set [[parquetMrCompatibility]] to [[false]] 
as this constructor is only
+   * called by the Spark reader.
--- End diff --

Based on this comment, we have no idea why this is set to false. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199364935
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
--- End diff --

This is only for nested schema pruning? Turn this off when 
`spark.sql.nestedSchemaPruning.enabled` is off?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199389252
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -71,9 +80,22 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
   StructType.fromString(schemaString)
 }
 
-val parquetRequestedSchema =
+val clippedParquetSchema =
   ParquetReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
 
+val parquetRequestedSchema = if (parquetMrCompatibility) {
+  // Parquet-mr will throw an exception if we try to read a superset 
of the file's schema.
+  // Therefore, we intersect our clipped schema with the underlying 
file's schema
+  ParquetReadSupport.intersectParquetGroups(clippedParquetSchema, 
context.getFileSchema)
+.map(intersectionGroup =>
+  new MessageType(intersectionGroup.getName, 
intersectionGroup.getFields))
+.getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+} else {
+  // Spark's built-in Parquet reader will throw an exception in some 
cases if the requested
+  // schema is not the same as the clipped schema
--- End diff --

cc @rdblue @mswit-databricks Do you know the root cause? 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199356283
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -47,16 +47,25 @@ import org.apache.spark.sql.types._
  *
  * Due to this reason, we no longer rely on [[ReadContext]] to pass 
requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
+ *
+ * @param parquetMrCompatibility support reading with parquet-mr or 
Spark's built-in Parquet reader
  */
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+parquetMrCompatibility: Boolean)
--- End diff --

```Scala
private[parquet] class ParquetReadSupport(
val convertTz: Option[TimeZone],
parquetMrCompatibility: Boolean)
  extends ReadSupport[UnsafeRow] with Logging {
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199354841
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 ---
@@ -417,11 +417,12 @@ class ParquetFileFormat
   } else {
 logDebug(s"Falling back to parquet-mr")
 // ParquetRecordReader returns UnsafeRow
+val readSupport = new ParquetReadSupport(convertTz, true)
--- End diff --

> val readSupport = new ParquetReadSupport(convertTz, 
parquetMrCompatibility = true)


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-01 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r199365004
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 ---
@@ -47,16 +47,25 @@ import org.apache.spark.sql.types._
  *
  * Due to this reason, we no longer rely on [[ReadContext]] to pass 
requested schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
+ *
+ * @param parquetMrCompatibility support reading with parquet-mr or 
Spark's built-in Parquet reader
--- End diff --

The description is not clear. Could you make it better?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-06-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r197646687
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -99,27 +100,28 @@ trait ConstraintHelper {
   }
 
   /**
-   * Infer the Attribute-specific IsNotNull constraints from the null 
intolerant child expressions
-   * of constraints.
+   * Infer the Attribute and ExtractValue-specific IsNotNull constraints 
from the null intolerant
+   * child expressions of constraints.
*/
   private def inferIsNotNullConstraints(constraint: Expression): 
Seq[Expression] =
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the 
child null intolerant
   // expressions
-  case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => 
scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, 
null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push 
IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
 }
 
   /**
-   * Recursively explores the expressions which are null intolerant and 
returns all attributes
-   * in these expressions.
+   * Recursively explores the expressions which are null intolerant and 
returns all attributes and
+   * complex type extractors in these expressions.
*/
-  private def scanNullIntolerantAttribute(expr: Expression): 
Seq[Attribute] = expr match {
+  private def scanNullIntolerantField(expr: Expression): Seq[Expression] = 
expr match {
+case ev: ExtractValue => Seq(ev)
--- End diff --

Because of this change, we also need to change the code in 
basicPhysicalOperators.scala. I do not think this is the right solution. More 
importantly, the changes in basicPhysicalOperators.scala might break the 
others. We need a separate PR for these changes. Please remove the changes made 
in basicPhysicalOperators.scala and QueryPlanConstraints.scala


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-06-24 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r197629698
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -301,7 +301,6 @@ case class FileSourceScanExec(
 } getOrElse {
   withOptPartitionCount
 }
-
--- End diff --

Remove this line?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-06-15 Thread DaimonPl
Github user DaimonPl commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r195704995
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1288,8 +1288,18 @@ object SQLConf {
 "issues. Turn on this config to insert a local sort before 
actually doing repartition " +
 "to generate consistent repartition results. The performance of 
repartition() may go " +
 "down since we insert extra local sort before it.")
+.booleanConf
+.createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data. Currently Parquet is the 
only data source that " +
+"implements this optimization.")
   .booleanConf
-  .createWithDefault(true)
+  .createWithDefault(false)
--- End diff --

how about enabling it as default? there should be enough time to find any 
unexpected problems with 2.4.0

+ nested column pruning would be enabled during for all other automatic 
tests


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-06-08 Thread jainaks
Github user jainaks commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r194049288
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If [[requestedRootFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedSchema = requestedRootFields
+.map { case RootField(field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
--- End diff --

This check also filters the fields have same name, but in different case 
than the schema.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190494243
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  ||-- field1: integer (nullable = true)
+  //  ||-- field2: array (nullable = true)
+  //  |||-- element: integer (containsNull = false)
+  //  ||-- field3: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  ||||-- subfield3: array (nullable = true)
+  //  |||||-- element: integer (containsNull = true)
+  //  ||-- field4: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: struct (valueContainsNull = false)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: array (nullable = true)
+  //  |||||-- element: integer (containsNull = false)
+  //  ||-- field5: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: struct (nullable = false)
+  //  |||||-- subsubfield1: integer (nullable = true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||||-- subfield2: struct (nullable = true)
+  //  |||||-- subsubfield1: struct (nullable = true)
+  //  ||||||-- subsubsubfield1: string (nullable = 
true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field6: struct (nullable = true)
+  //  |||-- subfield1: string (nullable = false)
+  //  |||-- subfield2: string (nullable = true)
+  //  ||-- field7: struct (nullable = true)
+  //  |||-- subfield1: struct (nullable = true)
+  //  ||||-- subsubfield1: integer (nullable = true)
+  //  ||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field8: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: array (valueContainsNull = false)
+  //  ||||-- element: struct (containsNull = true)
+  //  |||||-- subfield1: integer (nullable = true)
+  //  |||||-- subfield2: array (nullable = true)
+  //  ||||||-- element: integer (containsNull = false)
+  //  ||-- field9: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  ||-- element: struct (containsNull = false)
+  //  |||-- field1: struct (nullable = true)
+  //  ||||-- subfield1: integer (nullable = false)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  |||-- field2: map (nullable = true)
+  //  ||||-- key: string
+  //  ||||-- value: integer 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190493689
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = prunedParquetRelation, output = 
prunedRelationOutput)
+
+  

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190492424
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = prunedParquetRelation, output = 
prunedRelationOutput)
+
+  

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190491050
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 ---
@@ -879,6 +879,15 @@ class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedSQLContext
   }
 }
   }
+
+  test("select function over nested data") {
--- End diff --

I did some forensics to understand the origin and reason for this test. It 
was part of the first commit of my original PR almost two years ago. Even then, 
this test passes without the rest of the commit. So I can't say why I added 
this test except perhaps that I felt it was useful code coverage.

In any case, if you don't think it's a valuable contribution in this PR I'd 
rather just remove it entirely.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190486220
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1256,8 +1256,18 @@ object SQLConf {
 "issues. Turn on this config to insert a local sort before 
actually doing repartition " +
 "to generate consistent repartition results. The performance of 
repartition() may go " +
 "down since we insert extra local sort before it.")
+.booleanConf
+.createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data. Currently Parquet is the 
only data source that " +
--- End diff --

ORC should be able to support this capability as well, but this PR does not 
address that.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190485768
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -286,7 +286,19 @@ case class FileSourceScanExec(
   } getOrElse {
 metadata
   }
-withOptPartitionCount
+val withOptColumnCount = relation.fileFormat match {
+  case columnar: ColumnarFileFormat =>
+SparkSession
+  .getActiveSession
+  .map { sparkSession =>
+val columnCount = columnar.columnCountForSchema(sparkSession, 
requiredSchema)
+withOptPartitionCount + ("ColumnCount" -> columnCount.toString)
--- End diff --

Replied above.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190485713
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ColumnarFileFormat.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * An optional mix-in for columnar [[FileFormat]]s. This trait provides 
some helpful metadata when
+ * debugging a physical query plan.
+ */
+private[sql] trait ColumnarFileFormat {
--- End diff --

I'm okay with adding this as a separate PR as requested, but I wouldn't 
want to ship this PR in a release without being able to see the physical column 
count associated with a query in a query plan. That is invaluable functionality 
in validating that physical column pruning is occurring as expected.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190484386
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -99,27 +100,28 @@ trait ConstraintHelper {
   }
 
   /**
-   * Infer the Attribute-specific IsNotNull constraints from the null 
intolerant child expressions
-   * of constraints.
+   * Infer the Attribute and ExtractValue-specific IsNotNull constraints 
from the null intolerant
+   * child expressions of constraints.
*/
   private def inferIsNotNullConstraints(constraint: Expression): 
Seq[Expression] =
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the 
child null intolerant
   // expressions
-  case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => 
scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, 
null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push 
IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
 }
 
   /**
-   * Recursively explores the expressions which are null intolerant and 
returns all attributes
-   * in these expressions.
+   * Recursively explores the expressions which are null intolerant and 
returns all attributes and
+   * complex type extractors in these expressions.
*/
-  private def scanNullIntolerantAttribute(expr: Expression): 
Seq[Attribute] = expr match {
+  private def scanNullIntolerantField(expr: Expression): Seq[Expression] = 
expr match {
+case ev: ExtractValue => Seq(ev)
--- End diff --

I agree adding more direct and independent test coverage for this change is 
a good idea. However, omitting this change will weaken the capabilities of this 
PR. It would also imply the removal of the failing test case in 
`ParquetSchemaPruningSuite`, which would imply two follow on PRs. The first 
would be to add this specific change plus the right test coverage. The next 
would be to restore the test case removed from 'ParquetSchemaPruningSuite'.

Let me suggest an alternative. As this change is a valuable enhancement for 
this PR, let me try adding an appropriate test case in 
`InferFiltersFromConstraintsSuite` as part of this PR. That will eliminate the 
requirement for two more follow-on PRs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-24 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r190479026
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -162,7 +162,9 @@ case class FilterExec(condition: Expression, child: 
SparkPlan)
 val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
 val generated = otherPreds.map { c =>
   val nullChecks = c.references.map { r =>
-val idx = notNullPreds.indexWhere { n => 
n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
+val idx = notNullPreds.indexWhere { n =>
+  n.asInstanceOf[IsNotNull].child.references.contains(r)
--- End diff --

Yes. IIRC a community member identified a bug and @viirya contributed a fix 
and unit test. See 
https://github.com/VideoAmp/spark-public/commit/8b5661cefc3aa12cc79d2af162b0d84ab2afa39f.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-22 Thread dongjoon-hyun
Github user dongjoon-hyun commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189961165
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1256,8 +1256,18 @@ object SQLConf {
 "issues. Turn on this config to insert a local sort before 
actually doing repartition " +
 "to generate consistent repartition results. The performance of 
repartition() may go " +
 "down since we insert extra local sort before it.")
+.booleanConf
+.createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data. Currently Parquet is the 
only data source that " +
--- End diff --

Thank you for pinging me, @gatorsmile . Let me check it.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189493854
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = prunedParquetRelation, output = 
prunedRelationOutput)
+

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189493986
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.{PhysicalOperation, 
ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, 
partitionSchema,
+dataSchema, _, parquetFormat: ParquetFileFormat, _), _, _, _)) 
=>
+val projectionFields = projects.flatMap(getFields)
+val filterFields = filters.flatMap(getFields)
+val requestedFields = (projectionFields ++ filterFields).distinct
+
+// If [[requestedFields]] includes a nested field, continue. 
Otherwise,
+// return [[op]]
+if (requestedFields.exists { case (_, optAtt) => optAtt.isEmpty }) 
{
+  val prunedSchema = requestedFields
+.map { case (field, _) => StructType(Array(field)) }
+.reduceLeft(_ merge _)
+  val dataSchemaFieldNames = dataSchema.fieldNames.toSet
+  val prunedDataSchema =
+StructType(prunedSchema.filter(f => 
dataSchemaFieldNames.contains(f.name)))
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return [[op]]. We effect this comparison by counting the 
number of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in [[dataSchema]].
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+// We need to replace the expression ids of the pruned 
relation output attributes
+// with the expression ids of the original relation output 
attributes so that
+// references to the original relation's output are not broken
+val outputIdMap = l.output.map(att => (att.name, 
att.exprId)).toMap
+val prunedRelationOutput =
+  prunedParquetRelation
+.schema
+.toAttributes
+.map {
+  case att if outputIdMap.contains(att.name) =>
+att.withExprId(outputIdMap(att.name))
+  case att => att
+}
+val prunedRelation =
+  l.copy(relation = prunedParquetRelation, output = 
prunedRelationOutput)
+

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189491559
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 ---
@@ -879,6 +879,15 @@ class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedSQLContext
   }
 }
   }
+
+  test("select function over nested data") {
--- End diff --

Without this PR, this test still can pass, right? 

Could you submit a separate PR for these test coverage improvement? We 
really welcome the test coverage improvement PRs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189491063
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ColumnarFileFormat.scala
 ---
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources
+
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.types.StructType
+
+/**
+ * An optional mix-in for columnar [[FileFormat]]s. This trait provides 
some helpful metadata when
+ * debugging a physical query plan.
+ */
+private[sql] trait ColumnarFileFormat {
--- End diff --

Can we do this in a separate PR?  No need to block this PR due to the 
discussion about this implementation. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189489061
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -162,7 +162,9 @@ case class FilterExec(condition: Expression, child: 
SparkPlan)
 val generatedIsNotNullChecks = new Array[Boolean](notNullPreds.length)
 val generated = otherPreds.map { c =>
   val nullChecks = c.references.map { r =>
-val idx = notNullPreds.indexWhere { n => 
n.asInstanceOf[IsNotNull].child.semanticEquals(r)}
+val idx = notNullPreds.indexWhere { n =>
+  n.asInstanceOf[IsNotNull].child.references.contains(r)
--- End diff --

Is this change related?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189489577
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/QueryPlanConstraints.scala
 ---
@@ -99,27 +100,28 @@ trait ConstraintHelper {
   }
 
   /**
-   * Infer the Attribute-specific IsNotNull constraints from the null 
intolerant child expressions
-   * of constraints.
+   * Infer the Attribute and ExtractValue-specific IsNotNull constraints 
from the null intolerant
+   * child expressions of constraints.
*/
   private def inferIsNotNullConstraints(constraint: Expression): 
Seq[Expression] =
 constraint match {
   // When the root is IsNotNull, we can push IsNotNull through the 
child null intolerant
   // expressions
-  case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
+  case IsNotNull(expr) => 
scanNullIntolerantField(expr).map(IsNotNull(_))
   // Constraints always return true for all the inputs. That means, 
null will never be returned.
   // Thus, we can infer `IsNotNull(constraint)`, and also push 
IsNotNull through the child
   // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
+  case _ => scanNullIntolerantField(constraint).map(IsNotNull(_))
 }
 
   /**
-   * Recursively explores the expressions which are null intolerant and 
returns all attributes
-   * in these expressions.
+   * Recursively explores the expressions which are null intolerant and 
returns all attributes and
+   * complex type extractors in these expressions.
*/
-  private def scanNullIntolerantAttribute(expr: Expression): 
Seq[Attribute] = expr match {
+  private def scanNullIntolerantField(expr: Expression): Seq[Expression] = 
expr match {
+case ev: ExtractValue => Seq(ev)
--- End diff --

For this improvement, can we do it in a separate PR? The corresponding unit 
test case are needed in `InferFiltersFromConstraintsSuite` instead of 
`ParquetSchemaPruningSuite`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189491217
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -286,7 +286,19 @@ case class FileSourceScanExec(
   } getOrElse {
 metadata
   }
-withOptPartitionCount
+val withOptColumnCount = relation.fileFormat match {
+  case columnar: ColumnarFileFormat =>
+SparkSession
+  .getActiveSession
+  .map { sparkSession =>
+val columnCount = columnar.columnCountForSchema(sparkSession, 
requiredSchema)
+withOptPartitionCount + ("ColumnCount" -> columnCount.toString)
--- End diff --

This needs to be in a separate PR as I suggested above. BTW, we could 
easily lose this metadata if this change does not have a test case. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189479383
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala ---
@@ -1256,8 +1256,18 @@ object SQLConf {
 "issues. Turn on this config to insert a local sort before 
actually doing repartition " +
 "to generate consistent repartition results. The performance of 
repartition() may go " +
 "down since we insert extra local sort before it.")
+.booleanConf
+.createWithDefault(true)
+
+  val NESTED_SCHEMA_PRUNING_ENABLED =
+buildConf("spark.sql.nestedSchemaPruning.enabled")
+  .internal()
+  .doc("Prune nested fields from a logical relation's output which are 
unnecessary in " +
+"satisfying a query. This optimization allows columnar file format 
readers to avoid " +
+"reading unnecessary nested column data. Currently Parquet is the 
only data source that " +
--- End diff --

How about ORC? 

cc @dongjoon-hyun Do you know whether it is also doable in the latest ORC 
version?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-20 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21320#discussion_r189492534
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/planning/SelectedFieldSuite.scala
 ---
@@ -0,0 +1,432 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.planning
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+// scalastyle:off line.size.limit
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  ||-- field1: integer (nullable = true)
+  //  ||-- field2: array (nullable = true)
+  //  |||-- element: integer (containsNull = false)
+  //  ||-- field3: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  ||||-- subfield3: array (nullable = true)
+  //  |||||-- element: integer (containsNull = true)
+  //  ||-- field4: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: struct (valueContainsNull = false)
+  //  ||||-- subfield1: integer (nullable = true)
+  //  ||||-- subfield2: array (nullable = true)
+  //  |||||-- element: integer (containsNull = false)
+  //  ||-- field5: array (nullable = false)
+  //  |||-- element: struct (containsNull = true)
+  //  ||||-- subfield1: struct (nullable = false)
+  //  |||||-- subsubfield1: integer (nullable = true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||||-- subfield2: struct (nullable = true)
+  //  |||||-- subsubfield1: struct (nullable = true)
+  //  ||||||-- subsubsubfield1: string (nullable = 
true)
+  //  |||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field6: struct (nullable = true)
+  //  |||-- subfield1: string (nullable = false)
+  //  |||-- subfield2: string (nullable = true)
+  //  ||-- field7: struct (nullable = true)
+  //  |||-- subfield1: struct (nullable = true)
+  //  ||||-- subsubfield1: integer (nullable = true)
+  //  ||||-- subsubfield2: integer (nullable = true)
+  //  ||-- field8: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: array (valueContainsNull = false)
+  //  ||||-- element: struct (containsNull = true)
+  //  |||||-- subfield1: integer (nullable = true)
+  //  |||||-- subfield2: array (nullable = true)
+  //  ||||||-- element: integer (containsNull = false)
+  //  ||-- field9: map (nullable = true)
+  //  |||-- key: string
+  //  |||-- value: integer (valueContainsNull = false)
+  //  |-- col3: array (nullable = false)
+  //  ||-- element: struct (containsNull = false)
+  //  |||-- field1: struct (nullable = true)
+  //  ||||-- subfield1: integer (nullable = false)
+  //  ||||-- subfield2: integer (nullable = true)
+  //  |||-- field2: map (nullable = true)
+  //  ||||-- key: string
+  //  ||||-- value: integer 

[GitHub] spark pull request #21320: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-05-14 Thread mallman
GitHub user mallman opened a pull request:

https://github.com/apache/spark/pull/21320

[SPARK-4502][SQL] Parquet nested column pruning - foundation

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

_N.B. This is a restart of PR #16578 which includes everything in that PR 
except the aggregation and join schema pruning rules. Relevant review comments 
from that PR should be considered incorporated by reference. Please avoid 
duplication in review by reviewing that PR first. The summary below is an 
edited copy of the summary of the previous PR._

## What changes were proposed in this pull request?

One of the hallmarks of a column-oriented data storage format is the 
ability to read data from a subset of columns, efficiently skipping reads from 
other columns. Spark has long had support for pruning unneeded top-level schema 
fields from the scan of a parquet file. For example, consider a table, 
`contacts`, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 ||-- first: string
 ||-- last: string
 |-- address: string
```

Parquet stores this table's data in three physical columns: `name.first`, 
`name.last` and `address`. To answer the query

```SQL
select address from contacts
```

Spark will read only from the `address` column of parquet data. However, to 
answer the query

```SQL
select name.first from contacts
```

Spark will read `name.first` and `name.last` from parquet.

This PR modifies Spark SQL to support a finer-grain of schema pruning. With 
this patch, Spark reads only the `name.first` column to answer the previous 
query.

### Implementation

There are two main components of this patch. First, there is a 
`ParquetSchemaPruning` optimizer rule for gathering the required schema fields 
of a `PhysicalOperation` over a parquet file, constructing a new schema based 
on those required fields and rewriting the plan in terms of that pruned schema. 
The pruned schema fields are pushed down to the parquet requested read schema. 
`ParquetSchemaPruning` uses a new `ProjectionOverSchema` extractor for 
rewriting a catalyst expression in terms of a pruned schema.

Second, the `ParquetRowConverter` has been patched to ensure the ordinals 
of the parquet columns read are correct for the pruned schema. 
`ParquetReadSupport` has been patched to address a compatibility mismatch 
between Spark's built in vectorized reader and the parquet-mr library's reader.

### Limitation

Among the complex Spark SQL data types, this patch supports parquet column 
pruning of nested sequences of struct fields only.

## How was this patch tested?

Care has been taken to ensure correctness and prevent regressions. A more 
advanced version of this patch incorporating optimizations for rewriting 
queries involving aggregations and joins has been running on a production Spark 
cluster at VideoAmp for several years. In that time, one bug was found and 
fixed early on, and we added a regression test for that bug.

We forward-ported this patch to Spark master in June 2016 and have been 
running this patch against Spark 2.x branches on ad-hoc clusters since then.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/VideoAmp/spark-public 
spark-4502-parquet_column_pruning-foundation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21320.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21320


commit 9e301b37bb5863ef5fad8ec15f1d8f3d4b5e6a6f
Author: Michael Allman 
Date:   2016-06-24T17:21:24Z

[SPARK-4502][SQL] Parquet nested column pruning




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org