Repository: spark
Updated Branches:
  refs/heads/master c86a57f4d -> bf493686e


[SPARK-19411][SQL] Remove the metadata used to mark optional columns in merged 
Parquet schema for filter predicate pushdown

## What changes were proposed in this pull request?

A metadata key was previously introduced to mark the optional columns in a merged 
Parquet schema, so that filter predicates on such columns would not be pushed down. 
Now that we have upgraded to Parquet 1.8.2, which fixes the pushdown of filters on 
optional columns (PARQUET-389), this metadata is no longer needed and is removed here.
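
For context, here is a minimal sketch of the merged-schema query that this metadata used to guard. It mirrors the updated test in `ParquetFilterSuite` below; the session setup and paths are hypothetical and only illustrative.

```scala
// Minimal sketch, assuming a local SparkSession; paths are hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val dir = "/tmp/spark-19411-example"
(1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(s"$dir/table1")
(1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(s"$dir/table2")

// Column "c" exists in only one of the two files. Before Parquet 1.8.2, pushing
// the "c = 1" filter down could hit PARQUET-389, so Spark tagged such columns with
// StructType.metadataKeyForOptionalField and skipped pushdown for them. With
// Parquet 1.8.2 the pushdown is safe, so the tagging is removed in this patch.
val df = spark.read
  .option("mergeSchema", "true")
  .parquet(s"$dir/table1", s"$dir/table2")
  .filter("c = 1")
  .selectExpr("c", "b", "a")

df.show()  // expected single row: (1, "1", null)
```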

## How was this patch tested?

Jenkins tests.

Please review http://spark.apache.org/contributing.html before opening a pull 
request.

Author: Liang-Chi Hsieh <vii...@gmail.com>

Closes #16756 from viirya/remove-optional-metadata.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bf493686
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bf493686
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bf493686

Branch: refs/heads/master
Commit: bf493686eb17006727b3ec81849b22f3df68fdef
Parents: c86a57f
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Fri Feb 3 11:58:42 2017 +0100
Committer: Reynold Xin <r...@databricks.com>
Committed: Fri Feb 3 11:58:42 2017 +0100

----------------------------------------------------------------------
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  4 +-
 .../org/apache/spark/sql/types/StructType.scala | 15 +----
 .../apache/spark/sql/types/DataTypeSuite.scala  | 49 --------------
 .../datasources/parquet/ParquetFileFormat.scala | 10 +--
 .../datasources/parquet/ParquetFilters.scala    | 13 +---
 .../parquet/ParquetFilterSuite.scala            | 68 ++++----------------
 6 files changed, 22 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bf493686/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 9e6dbf3..4913dcc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -575,9 +575,9 @@ class Analyzer(
     //     |- view2 (defaultDatabase = db2)
     //        |- view3 (defaultDatabase = db3)
     //   |- view4 (defaultDatabase = db4)
-    // In this case, the view `view1` is a nested view, it directly references `table2`、`view2`
+    // In this case, the view `view1` is a nested view, it directly references `table2`, `view2`
     // and `view4`, the view `view2` references `view3`. On resolving the table, we look up the
-    // relations `table2`、`view2`、`view4` using the default database `db1`, and look up the
+    // relations `table2`, `view2`, `view4` using the default database `db1`, and look up the
     // relation `view3` using the default database `db2`.
     //
     // Note this is compatible with the views defined by older versions of Spark(before 2.2), which

http://git-wip-us.apache.org/repos/asf/spark/blob/bf493686/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index ca0000a..8d8b5b8 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -402,13 +402,6 @@ case class StructType(fields: Array[StructField]) extends DataType with Seq[Stru
 @InterfaceStability.Stable
 object StructType extends AbstractDataType {
 
-  /**
-   * A key used in field metadata to indicate that the field comes from the result of merging
-   * two different StructTypes that do not always contain the field. That is to say, the field
-   * might be missing (optional) from one of the StructTypes.
-   */
-  private[sql] val metadataKeyForOptionalField = "_OPTIONAL_"
-
   override private[sql] def defaultConcreteType: DataType = new StructType
 
   override private[sql] def acceptsType(other: DataType): Boolean = {
@@ -463,8 +456,6 @@ object StructType extends AbstractDataType {
 
       case (StructType(leftFields), StructType(rightFields)) =>
         val newFields = ArrayBuffer.empty[StructField]
-        // This metadata will record the fields that only exist in one of two StructTypes
-        val optionalMeta = new MetadataBuilder()
 
         val rightMapped = fieldsMap(rightFields)
         leftFields.foreach {
@@ -476,8 +467,7 @@ object StructType extends AbstractDataType {
                   nullable = leftNullable || rightNullable)
               }
               .orElse {
-                optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
-                Some(leftField.copy(metadata = optionalMeta.build()))
+                Some(leftField)
               }
               .foreach(newFields += _)
         }
@@ -486,8 +476,7 @@ object StructType extends AbstractDataType {
         rightFields
           .filterNot(f => leftMapped.get(f.name).nonEmpty)
           .foreach { f =>
-            optionalMeta.putBoolean(metadataKeyForOptionalField, value = true)
-            newFields += f.copy(metadata = optionalMeta.build())
+            newFields += f
           }
 
         StructType(newFields)

http://git-wip-us.apache.org/repos/asf/spark/blob/bf493686/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
index 12d2c00..61e1ec7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DataTypeSuite.scala
@@ -132,55 +132,6 @@ class DataTypeSuite extends SparkFunSuite {
     assert(mapped === expected)
   }
 
-  test("merge where right is empty") {
-    val left = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val right = StructType(List())
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, left))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
-  test("merge where left is empty") {
-
-    val left = StructType(List())
-
-    val right = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, right))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
-  test("merge where both are non-empty") {
-    val left = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) :: Nil)
-
-    val right = StructType(
-      StructField("c", LongType) :: Nil)
-
-    val expected = StructType(
-      StructField("a", LongType) ::
-      StructField("b", FloatType) ::
-      StructField("c", LongType) :: Nil)
-
-    val merged = left.merge(right)
-
-    assert(DataType.equalsIgnoreCompatibleNullability(merged, expected))
-    assert(merged("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("b").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-    assert(merged("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-  }
-
   test("merge where right contains type conflict") {
     val left = StructType(
       StructField("a", LongType) ::

http://git-wip-us.apache.org/repos/asf/spark/blob/bf493686/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index d9831c5..828949e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -109,9 +109,7 @@ class ParquetFileFormat
 
     // We want to clear this temporary metadata from saving into Parquet file.
     // This metadata is only useful for detecting optional columns when pushdowning filters.
-    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      dataSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, conf)
+    ParquetWriteSupport.setSchema(dataSchema, conf)
 
     // Sets flags for `CatalystSchemaConverter` (which converts Catalyst schema to Parquet schema)
     // and `CatalystWriteSupport` (writing actual rows to Parquet files).
@@ -307,11 +305,7 @@ class ParquetFileFormat
       ParquetWriteSupport.SPARK_ROW_SCHEMA,
       ParquetSchemaConverter.checkFieldNames(requiredSchema).json)
 
-    // We want to clear this temporary metadata from saving into Parquet file.
-    // This metadata is only useful for detecting optional columns when pushdowning filters.
-    val dataSchemaToWrite = StructType.removeMetadata(StructType.metadataKeyForOptionalField,
-      requiredSchema).asInstanceOf[StructType]
-    ParquetWriteSupport.setSchema(dataSchemaToWrite, hadoopConf)
+    ParquetWriteSupport.setSchema(requiredSchema, hadoopConf)
 
     // Sets flags for `CatalystSchemaConverter`
     hadoopConf.setBoolean(

http://git-wip-us.apache.org/repos/asf/spark/blob/bf493686/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 7730d1f..2efeb80 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -169,23 +169,14 @@ private[parquet] object ParquetFilters {
   }
 
   /**
-   * Returns a map from name of the column to the data type, if predicate push down applies
-   * (i.e. not an optional field).
-   *
-   * SPARK-11955: The optional fields will have metadata StructType.metadataKeyForOptionalField.
-   * These fields only exist in one side of merged schemas. Due to that, we can't push down filters
-   * using such fields, otherwise Parquet library will throw exception (PARQUET-389).
-   * Here we filter out such fields.
+   * Returns a map from name of the column to the data type, if predicate push down applies.
    */
   private def getFieldMap(dataType: DataType): Map[String, DataType] = dataType match {
     case StructType(fields) =>
       // Here we don't flatten the fields in the nested schema but just look up through
       // root fields. Currently, accessing to nested fields does not push down filters
       // and it does not support to create filters for them.
-      fields.filter { f =>
-        !f.metadata.contains(StructType.metadataKeyForOptionalField) ||
-          !f.metadata.getBoolean(StructType.metadataKeyForOptionalField)
-      }.map(f => f.name -> f.dataType).toMap
+      fields.map(f => f.name -> f.dataType).toMap
     case _ => Map.empty[String, DataType]
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/bf493686/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index a0d57d7..fa046c8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -368,76 +368,36 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
   }
 
 
-  test("SPARK-11103: Filter applied on merged Parquet schema with new column 
fails") {
+  test("Filter applied on merged Parquet schema with new column should work") {
     import testImplicits._
     Seq("true", "false").map { vectorized =>
       withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_ENABLED.key -> "true",
         SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
         SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized) {
         withTempPath { dir =>
-          val pathOne = s"${dir.getCanonicalPath}/table1"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathOne)
-          val pathTwo = s"${dir.getCanonicalPath}/table2"
-          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(pathTwo)
-
-          // If the "c = 1" filter gets pushed down, this query will throw an exception which
-          // Parquet emits. This is a Parquet issue (PARQUET-389).
-          val df = spark.read.parquet(pathOne, pathTwo).filter("c = 1").selectExpr("c", "b", "a")
+          val path1 = s"${dir.getCanonicalPath}/table1"
+          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(path1)
+          val path2 = s"${dir.getCanonicalPath}/table2"
+          (1 to 3).map(i => (i, i.toString)).toDF("c", "b").write.parquet(path2)
+
+          // No matter "c = 1" gets pushed down or not, this query should work without exception.
+          val df = spark.read.parquet(path1, path2).filter("c = 1").selectExpr("c", "b", "a")
           checkAnswer(
             df,
             Row(1, "1", null))
 
-          // The fields "a" and "c" only exist in one Parquet file.
-          assert(df.schema("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-          assert(df.schema("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-          val pathThree = s"${dir.getCanonicalPath}/table3"
-          df.write.parquet(pathThree)
-
-          // We will remove the temporary metadata when writing Parquet file.
-          val schema = spark.read.parquet(pathThree).schema
-          assert(schema.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-          val pathFour = s"${dir.getCanonicalPath}/table4"
+          val path3 = s"${dir.getCanonicalPath}/table3"
           val dfStruct = sparkContext.parallelize(Seq((1, 1))).toDF("a", "b")
-          dfStruct.select(struct("a").as("s")).write.parquet(pathFour)
+          dfStruct.select(struct("a").as("s")).write.parquet(path3)
 
-          val pathFive = s"${dir.getCanonicalPath}/table5"
+          val path4 = s"${dir.getCanonicalPath}/table4"
           val dfStruct2 = sparkContext.parallelize(Seq((1, 1))).toDF("c", "b")
-          dfStruct2.select(struct("c").as("s")).write.parquet(pathFive)
+          dfStruct2.select(struct("c").as("s")).write.parquet(path4)
 
-          // If the "s.c = 1" filter gets pushed down, this query will throw 
an exception which
-          // Parquet emits.
-          val dfStruct3 = spark.read.parquet(pathFour, pathFive).filter("s.c = 
1")
+          // No matter "s.c = 1" gets pushed down or not, this query should 
work without exception.
+          val dfStruct3 = spark.read.parquet(path3, path4).filter("s.c = 1")
             .selectExpr("s")
           checkAnswer(dfStruct3, Row(Row(null, 1)))
-
-          // The fields "s.a" and "s.c" only exist in one Parquet file.
-          val field = dfStruct3.schema("s").dataType.asInstanceOf[StructType]
-          assert(field("a").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-          assert(field("c").metadata.getBoolean(StructType.metadataKeyForOptionalField))
-
-          val pathSix = s"${dir.getCanonicalPath}/table6"
-          dfStruct3.write.parquet(pathSix)
-
-          // We will remove the temporary metadata when writing Parquet file.
-          val forPathSix = spark.read.parquet(pathSix).schema
-          assert(forPathSix.forall(!_.metadata.contains(StructType.metadataKeyForOptionalField)))
-
-          // sanity test: make sure optional metadata field is not wrongly set.
-          val pathSeven = s"${dir.getCanonicalPath}/table7"
-          (1 to 3).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathSeven)
-          val pathEight = s"${dir.getCanonicalPath}/table8"
-          (4 to 6).map(i => (i, i.toString)).toDF("a", "b").write.parquet(pathEight)
-
-          val df2 = spark.read.parquet(pathSeven, pathEight).filter("a = 1").selectExpr("a", "b")
-          checkAnswer(
-            df2,
-            Row(1, "1"))
-
-          // The fields "a" and "b" exist in both two Parquet files. No metadata is set.
-          assert(!df2.schema("a").metadata.contains(StructType.metadataKeyForOptionalField))
-          assert(!df2.schema("b").metadata.contains(StructType.metadataKeyForOptionalField))
         }
       }
     }

