Repository: spark
Updated Branches:
  refs/heads/master d65656c45 -> 391e6be0a


[SPARK-10301] [SQL] Fixes schema merging for nested structs

This PR can be quite challenging to review.  I'm trying to give a detailed 
description of the problem as well as its solution here.

When reading Parquet files, we need to specify a potentially nested Parquet 
schema (of type `MessageType`) as the requested schema for column pruning.  This 
Parquet schema is translated from a Catalyst schema (of type `StructType`), 
which is generated by the query planner and represents all requested columns.  
However, this translation can be fairly complicated for several reasons:

1.  Requested schema must conform to the real schema of the physical file to be 
read.

    This means we have to tailor the actual file schema of every individual 
physical Parquet file to be read according to the given Catalyst schema.  
Fortunately, we are already doing this in Spark 1.5 by pushing requested schema 
conversion to the executor side in PR #7231.

1.  Support for schema merging.

    A single Parquet dataset may consist of multiple physical Parquet files 
that come with different but compatible schemas.  This means we may request a 
column path that doesn't exist in a physical Parquet file.  All requested 
column paths can be nested.  For example, for a Parquet file schema

    ```
    message root {
      required group f0 {
        required group f00 {
          required int32 f000;
          required binary f001 (UTF8);
        }
      }
    }
    ```

    we may request column paths defined in the following schema:

    ```
    message root {
      required group f0 {
        required group f00 {
          required binary f001 (UTF8);
          required float f002;
        }
      }

      optional double f1;
    }
    ```

    Notice that we pruned column path `f0.f00.f000`, but added `f0.f00.f002` 
and `f1`.

    The good news is that Parquet handles non-existing column paths properly 
and always returns null for them.

1.  The map from `StructType` to `MessageType` is a one-to-many map.

    This is the most unfortunate part.

    Due to historical reasons (dark histories!), schemas of Parquet files 
generated by different libraries have different "flavors".  For example, to 
handle a schema with a single non-nullable column, whose type is an array of 
non-nullable integers, parquet-protobuf generates the following Parquet schema:

    ```
    message m0 {
      repeated int32 f;
    }
    ```

    while parquet-avro generates another version:

    ```
    message m1 {
      required group f (LIST) {
        repeated int32 array;
      }
    }
    ```

    and parquet-thrift spills this:

    ```
    message m2 {
      required group f (LIST) {
        repeated int32 f_tuple;
      }
    }
    ```

    All of them can be mapped to the following _unique_ Catalyst schema:

    ```
    StructType(
      StructField(
        "f",
        ArrayType(IntegerType, containsNull = false),
        nullable = false))
    ```

    This greatly complicates Parquet requested schema construction, since the 
path of a given column varies in different cases.  To read the array elements 
from files with the above schemas, we must use `f` for `m0`, `f.array` for 
`m1`, and `f.f_tuple` for `m2`.
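
To make the three flavors concrete, here is a minimal sketch of how the element column path differs per layout.  The `PType`, `Prim`, and `Group` classes and the `elementPath` helper are hypothetical stand-ins invented for this illustration; they are not the parquet-mr API, and they only cover the three shapes shown above.

```scala
// Hypothetical, simplified model of a Parquet schema tree (not the parquet-mr API).
sealed trait PType { def name: String }
final case class Prim(name: String, repeated: Boolean = false) extends PType
final case class Group(name: String, fields: List[PType], isList: Boolean = false) extends PType

object ElementPaths {
  /** Returns the column path holding the array elements of top-level field `field`. */
  def elementPath(schema: Group, field: String): Option[String] =
    schema.fields.find(_.name == field).flatMap {
      // parquet-protobuf style: the repeated primitive field is itself the element column.
      case Prim(n, true) => Some(n)
      // parquet-avro / parquet-thrift style: a LIST group wrapping one repeated primitive.
      case Group(n, List(Prim(inner, true)), true) => Some(s"$n.$inner")
      // Any other shape is outside the scope of this sketch.
      case _ => None
    }

  // The three schemas from the description above.
  val m0: Group = Group("m0", List(Prim("f", repeated = true)))
  val m1: Group = Group("m1", List(Group("f", List(Prim("array", repeated = true)), isList = true)))
  val m2: Group = Group("m2", List(Group("f", List(Prim("f_tuple", repeated = true)), isList = true)))
}
```

Under these assumptions, `ElementPaths.elementPath(ElementPaths.m1, "f")` yields `Some("f.array")`, while the same Catalyst field resolves to `f` in `m0` and `f.f_tuple` in `m2`.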

In earlier Spark versions, we didn't try to fix this issue properly.  Spark 1.4 
and prior versions simply translate the Catalyst schema in a way that is more or 
less compatible with parquet-hive and parquet-avro, but broken in many other 
cases.  Earlier revisions of Spark 1.5 only try to tailor the Parquet file 
schema at the first level, and ignore nested ones.  This caused 
[SPARK-10301][spark-10301] as well as [SPARK-10005][spark-10005].  In PR #8228, 
I tried to avoid the hard part of the problem and made a minimal change in 
`CatalystRowConverter` to fix SPARK-10005.  However, when taking SPARK-10301 
into consideration, continuing to hack `CatalystRowConverter` doesn't seem to be 
a good idea.  So this PR is an attempt to fix the problem in a proper way.

For a given physical Parquet file with schema `ps` and a compatible Catalyst 
requested schema `cs`, we use the following algorithm to tailor `ps` to get the 
resulting Parquet requested schema `ps'`:

For a leaf column path `c` in `cs`:

- if a corresponding Parquet column path `c'` can be found in `ps`, `c'` should 
be included in `ps'`;
- otherwise, we convert `c` to a Parquet column path `c"` using 
`CatalystSchemaConverter`, and include `c"` in `ps'`;
- no other column paths should exist in `ps'`.
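
The selection rule above can be sketched on a toy schema model.  This is only an illustration under simplifying assumptions: `Node`, `Leaf`, `Struct`, and `ClipSketch.clip` are hypothetical names, `LIST` and `MAP` layouts are ignored, and a missing column simply reuses the requested node where the real code would call `CatalystSchemaConverter`.

```scala
// Hypothetical, simplified schema nodes shared by the "file" and "requested" sides.
sealed trait Node { def name: String }
final case class Leaf(name: String, tpe: String) extends Node
final case class Struct(name: String, fields: List[Node]) extends Node

object ClipSketch {
  /** Tailors file schema `ps` to requested schema `cs`; field order follows `cs`. */
  def clip(ps: Struct, cs: Struct): Struct = {
    val fileFields = ps.fields.map(f => f.name -> f).toMap
    val clipped = cs.fields.map { requested =>
      (fileFields.get(requested.name), requested) match {
        // Both sides are structs: recurse so nested column paths are clipped too.
        case (Some(p: Struct), c: Struct) => clip(p, c)
        // The column exists in the file: keep the file's own definition.
        case (Some(p), _) => p
        // The column is missing from the file: fall back to the requested field
        // (assumption: stands in for the converted Parquet field).
        case (None, c) => c
      }
    }
    // Fields present only in `ps` are never copied, so no other paths survive.
    Struct(ps.name, clipped)
  }

  // File schema and requested schema from the schema merging example.
  val file: Struct = Struct("root", List(
    Struct("f0", List(Struct("f00", List(Leaf("f000", "int32"), Leaf("f001", "binary")))))))
  val requested: Struct = Struct("root", List(
    Struct("f0", List(Struct("f00", List(Leaf("f001", "binary"), Leaf("f002", "float"))))),
    Leaf("f1", "double")))
}
```

With these inputs, `ClipSketch.clip(ClipSketch.file, ClipSketch.requested)` drops `f0.f00.f000`, keeps `f0.f00.f001` from the file, and adds `f0.f00.f002` and `f1` from the requested schema.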

Then comes the most tedious part:

> Given `cs`, `ps`, and `c`, how to locate `c'` in `ps`?

Unfortunately, there's no quick answer, and we have to enumerate all possible 
structures defined in the parquet-format spec.  They are:

1.  the standard structure of nested types, and
1.  cases defined in all backwards-compatibility rules for `LIST` and `MAP`.

The core part of this PR is `CatalystReadSupport.clipParquetType()`, which 
tailors a given Parquet file schema according to a requested schema in its 
Catalyst form.  Backwards-compatibility rules of `LIST` and `MAP` are covered 
in `clipParquetListType()` and `clipParquetMapType()` respectively.  The column 
path selection algorithm is implemented in `clipParquetGroupFields()`.

With this PR, we no longer need to do schema tailoring in `CatalystReadSupport` 
and `CatalystRowConverter`.  Another benefit is that we can now also read 
Parquet datasets consisting of files with different physical Parquet schemas 
that share the same logical schema, for example, files generated by different 
Parquet libraries.  This situation is illustrated by 
[this test case][test-case].

[spark-10301]: https://issues.apache.org/jira/browse/SPARK-10301
[spark-10005]: https://issues.apache.org/jira/browse/SPARK-10005
[test-case]: https://github.com/liancheng/spark/commit/38644d8a45175cbdf20d2ace021c2c2544a50ab3#diff-a9b98e28ce3ae30641829dffd1173be2R26

Author: Cheng Lian <l...@databricks.com>

Closes #8509 from liancheng/spark-10301/fix-parquet-requested-schema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/391e6be0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/391e6be0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/391e6be0

Branch: refs/heads/master
Commit: 391e6be0ae883f3ea0fab79463eb8b618af79afb
Parents: d65656c
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Sep 1 16:52:59 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Sep 1 16:52:59 2015 +0800

----------------------------------------------------------------------
 .../parquet/CatalystReadSupport.scala           | 235 ++++++++++----
 .../parquet/CatalystRowConverter.scala          |  51 +--
 .../parquet/CatalystSchemaConverter.scala       |  14 +-
 .../parquet/ParquetAvroCompatibilitySuite.scala |   1 +
 .../parquet/ParquetInteroperabilitySuite.scala  |  90 ++++++
 .../datasources/parquet/ParquetQuerySuite.scala |  77 +++++
 .../parquet/ParquetSchemaSuite.scala            | 310 +++++++++++++++++++
 7 files changed, 653 insertions(+), 125 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/391e6be0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 0a6bb44..dc4ff06 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConverters._
+import scala.collection.JavaConverters.{collectionAsScalaIterableConverter, 
mapAsJavaMapConverter, mapAsScalaMapConverter}
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] 
with Logging {
   // Called after `init()` when initializing Parquet record reader.
@@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
     // `StructType` containing all requested columns.
     val maybeRequestedSchema = 
Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
 
-    // Below we construct a Parquet schema containing all requested columns.  
This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet 
schema.  Otherwise,
-    // we have to fallback to the full file schema which contains all columns 
in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns 
than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target 
Parquet file.  For
-    //    example, in the case of schema merging, the globally merged schema 
may contain extra
-    //    columns gathered from other Parquet files.  These columns will be 
simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert 
the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping 
is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools.  For 
example, a Parquet file
-    //    containing a single integer array field `f1` may have the following 
legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level 
structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet 
file as they have
-    //    different physical structures.
     val parquetRequestedSchema =
       maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the 
field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema 
for all requested
-          // columns.  Note that it's possible that no columns are requested 
at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why 
`fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
+        val catalystRequestedSchema = StructType.fromString(schemaString)
+        CatalystReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
       }
 
     val metadata =
@@ -160,4 +101,168 @@ private[parquet] object CatalystReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = 
"org.apache.spark.sql.parquet.row.requested_schema"
 
   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column 
paths don't exist
+   * in `catalystSchema`, and adding those only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: 
StructType): MessageType = {
+    val clippedParquetFields = 
clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type 
= {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType if !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested type as value type.
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] is 
not equivalent to
+   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, but 
it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[ArrayType]].  The element type
+   * of the [[ArrayType]] should also be a nested type, namely an 
[[ArrayType]], a [[MapType]], or a
+   * [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: 
DataType): Type = {
+    // Precondition of this method, should only be called for lists with 
nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // Unannotated repeated group should be interpreted as required list of 
required element, so
+    // list element type is just the group itself.  Clip it.
+    if (parquetList.getOriginalType == null && 
parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && 
parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " 
+
+          parquetList)
+
+      // Precondition of this method, should only be called for lists with 
nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the 
repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated 
group's name with
+      // "_tuple" appended then the repeated type is the element type and 
elements are required.
+      // Build a new LIST-annotated group with clipped `repeatedGroup` as 
element type and the
+      // only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the 
repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[MapType]].  The value type
+   * of the [[MapType]] should also be a nested type, namely an [[ArrayType]], 
a [[MapType]], or a
+   * [[StructType]].  Note that key type of any [[MapType]] is always a 
primitive type.
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): 
GroupType = {
+    // Precondition of this method, should only be called for maps with nested 
value types.
+    assert(!isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(parquetKeyType)
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except 
for empty
+   *       [[MessageType]].  Because it's legal to construct an empty 
requested schema for column
+   *       pruning.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: 
StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, 
structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName 
-> f).toMap
+    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/391e6be0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index fe13dfb..f17e794 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val 
updater: ParentContainerUp
  * When used as a root converter, [[NoopUpdater]] should be used since root 
converters don't have
  * any "parent" container.
  *
- * @note Constructor argument [[parquetType]] refers to requested fields of 
the actual schema of the
- *       Parquet file being read, while constructor argument [[catalystType]] 
refers to requested
- *       fields of the global schema.  The key difference is that, in case of 
schema merging,
- *       [[parquetType]] can be a subset of [[catalystType]].  For example, 
it's possible to have
- *       the following [[catalystType]]:
- *       {{{
- *         new StructType()
- *           .add("f1", IntegerType, nullable = false)
- *           .add("f2", StringType, nullable = true)
- *           .add("f3", new StructType()
- *             .add("f31", DoubleType, nullable = false)
- *             .add("f32", IntegerType, nullable = true)
- *             .add("f33", StringType, nullable = true), nullable = false)
- *       }}}
- *       and the following [[parquetType]] (`f2` and `f32` are missing):
- *       {{{
- *         message root {
- *           required int32 f1;
- *           required group f3 {
- *             required double f31;
- *             optional binary f33 (utf8);
- *           }
- *         }
- *       }}}
- *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record 
type
  * @param updater An updater which propagates converted field values to the 
parent container
@@ -179,31 +154,7 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] 
= {
-    // In case of schema merging, `parquetType` can be a subset of 
`catalystType`.  We need to pad
-    // those missing fields and create converters for them, although values of 
these fields are
-    // always null.
-    val paddedParquetFields = {
-      val parquetFields = parquetType.getFields.asScala
-      val parquetFieldNames = parquetFields.map(_.getName).toSet
-      val missingFields = catalystType.filterNot(f => 
parquetFieldNames.contains(f.name))
-
-      // We don't need to worry about feature flag arguments like 
`assumeBinaryIsString` when
-      // creating the schema converter here, since values of missing fields 
are always null.
-      val toParquet = new CatalystSchemaConverter()
-
-      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f 
=>
-        catalystType.indexWhere(_.name == f.getName)
-      }
-    }
-
-    if (paddedParquetFields.length != catalystType.length) {
-      throw new UnsupportedOperationException(
-        "A Parquet file's schema has different number of fields with the table 
schema. " +
-          "Please enable schema merging by setting \"mergeSchema\" to true 
when load " +
-          "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in 
SQLConf.")
-    }
-
-    paddedParquetFields.zip(catalystType).zipWithIndex.map {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of 
`currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new 
RowUpdater(currentRow, ordinal))

http://git-wip-us.apache.org/repos/asf/spark/blob/391e6be0/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index be6c054..a21ab1d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -55,16 +55,10 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
  *        to old style non-standard behaviors.
  */
 private[parquet] class CatalystSchemaConverter(
-    private val assumeBinaryIsString: Boolean,
-    private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
-
-  // Only used when constructing converter for converting Spark SQL schema to 
Parquet schema, in
-  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are 
irrelevant.
-  def this() = this(
-    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp = 
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec = 
SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+    assumeBinaryIsString: Boolean = 
SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp: Boolean = 
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    followParquetFormatSpec: Boolean = 
SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
+) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,

http://git-wip-us.apache.org/repos/asf/spark/blob/391e6be0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
index bd7cf8c..36b929e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
+import java.io.File
 import java.nio.ByteBuffer
 import java.util.{List => JList, Map => JMap}
 

http://git-wip-us.apache.org/repos/asf/spark/blob/391e6be0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
new file mode 100644
index 0000000..83b65fb
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with 
SharedSQLContext {
+  test("parquet files with different physical schemas but share the same 
logical schema") {
+    import ParquetCompatibilityTest._
+
+    // This test case writes two Parquet files, both representing the 
following Catalyst schema
+    //
+    //   StructType(
+    //     StructField(
+    //       "f",
+    //       ArrayType(IntegerType, containsNull = false),
+    //       nullable = false))
+    //
+    // The first Parquet file comes with parquet-avro style 2-level 
LIST-annotated group, while the
+    // other one comes with parquet-protobuf style 1-level unannotated 
primitive field.
+    withTempDir { dir =>
+      val avroStylePath = new File(dir, "avro-style").getCanonicalPath
+      val protobufStylePath = new File(dir, "protobuf-style").getCanonicalPath
+
+      val avroStyleSchema =
+        """message avro_style {
+          |  required group f (LIST) {
+          |    repeated int32 array;
+          |  }
+          |}
+        """.stripMargin
+
+      writeDirect(avroStylePath, avroStyleSchema, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+            rc.group {
+              rc.field("array", 0) {
+                rc.addInteger(0)
+                rc.addInteger(1)
+              }
+            }
+          }
+        }
+      })
+
+      logParquetSchema(avroStylePath)
+
+      val protobufStyleSchema =
+        """message protobuf_style {
+          |  repeated int32 f;
+          |}
+        """.stripMargin
+
+      writeDirect(protobufStylePath, protobufStyleSchema, { rc =>
+        rc.message {
+          rc.field("f", 0) {
+            rc.addInteger(2)
+            rc.addInteger(3)
+          }
+        }
+      })
+
+      logParquetSchema(protobufStylePath)
+
+      checkAnswer(
+        sqlContext.read.parquet(dir.getCanonicalPath),
+        Seq(
+          Row(Seq(0, 1)),
+          Row(Seq(2, 3))))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/391e6be0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b7b70c2..a379523 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -229,4 +229,81 @@ class ParquetQuerySuite extends QueryTest with ParquetTest 
with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-10301 Clipping nested structs in requested schema") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+        .coalesce(1)
+
+      df.write.mode("append").parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s", new StructType().add("a", LongType, nullable = true), 
nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0)))
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df1 = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id) AS s")
+        .coalesce(1)
+
+      val df2 = sqlContext
+        .range(1, 2)
+        .selectExpr("NAMED_STRUCT('b', id, 'c', id) AS s")
+        .coalesce(1)
+
+      df1.write.parquet(path)
+      df2.write.mode(SaveMode.Append).parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s",
+          new StructType()
+            .add("a", LongType, nullable = true)
+            .add("c", LongType, nullable = true),
+          nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Seq(
+          Row(Row(0, null)),
+          Row(Row(null, 1))))
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) AS s")
+        .coalesce(1)
+
+      df.write.parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s",
+          new StructType()
+            .add(
+              "a",
+              ArrayType(
+                new StructType()
+                  .add("b", LongType, nullable = true)
+                  .add("d", StringType, nullable = true),
+                containsNull = true),
+              nullable = true),
+          nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(Seq(Row(0, null)))))
+    }
+  }
 }
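
The second case in the test above expects `Row(Row(0, null))` and `Row(Row(null, 1))` when two files with structs `{a, b}` and `{b, c}` are read with a requested struct `{a, c}`. The following is a minimal sketch of that expectation in plain Scala, without Spark or Parquet. The names `NestedProjectionSketch`, `Requested`, `Column`, `Struct`, and `project` are hypothetical, records are modelled as nested `Map`s, and `None` stands in for SQL NULL. It illustrates the intended result only and is not how the Parquet reader is implemented.

```scala
object NestedProjectionSketch {
  // A requested schema node: a leaf column, or a struct with named children.
  sealed trait Requested
  case object Column extends Requested
  final case class Struct(children: List[(String, Requested)]) extends Requested

  // Project one record (nested Maps) onto the requested struct, in requested order.
  // Fields the record does not contain come back as None (standing in for NULL);
  // fields the record contains but that were not requested are dropped.
  def project(record: Map[String, Any], requested: Struct): List[Option[Any]] =
    requested.children.map {
      case (name, Column) =>
        record.get(name)
      case (name, nested: Struct) =>
        // Recurse only when the record really holds a nested struct here.
        record.get(name).collect {
          case inner: Map[_, _] =>
            project(inner.asInstanceOf[Map[String, Any]], nested)
        }
    }
}
```

Projecting `Map("s" -> Map("a" -> 0L, "b" -> 0L))` onto a requested `s: {a, c}` keeps `a`, drops `b`, and yields `None` for `c`, which matches the shape of the first expected row.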

http://git-wip-us.apache.org/repos/asf/spark/blob/391e6be0/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 9dcbc1a..28c59a4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -22,6 +22,7 @@ import scala.reflect.runtime.universe.TypeTag
 
 import org.apache.parquet.schema.MessageTypeParser
 
+import org.apache.spark.sql.SQLConf
 import org.apache.spark.sql.catalyst.ScalaReflection
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
@@ -941,4 +942,313 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3));
       |}
     """.stripMargin)
+
+  private def testSchemaClipping(
+      testName: String,
+      parquetSchema: String,
+      catalystSchema: StructType,
+      expectedSchema: String): Unit = {
+    test(s"Clipping - $testName") {
+      val expected = MessageTypeParser.parseMessageType(expectedSchema)
+      val actual = CatalystReadSupport.clipParquetSchema(
+        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
+
+      try {
+        expected.checkContains(actual)
+        actual.checkContains(expected)
+      } catch { case cause: Throwable =>
+        fail(
+          s"""Expected clipped schema:
+             |$expected
+             |Actual clipped schema:
+             |$actual
+           """.stripMargin,
+          cause)
+      }
+    }
+  }
+
+  testSchemaClipping(
+    "simple nested struct",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional int32 f00;
+        |    optional int32 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f0Type = new StructType().add("f00", IntegerType, nullable = true)
+      new StructType()
+        .add("f0", f0Type, nullable = false)
+        .add("f1", IntegerType, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional int32 f00;
+        |  }
+        |  optional int32 f1;
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-protobuf style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    repeated binary f00 (UTF8);
+        |    repeated group f01 {
+        |      optional int32 f010;
+        |      optional double f011;
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01Type = new StructType().add("f011", DoubleType, nullable = true)
+      val f00Type = ArrayType(StringType, containsNull = false)
+      val f0Type = new StructType()
+        .add("f00", f00Type, nullable = false)
+        .add("f01", f01Type, nullable = false)
+      val f1Type = ArrayType(IntegerType, containsNull = true)
+      new StructType()
+        .add("f0", f0Type, nullable = false)
+        .add("f1", f1Type, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    repeated binary f00 (UTF8);
+        |    repeated group f01 {
+        |      optional double f011;
+        |    }
+        |  }
+        |
+        |  optional group f1 (LIST) {
+        |    repeated group list {
+        |      optional int32 element;
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-thrift style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary f00_tuple (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group f01_tuple {
+        |        optional int32 f010;
+        |        optional double f011;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary f00_tuple (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group f01_tuple {
+        |        optional double f011;
+        |        optional int64 f012;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-avro style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary array (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group array {
+        |        optional int32 f010;
+        |        optional double f011;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = false)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 {
+        |      repeated binary array (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group array {
+        |        optional double f011;
+        |        optional int64 f012;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-hive style array",
+
+    parquetSchema =
+      """message root {
+        |  optional group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group bag {
+        |        optional binary array_element;
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group bag {
+        |        optional group array_element {
+        |          optional int32 f010;
+        |          optional double f011;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = true), nullable = true)
+        .add("f01", ArrayType(f01ElementType, containsNull = true), nullable = true)
+
+      new StructType().add("f0", f0Type, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  optional group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group bag {
+        |        optional binary array_element;
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group bag {
+        |        optional group array_element {
+        |          optional double f011;
+        |          optional int64 f012;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "2-level list of required struct",
+
+    parquetSchema =
+      s"""message root {
+         |  required group f0 {
+         |    required group f00 (LIST) {
+         |      repeated group element {
+         |        required int32 f000;
+         |        optional int64 f001;
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin,
+
+    catalystSchema = {
+      val f00ElementType =
+        new StructType()
+          .add("f001", LongType, nullable = true)
+          .add("f002", DoubleType, nullable = false)
+
+      val f00Type = ArrayType(f00ElementType, containsNull = false)
+      val f0Type = new StructType().add("f00", f00Type, nullable = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      s"""message root {
+         |  required group f0 {
+         |    required group f00 (LIST) {
+         |      repeated group element {
+         |        optional int64 f001;
+         |        required double f002;
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin)
+
+  testSchemaClipping(
+    "empty requested schema",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    required int32 f00;
+        |    required int64 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = new StructType(),
+
+    expectedSchema = "message root {}")
 }
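
The clipping cases above all follow one pattern: keep the requested fields that exist in the file schema, recurse into nested groups, and take fields missing from the file from the requested schema. A minimal sketch of that pattern on stand-in types follows. `Field`, `Leaf`, `Group`, `Repetition`, and `ClipSketch.clip` are hypothetical names introduced here for illustration; they are not Parquet's `MessageType` or Catalyst's `StructType`, and the sketch ignores the LIST and MAP layouts that `CatalystReadSupport.clipParquetSchema` has to handle.

```scala
// Simplified stand-in types; these are NOT Parquet's or Catalyst's classes.
sealed trait Repetition
case object Required extends Repetition
case object Optional extends Repetition

sealed trait Field { def name: String; def repetition: Repetition }
final case class Leaf(name: String, repetition: Repetition, tpe: String) extends Field
final case class Group(name: String, repetition: Repetition, fields: List[Field]) extends Field

object ClipSketch {
  // Keep only the requested fields, in requested order. A requested field that
  // is absent from the file schema is taken from the requested schema as-is.
  def clip(file: Group, requested: Group): Group = {
    val fileFields = file.fields.map(f => f.name -> f).toMap
    val clipped = requested.fields.map { want =>
      (fileFields.get(want.name), want) match {
        // Both sides are groups: recurse so nested fields are clipped too.
        case (Some(have: Group), wantGroup: Group) => clip(have, wantGroup)
        // Present in the file otherwise: keep the file's own definition.
        case (Some(have), _) => have
        // Missing from the file: fall back to the requested definition.
        case (None, _) => want
      }
    }
    file.copy(fields = clipped)
  }
}
```

Applied to the shapes of the "simple nested struct" case, a file schema `f0 { f00, f01 }` clipped by a request for `f0 { f00 }` and `f1` gives `f0 { f00 }` plus `f1`: `f01` is pruned and `f1` comes from the requested schema.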

