[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-16 Thread ajacques
Github user ajacques closed the pull request at:

https://github.com/apache/spark/pull/21889


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-14 Thread ajacques
Github user ajacques commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r210170646
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.scalactic.Equality
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  private val contacts =
+Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+  relatives = Map("brother" -> johnDoe)) ::
+Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> 
janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  private val briefContacts =
+BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
--- End diff --

This is a parameter definition in a case class constructor, so I don't think we can write:
```scala
case class Contact(
  [...]
  friends: Array.empty[FullName],
  relatives: Map.empty[String, FullName]
)
```
Scala expects a type after the colon, so I opted for:
```scala
case class Contact(
  [...]
  friends: Array[FullName] = Array.empty,
  relatives: Map[String, FullName] = Map.empty
)
```
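
For illustration, here is a minimal self-contained sketch of that second form. It mirrors the `FullName` and `Contact` case classes from the diff; the wrapping object `DefaultArgsSketch` and the value `lonely` are hypothetical names added only so the snippet compiles on its own.

```scala
// Sketch only: DefaultArgsSketch and lonely are hypothetical names.
object DefaultArgsSketch {
  case class FullName(first: String, middle: String, last: String)

  // A parameter is declared as `name: Type = default`. The declared type
  // lets `Array.empty` and `Map.empty` infer their type arguments.
  case class Contact(
      id: Int,
      name: FullName,
      address: String,
      pets: Int,
      friends: Array[FullName] = Array.empty,
      relatives: Map[String, FullName] = Map.empty)

  // Omitting the last two arguments picks up the empty defaults.
  val lonely: Contact = Contact(0, FullName("Jane", "X.", "Doe"), "123 Main Street", 1)
}
```

Writing `Array.empty[FullName]` after the `=` would work as well; the explicit type argument is just redundant once the parameter type is declared.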





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-14 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209839642
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, _: ParquetFileFormat, _), _, _, _)) =>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If requestedRootFields includes a nested field, continue. 
Otherwise,
+// return op
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedDataSchema = buildPrunedDataSchema(dataSchema, 
requestedRootFields)
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return op. We effect this comparison by counting the number 
of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in dataSchema.
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+val prunedRelation = buildPrunedRelation(l, 
prunedParquetRelation)
+val projectionOverSchema = 
ProjectionOverSchema(prunedDataSchema)
+
+// Construct a new target for our projection by rewriting and
+// including the original filters where available
+val projectionChild =
+  if (filters.nonEmpty) {
+val projectedFilters = filters.map(_.transformDown {
+  case projectionOverSchema(expr) => expr
+})
+val newFilterCondition = projectedFilters.reduce(And)
+Filter(newFilterCondition, prunedRelation)
+  } else {
+prunedRelation
+  }
+
+// Construct the new projections of our Project by
+// rewriting the original projections
+val newProjects = projects.map(_.transformDown {
+  case projectionOverSchema(expr) => expr
+}).map { case expr: 

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread ajacques
Github user ajacques commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209830673
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, _: ParquetFileFormat, _), _, _, _)) =>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If requestedRootFields includes a nested field, continue. 
Otherwise,
+// return op
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
+  val prunedDataSchema = buildPrunedDataSchema(dataSchema, 
requestedRootFields)
+
+  // If the data schema is different from the pruned data schema, 
continue. Otherwise,
+  // return op. We effect this comparison by counting the number 
of "leaf" fields in
+  // each schemata, assuming the fields in [[prunedDataSchema]] 
are a subset of the fields
+  // in dataSchema.
+  if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+val prunedParquetRelation =
+  hadoopFsRelation.copy(dataSchema = 
prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+val prunedRelation = buildPrunedRelation(l, 
prunedParquetRelation)
+val projectionOverSchema = 
ProjectionOverSchema(prunedDataSchema)
+
+// Construct a new target for our projection by rewriting and
+// including the original filters where available
+val projectionChild =
+  if (filters.nonEmpty) {
+val projectedFilters = filters.map(_.transformDown {
+  case projectionOverSchema(expr) => expr
+})
+val newFilterCondition = projectedFilters.reduce(And)
+Filter(newFilterCondition, prunedRelation)
+  } else {
+prunedRelation
+  }
+
+// Construct the new projections of our Project by
+// rewriting the original projections
+val newProjects = projects.map(_.transformDown {
+  case projectionOverSchema(expr) => expr
+}).map { case expr: 

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209528285
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/SelectedField.scala ---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that builds a 
[[org.apache.spark.sql.types.StructField]] from a Catalyst
+ * complex type extractor. For example, consider a relation with the 
following schema:
+ *
+ *   {{{
+ *   root
+ *|-- name: struct (nullable = true)
+ *||-- first: string (nullable = true)
+ *||-- last: string (nullable = true)
+ *}}}
--- End diff --

indentation





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209528183
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, 
Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, 
Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, 
StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined 
in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds 
to a
+ * SQL column, and a nested Parquet column corresponds to a 
[[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+if (SQLConf.get.nestedSchemaPruningEnabled) {
+  apply0(plan)
+} else {
+  plan
+}
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+plan transformDown {
+  case op @ PhysicalOperation(projects, filters,
+  l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+dataSchema, _, _: ParquetFileFormat, _), _, _, _)) =>
+val projectionRootFields = projects.flatMap(getRootFields)
+val filterRootFields = filters.flatMap(getRootFields)
+val requestedRootFields = (projectionRootFields ++ 
filterRootFields).distinct
+
+// If requestedRootFields includes a nested field, continue. 
Otherwise,
+// return op
+if (requestedRootFields.exists { case RootField(_, derivedFromAtt) 
=> !derivedFromAtt }) {
--- End diff --

Can't we at least extract some private functions and split up some of this logic?
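
As one possible direction for this suggestion, the sketch below pulls the two nested checks out into small private helpers. It is not the PR's code: it uses a toy schema ADT (`Leaf`, `Struct`, `ArrayOf`, `MapOf`) in place of Spark's `DataType`, and `RootField`, `hasNestedField`, `isPruned`, and `shouldPrune` are simplified, hypothetical stand-ins.

```scala
// Sketch only: toy types stand in for Spark's DataType hierarchy.
object PruningSketch {
  sealed trait Schema
  case object Leaf extends Schema
  final case class Struct(fields: Map[String, Schema]) extends Schema
  final case class ArrayOf(element: Schema) extends Schema
  final case class MapOf(key: Schema, value: Schema) extends Schema

  // Hypothetical stand-in: derivedFromAtt is true when the whole column was requested.
  final case class RootField(name: String, derivedFromAtt: Boolean)

  // Counts the primitive ("leaf") fields reachable from a schema.
  private def countLeaves(schema: Schema): Int = schema match {
    case Leaf => 1
    case Struct(fields) => fields.values.map(countLeaves).sum
    case ArrayOf(element) => countLeaves(element)
    case MapOf(key, value) => countLeaves(key) + countLeaves(value)
  }

  // First check: was at least one nested field requested?
  private def hasNestedField(requested: Seq[RootField]): Boolean =
    requested.exists(field => !field.derivedFromAtt)

  // Second check: assuming pruned is a subset of full, did pruning drop a leaf?
  private def isPruned(full: Schema, pruned: Schema): Boolean =
    countLeaves(full) > countLeaves(pruned)

  // The rule body then reduces to a single readable condition.
  def shouldPrune(requested: Seq[RootField], full: Schema, pruned: Schema): Boolean =
    hasNestedField(requested) && isPruned(full, pruned)
}
```

With the checks named this way, the pattern-match case in the rule could read as one guard followed by the plan rewrite, rather than two levels of nested `if`.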





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209527916
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTest.scala
 ---
@@ -59,42 +59,68 @@ private[sql] trait ParquetTest extends SQLTestUtils {
   }
 
   /**
-   * Writes `data` to a Parquet file, which is then passed to `f` and will 
be deleted after `f`
+   * Writes `df` to a Parquet file, which is then passed to `f` and will 
be deleted after `f`
* returns.
*/
-  protected def withParquetFile[T <: Product: ClassTag: TypeTag]
-  (data: Seq[T])
+  protected def asParquetFile
--- End diff --

I would avoid unrelated refactoring and revert this.





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209527485
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.scalactic.Equality
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  private val contacts =
+Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+  relatives = Map("brother" -> johnDoe)) ::
+Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> 
janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  private val briefContacts =
+BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map(),
+p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, 
address: String, p: Int)
+
+  private val contactsWithDataPartitionColumn =
+contacts.map { case Contact(id, name, address, pets, friends, 
relatives) =>
+  ContactWithDataPartitionColumn(id, name, address, pets, friends, 
relatives, 1) }
+  private val briefContactsWithDataPartitionColumn =
+briefContacts.map { case BriefContact(id, name, address) =>
+  BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  override def beforeEach(): Unit = {
+super.beforeEach()
+conf.setConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED, true)
+  }
+
+  override def afterEach(): Unit = {
+try {
+  conf.unsetConf(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED)
+} finally {
+  super.afterEach()
+}
+  }
+
+  testSchemaPruning("select a single complex field") {
+val query = sql("select name.middle from contacts")
+checkScan(query, "struct<name:struct<middle:string>>")
+checkAnswer(query.orderBy("id"), Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+val query = sql("select name.middle, name from contacts")
+checkScan(query, "struct<name:struct<first:string,middle:string,last:string>>")
+checkAnswer(query.orderBy("id"),
+  Row("X.", Row("Jane", "X.", "Doe")) ::
+  Row("Y.", Row("John", "Y.", "Doe")) ::
+  Row(null, Row("Janet", null, "Jones")) ::
+  Row(null, Row("Jim", null, "Jones")) ::
+  Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+val query = sql("select friends.middle, friends from contacts where p=1")
+checkScan(query,
+  "struct<friends:array<struct<first:string,middle:string,last:string>>>")
+checkAnswer(query.orderBy("id"),
+  

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209526908
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.scalactic.Equality
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  private val contacts =
+Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+  relatives = Map("brother" -> johnDoe)) ::
+Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> 
janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  private val briefContacts =
+BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
--- End diff --

nit: `Array.empty[FullName]`





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209526986
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.scalactic.Equality
+
+import org.apache.spark.sql.{DataFrame, QueryTest, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.execution.FileSourceScanExec
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types.StructType
+
+class ParquetSchemaPruningSuite
+extends QueryTest
+with ParquetTest
+with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  private val contacts =
+Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+  relatives = Map("brother" -> johnDoe)) ::
+Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> 
janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  private val briefContacts =
+BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+id: Int,
+name: FullName,
+address: String,
+pets: Int,
+friends: Array[FullName] = Array(),
+relatives: Map[String, FullName] = Map(),
--- End diff --

nit: `Map.empty[String, FullName]`





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209526670
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/SelectedFieldSuite.scala 
---
@@ -0,0 +1,415 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.exceptions.TestFailedException
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.dsl.plans._
+import org.apache.spark.sql.catalyst.expressions.NamedExpression
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
+import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
+import org.apache.spark.sql.types._
+
+class SelectedFieldSuite extends SparkFunSuite with BeforeAndAfterAll {
+  private val innerSchemaCol1 = StructField("col2", StructType(
+    StructField("field1", IntegerType) ::
+      StructField("field2", ArrayType(IntegerType, containsNull = false)) ::
+      StructField("field3", ArrayType(StructType(
+        StructField("subfield1", IntegerType) ::
+          StructField("subfield2", IntegerType) ::
+          StructField("subfield3", ArrayType(IntegerType)) :: Nil)), nullable = false) ::
+      StructField("field4", MapType(StringType, StructType(
+        StructField("subfield1", IntegerType) ::
+          StructField("subfield2", ArrayType(IntegerType, containsNull = false))
+          :: Nil), valueContainsNull = false)) ::
+      StructField("field5", ArrayType(StructType(
+        StructField("subfield1", StructType(
+          StructField("subsubfield1", IntegerType) ::
+            StructField("subsubfield2", IntegerType) :: Nil), nullable = false) ::
+          StructField("subfield2", StructType(
+            StructField("subsubfield1", StructType(
+              StructField("subsubsubfield1", StringType) :: Nil)) ::
+              StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)), nullable = false) ::
+      StructField("field6", StructType(
+        StructField("subfield1", StringType, nullable = false) ::
+          StructField("subfield2", StringType) :: Nil)) ::
+      StructField("field7", StructType(
+        StructField("subfield1", StructType(
+          StructField("subsubfield1", IntegerType) ::
+            StructField("subsubfield2", IntegerType) :: Nil)) :: Nil)) ::
+      StructField("field8", MapType(StringType, ArrayType(StructType(
+        StructField("subfield1", IntegerType) ::
+          StructField("subfield2", ArrayType(IntegerType, containsNull = false))
+          :: Nil)), valueContainsNull = false)) ::
+      StructField("field9", MapType(StringType, IntegerType, valueContainsNull = false)) :: Nil))
+
+  // The test schema as a tree string, i.e. `schema.treeString`
+  // root
+  //  |-- col1: string (nullable = false)
+  //  |-- col2: struct (nullable = true)
+  //  |    |-- field1: integer (nullable = true)
+  //  |    |-- field2: array (nullable = true)
+  //  |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field3: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: integer (nullable = true)
+  //  |    |    |    |-- subfield3: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = true)
+  //  |    |-- field4: map (nullable = true)
+  //  |    |    |-- key: string
+  //  |    |    |-- value: struct (valueContainsNull = false)
+  //  |    |    |    |-- subfield1: integer (nullable = true)
+  //  |    |    |    |-- subfield2: array (nullable = true)
+  //  |    |    |    |    |-- element: integer (containsNull = false)
+  //  |    |-- field5: array (nullable = false)
+  //  |    |    |-- element: struct (containsNull = true)

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209526105
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.catalyst.expressions.{And, Attribute, Expression, NamedExpression}
+import org.apache.spark.sql.catalyst.planning.PhysicalOperation
+import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.rules.Rule
+import org.apache.spark.sql.execution.{ProjectionOverSchema, SelectedField}
+import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types.{ArrayType, DataType, MapType, StructField, StructType}
+
+/**
+ * Prunes unnecessary Parquet columns given a [[PhysicalOperation]] over a
+ * [[ParquetRelation]]. By "Parquet column", we mean a column as defined in the
+ * Parquet format. In Spark SQL, a root-level Parquet column corresponds to a
+ * SQL column, and a nested Parquet column corresponds to a [[StructField]].
+ */
+private[sql] object ParquetSchemaPruning extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan =
+    if (SQLConf.get.nestedSchemaPruningEnabled) {
+      apply0(plan)
+    } else {
+      plan
+    }
+
+  private def apply0(plan: LogicalPlan): LogicalPlan =
+    plan transformDown {
+      case op @ PhysicalOperation(projects, filters,
+          l @ LogicalRelation(hadoopFsRelation @ HadoopFsRelation(_, _,
+            dataSchema, _, _: ParquetFileFormat, _), _, _, _)) =>
+        val projectionRootFields = projects.flatMap(getRootFields)
+        val filterRootFields = filters.flatMap(getRootFields)
+        val requestedRootFields = (projectionRootFields ++ filterRootFields).distinct
+
+        // If requestedRootFields includes a nested field, continue. Otherwise,
+        // return op
+        if (requestedRootFields.exists { case RootField(_, derivedFromAtt) => !derivedFromAtt }) {
+          val prunedDataSchema = buildPrunedDataSchema(dataSchema, requestedRootFields)
+
+          // If the data schema is different from the pruned data schema, continue. Otherwise,
+          // return op. We effect this comparison by counting the number of "leaf" fields in
+          // each schemata, assuming the fields in [[prunedDataSchema]] are a subset of the fields
+          // in dataSchema.
+          if (countLeaves(dataSchema) > countLeaves(prunedDataSchema)) {
+            val prunedParquetRelation =
+              hadoopFsRelation.copy(dataSchema = prunedDataSchema)(hadoopFsRelation.sparkSession)
+
+            val prunedRelation = buildPrunedRelation(l, prunedParquetRelation)
+            val projectionOverSchema = ProjectionOverSchema(prunedDataSchema)
+
+            // Construct a new target for our projection by rewriting and
+            // including the original filters where available
+            val projectionChild =
+              if (filters.nonEmpty) {
+                val projectedFilters = filters.map(_.transformDown {
+                  case projectionOverSchema(expr) => expr
+                })
+                val newFilterCondition = projectedFilters.reduce(And)
+                Filter(newFilterCondition, prunedRelation)
+              } else {
+                prunedRelation
+              }
+
+            // Construct the new projections of our Project by
+            // rewriting the original projections
+            val newProjects = projects.map(_.transformDown {
+              case projectionOverSchema(expr) => expr
+            }).map { case expr: 
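
The leaf-count comparison described in the diff comment above can be illustrated with a minimal, self-contained sketch. The `DType` hierarchy and the two sample schemas are hypothetical stand-ins, not Spark's `DataType` classes, and the behaviour of `countLeaves` shown here is an assumption, because its definition is not part of the quoted diff.

```scala
object LeafCountSketch {
  // Toy stand-ins for nested SQL data types; these are NOT Spark classes.
  sealed trait DType
  case object Atom extends DType
  final case class Arr(element: DType) extends DType
  final case class MapT(key: DType, value: DType) extends DType
  final case class Struct(fields: List[(String, DType)]) extends DType

  // Count the primitive ("leaf") fields reachable from a type.
  def countLeaves(t: DType): Int = t match {
    case Atom => 1
    case Arr(element) => countLeaves(element)
    case MapT(key, value) => countLeaves(key) + countLeaves(value)
    case Struct(fields) => fields.map { case (_, fieldType) => countLeaves(fieldType) }.sum
  }

  // Hypothetical full schema: id plus a three-field name struct.
  val full: DType = Struct(List(
    "id" -> Atom,
    "name" -> Struct(List("first" -> Atom, "middle" -> Atom, "last" -> Atom))))

  // Hypothetical pruned schema: only name.middle survives inside name.
  val pruned: DType = Struct(List(
    "id" -> Atom,
    "name" -> Struct(List("middle" -> Atom))))

  // Assuming pruned is a subset of full, a strictly smaller leaf count
  // means pruning removed at least one column.
  val shouldPrune: Boolean = countLeaves(full) > countLeaves(pruned)
}
```

Under the subset assumption stated in the diff comment, comparing counts is enough to detect a difference; without that assumption, two different schemas could have equal leaf counts.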

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209525706
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,200 @@

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209525587
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,200 @@

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209525173
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruning.scala
 ---
@@ -0,0 +1,200 @@

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209524566
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. This
+ * class is motivated by columnar nested schema pruning.
+ */
+private[execution] case class ProjectionOverSchema(schema: StructType) {
+  private val fieldNames = schema.fieldNames.toSet
+
+  def unapply(expr: Expression): Option[Expression] = getProjection(expr)
+
+  private def getProjection(expr: Expression): Option[Expression] =
+    expr match {
+      case a @ AttributeReference(name, _, _, _) if fieldNames.contains(name) =>
+        Some(a.copy(dataType = schema(name).dataType)(a.exprId, a.qualifier))
+      case GetArrayItem(child, arrayItemOrdinal) =>
+        getProjection(child).map { projection => GetArrayItem(projection, arrayItemOrdinal) }
+      case GetArrayStructFields(child, StructField(name, _, _, _), _, _, containsNull) =>
--- End diff --

For instance, these `, _, _, _), _, _,` look excessive.
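
One way to cut down on positional wildcards is a small custom extractor that exposes only the fields a match actually needs. The sketch below is hedged: `Field`, `GetFields` and `ChildAndName` are hypothetical, simplified stand-ins and do not reflect the real signatures of Spark's `StructField` or `GetArrayStructFields`.

```scala
object WildcardSketch {
  // Hypothetical stand-ins with several fields the match does not care about.
  final case class Field(name: String, dataType: String, nullable: Boolean, comment: String)
  final case class GetFields(
      child: String,
      field: Field,
      ordinal: Int,
      numFields: Int,
      containsNull: Boolean)

  // Positional form: every unused field needs its own wildcard.
  def describePositional(e: Any): Option[String] = e match {
    case GetFields(child, Field(name, _, _, _), _, _, containsNull) =>
      Some(s"$child.$name containsNull=$containsNull")
    case _ => None
  }

  // Custom extractor (hypothetical name) that returns only what is needed.
  object ChildAndName {
    def unapply(g: GetFields): Option[(String, String, Boolean)] =
      Some((g.child, g.field.name, g.containsNull))
  }

  // Same behaviour as describePositional, without the wildcard run.
  def describeExtracted(e: Any): Option[String] = e match {
    case ChildAndName(child, name, containsNull) =>
      Some(s"$child.$name containsNull=$containsNull")
    case _ => None
  }
}
```

Adding a field to `GetFields` would force a change to the positional pattern but not to the extractor-based one.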





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-13 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r209524346
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectionOverSchema.scala
 ---
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.types._
+
+/**
+ * A Scala extractor that projects an expression over a given schema. Data types,
+ * field indexes and field counts of complex type extractors and attributes
+ * are adjusted to fit the schema. All other expressions are left as-is. This
+ * class is motivated by columnar nested schema pruning.
+ */
+private[execution] case class ProjectionOverSchema(schema: StructType) {
+  private val fieldNames = schema.fieldNames.toSet
+
+  def unapply(expr: Expression): Option[Expression] = getProjection(expr)
+
+  private def getProjection(expr: Expression): Option[Expression] =
+    expr match {
+      case a @ AttributeReference(name, _, _, _) if fieldNames.contains(name) =>
--- End diff --

Can we avoid the `AttributeReference(name, _, _, _)` pattern, per
https://github.com/databricks/scala-style-guide#pattern-matching?
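
The alternative the style guide points toward is matching on the type and reading fields through the bound value. A minimal sketch of that shape follows, assuming simplified stand-ins: `Attr` and `Projector` are hypothetical and only mimic the structure of `AttributeReference` and `ProjectionOverSchema` in the quoted diff.

```scala
object TypedPatternSketch {
  // Hypothetical, simplified stand-in for an attribute expression.
  final case class Attr(name: String, dataType: String, nullable: Boolean, exprId: Long)

  // Simplified analogue of the extractor in the diff: it rewrites the data
  // type of any attribute whose name appears in `schema`.
  final case class Projector(schema: Map[String, String]) {
    def unapply(expr: Any): Option[Attr] = expr match {
      // Type pattern plus guard: no positional wildcards to keep in sync.
      case a: Attr if schema.contains(a.name) =>
        Some(a.copy(dataType = schema(a.name)))
      case _ => None
    }
  }

  // An instance that defines `unapply` can be used directly as a pattern,
  // in the same way the diff uses `case projectionOverSchema(expr) => expr`.
  def rewrite(projector: Projector, exprs: List[Any]): List[Any] =
    exprs.map {
      case projector(rewritten) => rewritten
      case other => other
    }
}
```

With the type pattern, adding or reordering constructor parameters on `Attr` does not require touching the match.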





[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-07 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r208446828
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+    extends QueryTest
+    with ParquetTest
+    with FileSchemaPruningTest
+    with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  val contacts =
+    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+      relatives = Map("brother" -> johnDoe)) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  val briefContacts =
+    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map(),
+    p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+    briefContacts.map { case BriefContact(id, name, address) =>
+      BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+    val query = sql("select name.middle from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+    val query = sql("select name.middle, name from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query,
+      Row("X.", Row("Jane", "X.", "Doe")) ::
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, Row("Janet", null, "Jones")) ::
+      Row(null, Row("Jim", null, "Jones")) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+    val query = sql("select friends.middle, friends from contacts where p=1 order by id")
+    checkScanSchemata(query,
+      "struct>>")
+    checkAnswer(query,
+      Row(Array("Z."), Array(Row("Susan", "Z.", "Smith"))) ::
+      Row(Array.empty[String], Array.empty[Row]) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field from a map entry and its parent map entry") {
+    val query =
+      sql("select relatives[\"brother\"].middle, relatives[\"brother\"] from contacts where p=1 " +
+        "order by id")
+    checkScanSchemata(query,
+      "struct>>")
+    checkAnswer(query,
+      

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-04 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r207719862
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,205 @@

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-04 Thread mallman
Github user mallman commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r207718734
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.execution.FileSchemaPruningTest
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetSchemaPruningSuite
+    extends QueryTest
+    with ParquetTest
+    with FileSchemaPruningTest
+    with SharedSQLContext {
+  case class FullName(first: String, middle: String, last: String)
+  case class Contact(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map())
+
+  val janeDoe = FullName("Jane", "X.", "Doe")
+  val johnDoe = FullName("John", "Y.", "Doe")
+  val susanSmith = FullName("Susan", "Z.", "Smith")
+
+  val contacts =
+    Contact(0, janeDoe, "123 Main Street", 1, friends = Array(susanSmith),
+      relatives = Map("brother" -> johnDoe)) ::
+    Contact(1, johnDoe, "321 Wall Street", 3, relatives = Map("sister" -> janeDoe)) :: Nil
+
+  case class Name(first: String, last: String)
+  case class BriefContact(id: Int, name: Name, address: String)
+
+  val briefContacts =
+    BriefContact(2, Name("Janet", "Jones"), "567 Maple Drive") ::
+    BriefContact(3, Name("Jim", "Jones"), "6242 Ash Street") :: Nil
+
+  case class ContactWithDataPartitionColumn(
+    id: Int,
+    name: FullName,
+    address: String,
+    pets: Int,
+    friends: Array[FullName] = Array(),
+    relatives: Map[String, FullName] = Map(),
+    p: Int)
+
+  case class BriefContactWithDataPartitionColumn(id: Int, name: Name, address: String, p: Int)
+
+  val contactsWithDataPartitionColumn =
+    contacts.map { case Contact(id, name, address, pets, friends, relatives) =>
+      ContactWithDataPartitionColumn(id, name, address, pets, friends, relatives, 1) }
+  val briefContactsWithDataPartitionColumn =
+    briefContacts.map { case BriefContact(id, name, address) =>
+      BriefContactWithDataPartitionColumn(id, name, address, 2) }
+
+  testSchemaPruning("select a single complex field") {
+    val query = sql("select name.middle from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query, Row("X.") :: Row("Y.") :: Row(null) :: Row(null) :: Nil)
+  }
+
+  testSchemaPruning("select a single complex field and its parent struct") {
+    val query = sql("select name.middle, name from contacts order by id")
+    checkScanSchemata(query, "struct>")
+    checkAnswer(query,
+      Row("X.", Row("Jane", "X.", "Doe")) ::
+      Row("Y.", Row("John", "Y.", "Doe")) ::
+      Row(null, Row("Janet", null, "Jones")) ::
+      Row(null, Row("Jim", null, "Jones")) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field array and its parent struct array") {
+    val query = sql("select friends.middle, friends from contacts where p=1 order by id")
+    checkScanSchemata(query,
+      "struct>>")
+    checkAnswer(query,
+      Row(Array("Z."), Array(Row("Susan", "Z.", "Smith"))) ::
+      Row(Array.empty[String], Array.empty[Row]) ::
+      Nil)
+  }
+
+  testSchemaPruning("select a single complex field from a map entry and its parent map entry") {
+    val query =
+      sql("select relatives[\"brother\"].middle, relatives[\"brother\"] from contacts where p=1 " +
+        "order by id")
+    checkScanSchemata(query,
+      "struct>>")
+    checkAnswer(query,
+  

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-04 Thread ajacques
Github user ajacques commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r207718713
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,205 @@

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-08-04 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/21889#discussion_r207701260
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaPruningSuite.scala
 ---
@@ -0,0 +1,205 @@

[GitHub] spark pull request #21889: [SPARK-4502][SQL] Parquet nested column pruning -...

2018-07-26 Thread ajacques
GitHub user ajacques opened a pull request:

https://github.com/apache/spark/pull/21889

[SPARK-4502][SQL] Parquet nested column pruning - foundation (2nd attempt)

(Link to Jira: https://issues.apache.org/jira/browse/SPARK-4502)

**This is a restart of apache/spark#21320. Most of the discussion has taken 
place over there. I've only taken it as a base to make stylistic changes based 
on the code review to help move things along.**

Due to the urgency of the upcoming 2.4 code freeze, I'm going to open this 
PR to collect any feedback. This can be closed if you prefer to continue the 
work in the original PR.

### What changes were proposed in this pull request?
One of the hallmarks of a column-oriented data storage format is the 
ability to read data from a subset of columns, efficiently skipping reads from 
other columns. Spark has long had support for pruning unneeded top-level schema 
fields from the scan of a parquet file. For example, consider a table, 
contacts, backed by parquet with the following Spark SQL schema:

```
root
 |-- name: struct
 |    |-- first: string
 |    |-- last: string
 |-- address: string
```
Parquet stores this table's data in three physical columns: name.first, 
name.last and address. To answer the query

`select address from contacts`
Spark will read only from the address column of parquet data. However, to 
answer the query

`select name.first from contacts`
Spark will read name.first and name.last from parquet.

This PR modifies Spark SQL to support finer-grained schema pruning. With 
this patch, Spark reads only the name.first column to answer the previous query.
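
To make the difference concrete, here is a minimal, self-contained sketch of pruning a nested schema down to the requested field paths. It is a toy model only: `DataType`, `StructType`, `prune` and `leafColumns` below are hypothetical stand-ins defined for this illustration and are not Spark's own classes or the code in this patch.

```scala
// Toy model of a nested schema. These types are illustrative assumptions,
// not Spark's org.apache.spark.sql.types API.
sealed trait DataType
case object StringType extends DataType
final case class StructType(fields: List[(String, DataType)]) extends DataType

object SchemaPruningSketch {
  // Keep only the fields reachable through the requested paths,
  // where a path such as List("name", "first") stands for name.first.
  def prune(schema: StructType, paths: Set[List[String]]): StructType = {
    val kept = schema.fields.flatMap { case (name, dataType) =>
      val matching = paths.filter(_.headOption.contains(name))
      if (matching.isEmpty) None
      else {
        val rest = matching.map(_.tail)
        dataType match {
          // Descend only when no path stops at this struct itself;
          // a path that ends here selects the struct whole.
          case s: StructType if !rest.contains(Nil) => Some(name -> prune(s, rest))
          case other => Some(name -> other)
        }
      }
    }
    StructType(kept)
  }

  // Leaf columns, named the way the description names them (for example name.first).
  def leafColumns(schema: StructType, prefix: String = ""): List[String] =
    schema.fields.flatMap {
      case (name, s: StructType) => leafColumns(s, prefix + name + ".")
      case (name, _) => List(prefix + name)
    }

  // The contacts schema from the example above.
  val contacts = StructType(List(
    "name" -> StructType(List("first" -> StringType, "last" -> StringType)),
    "address" -> StringType))
}
```

In this model, pruning `contacts` with the single path `List("name", "first")` leaves one leaf column, whereas pruning at the top level only would keep both children of `name`.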

### Implementation
There are two main components of this patch. First, there is a 
ParquetSchemaPruning optimizer rule for gathering the required schema fields of 
a PhysicalOperation over a parquet file, constructing a new schema based on 
those required fields and rewriting the plan in terms of that pruned schema. 
The pruned schema fields are pushed down to the parquet requested read schema. 
ParquetSchemaPruning uses a new ProjectionOverSchema extractor for rewriting a 
catalyst expression in terms of a pruned schema.
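
The "gathering the required schema fields" step can be pictured with a small sketch. The expression types here (`Expr`, `Attr`, `GetField`) are hypothetical simplifications made up for this example; Catalyst's real expression classes and the ParquetSchemaPruning rule are considerably more involved.

```scala
// Toy expression tree. Attr and GetField are illustrative stand-ins,
// not Catalyst's expression classes.
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GetField(child: Expr, field: String) extends Expr

object RequiredFieldsSketch {
  // Turn a chain such as GetField(Attr("name"), "middle")
  // into the field path List("name", "middle").
  def path(expr: Expr): List[String] = expr match {
    case Attr(name) => List(name)
    case GetField(child, field) => path(child) :+ field
  }

  // The required fields of a projection are the paths of all its expressions.
  def requiredFields(projection: Seq[Expr]): Set[List[String]] =
    projection.map(path).toSet
}
```

Under these assumptions, a projection of `name.middle` together with `id` yields the two paths `name.middle` and `id`, which is the information a pruned read schema would be built from.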

Second, the ParquetRowConverter has been patched to ensure the ordinals of 
the parquet columns read are correct for the pruned schema. ParquetReadSupport 
has been patched to address a compatibility mismatch between Spark's built-in 
vectorized reader and the parquet-mr library's reader.
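
The ordinal issue can be illustrated with a tiny sketch, assuming a struct with the fields `first`, `middle` and `last` (as in the test suite's `FullName`). The helper `ordinalOf` is a hypothetical name used only here and does not reflect how ParquetRowConverter is actually written.

```scala
object OrdinalSketch {
  // Field order of the full struct and of the same struct after pruning.
  val fullName = Vector("first", "middle", "last")
  val prunedName = Vector("middle")

  // Resolve a field by name against whichever schema the reader was given,
  // instead of reusing a position computed from the full schema.
  def ordinalOf(schema: Vector[String], field: String): Option[Int] = {
    val i = schema.indexOf(field)
    if (i >= 0) Some(i) else None
  }
}
```

Here `middle` sits at ordinal 1 in the full struct but at ordinal 0 once `first` and `last` are pruned away, so an ordinal taken from the full schema would point at the wrong position in the pruned one.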

### Limitation
Among the complex Spark SQL data types, this patch supports parquet column 
pruning of nested sequences of struct fields only.

### How was this patch tested?
Care has been taken to ensure correctness and prevent regressions. A more 
advanced version of this patch incorporating optimizations for rewriting 
queries involving aggregations and joins has been running on a production Spark 
cluster at VideoAmp for several years. In that time, one bug was found and 
fixed early on, and we added a regression test for that bug.

We forward-ported this patch to Spark master in June 2016 and have been 
running this patch against Spark 2.x branches on ad-hoc clusters since then.

### Known Issues
Highlighted in 
https://github.com/apache/spark/pull/21320#issuecomment-408271470

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/ajacques/apache-spark spark-4502-parquet_column_pruning-foundation

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/21889.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #21889


commit 86c572a26918e6380762508d1d868fd5ea231ed1
Author: Michael Allman 
Date:   2016-06-24T17:21:24Z

[SPARK-4502][SQL] Parquet nested column pruning

commit 1ffbc6c8f8eaa27b50048e2124afdd2ff9e1e2b4
Author: Michael Allman 
Date:   2018-06-04T09:59:34Z

Refactor SelectedFieldSuite to make its tests simpler and more
comprehensible

commit 97cd30b221fe78a4ea61c6c24be370fc2dfdd498
Author: Michael Allman 
Date:   2018-06-04T10:02:28Z

Remove test "select function over nested data" of unknown origin and
purpose

commit c27e87924ed788a4253d077691401d0852306406
Author: Michael Allman 
Date:   2018-06-04T10:45:28Z

Improve readability of ParquetSchemaPruning and
ParquetSchemaPruningSuite. Add test to exercise whether the
requested root fields in a query exclude any attributes

commit 2fc1173499e1fc4ca4205db80e78ca6f110fb5dd
Author: Michael Allman 
Date:   2018-06-04T12:04:23Z

Don't handle non-data-field partition column names specially when
constructing the new projections in ParquetSchemaPruning. These
column projections are left unchanged by the transformDown function when