Repository: spark
Updated Branches:
  refs/heads/branch-1.5 63c72b93e -> fca16c59d


[SPARK-10301] [SPARK-10428] [SQL] [BRANCH-1.5] Fixes schema merging for nested 
structs

We used to work around SPARK-10301 with a quick fix in branch-1.5 (PR #8515),
but it doesn't cover the case described in SPARK-10428. So this PR backports PR
#8509, which had once been considered too big a change to merge into branch-1.5
at the last minute, to fix both SPARK-10301 and SPARK-10428 for Spark 1.5. It
also adds more test cases for SPARK-10428.

This PR looks big, but the essential change is only ~200 lines of code. All other
changes are for testing. In particular, PR #8454 is also backported here because
the `ParquetInteroperabilitySuite` introduced in PR #8515 depends on it. This
should be safe since #8454 only touches test code.
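
The gist of the backported change is the new `CatalystReadSupport.clipParquetSchema`
helper: instead of assembling the requested Parquet schema field by field, the full
file schema is "clipped" against the requested Catalyst schema. Below is a minimal
sketch of what the new `ParquetSchemaSuite` cases exercise (hypothetical schemas; the
snippet has to live in the `org.apache.spark.sql.execution.datasources.parquet`
package because the helper is `private[parquet]`):

  // Illustrative only: the schemas below are made up for this sketch.
  import org.apache.parquet.schema.MessageTypeParser
  import org.apache.spark.sql.types._

  val fileSchema = MessageTypeParser.parseMessageType(
    """message root {
      |  required group s {
      |    optional int64 a;
      |    optional int64 b;
      |  }
      |}
    """.stripMargin)

  // Requested schema: keeps `s.a`, drops `s.b`, and asks for `s.c`, which is
  // missing from this particular file (e.g. due to schema merging).
  val catalystSchema =
    new StructType()
      .add("s", new StructType()
        .add("a", LongType, nullable = true)
        .add("c", LongType, nullable = true),
        nullable = true)

  // `s.a` keeps its physical type from the file schema, `s.c` is converted from
  // the Catalyst type (and will simply be read as nulls), and `s.b` is pruned.
  val clipped = CatalystReadSupport.clipParquetSchema(fileSchema, catalystSchema)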

Author: Cheng Lian <l...@databricks.com>

Closes #8583 from liancheng/spark-10301/for-1.5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fca16c59
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fca16c59
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fca16c59

Branch: refs/heads/branch-1.5
Commit: fca16c59da75b08d18cb9d6da7942cd24b05518e
Parents: 63c72b9
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Sep 8 20:30:24 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Tue Sep 8 20:30:24 2015 -0700

----------------------------------------------------------------------
 .../parquet/CatalystReadSupport.scala           | 243 ++++++---
 .../parquet/CatalystRowConverter.scala          |  63 +--
 .../parquet/CatalystSchemaConverter.scala       |  14 +-
 .../parquet/DirectParquetWriter.scala           |  81 +++
 .../parquet/ParquetInteroperabilitySuite.scala  | 111 ++++
 .../datasources/parquet/ParquetQuerySuite.scala | 322 +++++++++++
 .../parquet/ParquetSchemaSuite.scala            | 527 +++++++++++++++++++
 7 files changed, 1233 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/fca16c59/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 3f8353a..00f36ca 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -19,17 +19,18 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.{Map => JMap}
 
-import scala.collection.JavaConversions.{iterableAsScalaIterable, 
mapAsJavaMap, mapAsScalaMap}
+import scala.collection.JavaConverters._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.parquet.hadoop.api.ReadSupport.ReadContext
 import org.apache.parquet.hadoop.api.{InitContext, ReadSupport}
 import org.apache.parquet.io.api.RecordMaterializer
-import org.apache.parquet.schema.MessageType
+import org.apache.parquet.schema.Type.Repetition
+import org.apache.parquet.schema._
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] 
with Logging {
   // Called after `init()` when initializing Parquet record reader.
@@ -44,7 +45,7 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
     val parquetRequestedSchema = readContext.getRequestedSchema
 
     val catalystRequestedSchema =
-      Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { 
metadata =>
+      Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { 
metadata =>
         metadata
           // First tries to read requested schema, which may result from 
projections
           .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
@@ -81,70 +82,10 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
     // `StructType` containing all requested columns.
     val maybeRequestedSchema = 
Option(conf.get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA))
 
-    // Below we construct a Parquet schema containing all requested columns.  
This schema tells
-    // Parquet which columns to read.
-    //
-    // If `maybeRequestedSchema` is defined, we assemble an equivalent Parquet 
schema.  Otherwise,
-    // we have to fallback to the full file schema which contains all columns 
in the file.
-    // Obviously this may waste IO bandwidth since it may read more columns 
than requested.
-    //
-    // Two things to note:
-    //
-    // 1. It's possible that some requested columns don't exist in the target 
Parquet file.  For
-    //    example, in the case of schema merging, the globally merged schema 
may contain extra
-    //    columns gathered from other Parquet files.  These columns will be 
simply filled with nulls
-    //    when actually reading the target Parquet file.
-    //
-    // 2. When `maybeRequestedSchema` is available, we can't simply convert 
the Catalyst schema to
-    //    Parquet schema using `CatalystSchemaConverter`, because the mapping 
is not unique due to
-    //    non-standard behaviors of some Parquet libraries/tools.  For 
example, a Parquet file
-    //    containing a single integer array field `f1` may have the following 
legacy 2-level
-    //    structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          required INT32 element;
-    //        }
-    //      }
-    //
-    //    while `CatalystSchemaConverter` may generate a standard 3-level 
structure:
-    //
-    //      message root {
-    //        optional group f1 (LIST) {
-    //          repeated group list {
-    //            required INT32 element;
-    //          }
-    //        }
-    //      }
-    //
-    //    Apparently, we can't use the 2nd schema to read the target Parquet 
file as they have
-    //    different physical structures.
     val parquetRequestedSchema =
       maybeRequestedSchema.fold(context.getFileSchema) { schemaString =>
-        val toParquet = new CatalystSchemaConverter(conf)
-        val fileSchema = context.getFileSchema.asGroupType()
-        val fileFieldNames = fileSchema.getFields.map(_.getName).toSet
-
-        StructType
-          // Deserializes the Catalyst schema of requested columns
-          .fromString(schemaString)
-          .map { field =>
-            if (fileFieldNames.contains(field.name)) {
-              // If the field exists in the target Parquet file, extracts the 
field type from the
-              // full file schema and makes a single-field Parquet schema
-              new MessageType("root", fileSchema.getType(field.name))
-            } else {
-              // Otherwise, just resorts to `CatalystSchemaConverter`
-              toParquet.convert(StructType(Array(field)))
-            }
-          }
-          // Merges all single-field Parquet schemas to form a complete schema 
for all requested
-          // columns.  Note that it's possible that no columns are requested 
at all (e.g., count
-          // some partition column of a partitioned Parquet table). That's why 
`fold` is used here
-          // and always fallback to an empty Parquet schema.
-          .fold(new MessageType("root")) {
-            _ union _
-          }
+        val catalystRequestedSchema = StructType.fromString(schemaString)
+        CatalystReadSupport.clipParquetSchema(context.getFileSchema, 
catalystRequestedSchema)
       }
 
     val metadata =
@@ -152,7 +93,7 @@ private[parquet] class CatalystReadSupport extends 
ReadSupport[InternalRow] with
         
maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-    new ReadContext(parquetRequestedSchema, metadata)
+    new ReadContext(parquetRequestedSchema, metadata.asJava)
   }
 }
 
@@ -160,4 +101,172 @@ private[parquet] object CatalystReadSupport {
   val SPARK_ROW_REQUESTED_SCHEMA = 
"org.apache.spark.sql.parquet.row.requested_schema"
 
   val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  /**
+   * Tailors `parquetSchema` according to `catalystSchema` by removing column paths that don't
+   * exist in `catalystSchema`, and adding those that only exist in `catalystSchema`.
+   */
+  def clipParquetSchema(parquetSchema: MessageType, catalystSchema: 
StructType): MessageType = {
+    val clippedParquetFields = 
clipParquetGroupFields(parquetSchema.asGroupType(), catalystSchema)
+    Types.buildMessage().addFields(clippedParquetFields: _*).named("root")
+  }
+
+  private def clipParquetType(parquetType: Type, catalystType: DataType): Type 
= {
+    catalystType match {
+      case t: ArrayType if !isPrimitiveCatalystType(t.elementType) =>
+        // Only clips array types with nested type as element type.
+        clipParquetListType(parquetType.asGroupType(), t.elementType)
+
+      case t: MapType
+        if !isPrimitiveCatalystType(t.keyType) ||
+           !isPrimitiveCatalystType(t.valueType) =>
+        // Only clips map types with nested key type or value type
+        clipParquetMapType(parquetType.asGroupType(), t.keyType, t.valueType)
+
+      case t: StructType =>
+        clipParquetGroup(parquetType.asGroupType(), t)
+
+      case _ =>
+        // UDTs and primitive types are not clipped.  For UDTs, a clipped version might not be
+        // mappable to the desired user-space types.  So UDTs shouldn't participate in schema merging.
+        parquetType
+    }
+  }
+
+  /**
+   * Whether a Catalyst [[DataType]] is primitive.  Primitive [[DataType]] is 
not equivalent to
+   * [[AtomicType]].  For example, [[CalendarIntervalType]] is primitive, but 
it's not an
+   * [[AtomicType]].
+   */
+  private def isPrimitiveCatalystType(dataType: DataType): Boolean = {
+    dataType match {
+      case _: ArrayType | _: MapType | _: StructType => false
+      case _ => true
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[ArrayType]].  The element type
+   * of the [[ArrayType]] should also be a nested type, namely an 
[[ArrayType]], a [[MapType]], or a
+   * [[StructType]].
+   */
+  private def clipParquetListType(parquetList: GroupType, elementType: 
DataType): Type = {
+    // Precondition of this method: it should only be called for lists with nested element types.
+    assert(!isPrimitiveCatalystType(elementType))
+
+    // An unannotated repeated group should be interpreted as a required list of required
+    // elements, so the list element type is just the group itself.  Clip it.
+    if (parquetList.getOriginalType == null && 
parquetList.isRepetition(Repetition.REPEATED)) {
+      clipParquetType(parquetList, elementType)
+    } else {
+      assert(
+        parquetList.getOriginalType == OriginalType.LIST,
+        "Invalid Parquet schema. " +
+          "Original type of annotated Parquet lists must be LIST: " +
+          parquetList.toString)
+
+      assert(
+        parquetList.getFieldCount == 1 && 
parquetList.getType(0).isRepetition(Repetition.REPEATED),
+        "Invalid Parquet schema. " +
+          "LIST-annotated group should only have exactly one repeated field: " 
+
+          parquetList)
+
+      // Precondition of this method: it should only be called for lists with nested element types.
+      assert(!parquetList.getType(0).isPrimitive)
+
+      val repeatedGroup = parquetList.getType(0).asGroupType()
+
+      // If the repeated field is a group with multiple fields, or the 
repeated field is a group
+      // with one field and is named either "array" or uses the LIST-annotated 
group's name with
+      // "_tuple" appended then the repeated type is the element type and 
elements are required.
+      // Build a new LIST-annotated group with clipped `repeatedGroup` as 
element type and the
+      // only field.
+      if (
+        repeatedGroup.getFieldCount > 1 ||
+        repeatedGroup.getName == "array" ||
+        repeatedGroup.getName == parquetList.getName + "_tuple"
+      ) {
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(clipParquetType(repeatedGroup, elementType))
+          .named(parquetList.getName)
+      } else {
+        // Otherwise, the repeated field's type is the element type with the 
repeated field's
+        // repetition.
+        Types
+          .buildGroup(parquetList.getRepetition)
+          .as(OriginalType.LIST)
+          .addField(
+            Types
+              .repeatedGroup()
+              .addField(clipParquetType(repeatedGroup.getType(0), elementType))
+              .named(repeatedGroup.getName))
+          .named(parquetList.getName)
+      }
+    }
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[MapType]].  Either key type or
+   * value type of the [[MapType]] must be a nested type, namely an 
[[ArrayType]], a [[MapType]], or
+   * a [[StructType]].
+   */
+  private def clipParquetMapType(
+      parquetMap: GroupType, keyType: DataType, valueType: DataType): 
GroupType = {
+    // Precondition of this method: it only handles maps with nested key or value types.
+    assert(!isPrimitiveCatalystType(keyType) || 
!isPrimitiveCatalystType(valueType))
+
+    val repeatedGroup = parquetMap.getType(0).asGroupType()
+    val parquetKeyType = repeatedGroup.getType(0)
+    val parquetValueType = repeatedGroup.getType(1)
+
+    val clippedRepeatedGroup =
+      Types
+        .repeatedGroup()
+        .as(repeatedGroup.getOriginalType)
+        .addField(clipParquetType(parquetKeyType, keyType))
+        .addField(clipParquetType(parquetValueType, valueType))
+        .named(repeatedGroup.getName)
+
+    Types
+      .buildGroup(parquetMap.getRepetition)
+      .as(parquetMap.getOriginalType)
+      .addField(clippedRepeatedGroup)
+      .named(parquetMap.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[StructType]].
+   *
+   * @return A clipped [[GroupType]], which has at least one field.
+   * @note Parquet doesn't allow creating empty [[GroupType]] instances except for empty
+   *       [[MessageType]], which matters here because it's legal to construct an empty
+   *       requested schema for column pruning.
+   */
+  private def clipParquetGroup(parquetRecord: GroupType, structType: 
StructType): GroupType = {
+    val clippedParquetFields = clipParquetGroupFields(parquetRecord, 
structType)
+    Types
+      .buildGroup(parquetRecord.getRepetition)
+      .as(parquetRecord.getOriginalType)
+      .addFields(clippedParquetFields: _*)
+      .named(parquetRecord.getName)
+  }
+
+  /**
+   * Clips a Parquet [[GroupType]] which corresponds to a Catalyst 
[[StructType]].
+   *
+   * @return A list of clipped [[GroupType]] fields, which can be empty.
+   */
+  private def clipParquetGroupFields(
+      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
+    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName 
-> f).toMap
+    val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
+    structType.map { f =>
+      parquetFieldMap
+        .get(f.name)
+        .map(clipParquetType(_, f.dataType))
+        .getOrElse(toParquet.convertField(f))
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fca16c59/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index d99bfe4..2ff2fda 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
@@ -113,31 +113,6 @@ private[parquet] class CatalystPrimitiveConverter(val 
updater: ParentContainerUp
  * When used as a root converter, [[NoopUpdater]] should be used since root 
converters don't have
  * any "parent" container.
  *
- * @note Constructor argument [[parquetType]] refers to requested fields of 
the actual schema of the
- *       Parquet file being read, while constructor argument [[catalystType]] 
refers to requested
- *       fields of the global schema.  The key difference is that, in case of 
schema merging,
- *       [[parquetType]] can be a subset of [[catalystType]].  For example, 
it's possible to have
- *       the following [[catalystType]]:
- *       {{{
- *         new StructType()
- *           .add("f1", IntegerType, nullable = false)
- *           .add("f2", StringType, nullable = true)
- *           .add("f3", new StructType()
- *             .add("f31", DoubleType, nullable = false)
- *             .add("f32", IntegerType, nullable = true)
- *             .add("f33", StringType, nullable = true), nullable = false)
- *       }}}
- *       and the following [[parquetType]] (`f2` and `f32` are missing):
- *       {{{
- *         message root {
- *           required int32 f1;
- *           required group f3 {
- *             required double f31;
- *             optional binary f33 (utf8);
- *           }
- *         }
- *       }}}
- *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record 
type
  * @param updater An updater which propagates converted field values to the 
parent container
@@ -148,6 +123,16 @@ private[parquet] class CatalystRowConverter(
     updater: ParentContainerUpdater)
   extends CatalystGroupConverter(updater) with Logging {
 
+  assert(
+    parquetType.getFieldCount == catalystType.length,
+    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+       |
+       |Parquet schema:
+       |$parquetType
+       |Catalyst schema:
+       |${catalystType.prettyJson}
+     """.stripMargin)
+
   logDebug(
     s"""Building row converter for the following schema:
        |
@@ -179,31 +164,7 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] 
= {
-    // In case of schema merging, `parquetType` can be a subset of 
`catalystType`.  We need to pad
-    // those missing fields and create converters for them, although values of 
these fields are
-    // always null.
-    val paddedParquetFields = {
-      val parquetFields = parquetType.getFields
-      val parquetFieldNames = parquetFields.map(_.getName).toSet
-      val missingFields = catalystType.filterNot(f => 
parquetFieldNames.contains(f.name))
-
-      // We don't need to worry about feature flag arguments like 
`assumeBinaryIsString` when
-      // creating the schema converter here, since values of missing fields 
are always null.
-      val toParquet = new CatalystSchemaConverter()
-
-      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f 
=>
-        catalystType.indexWhere(_.name == f.getName)
-      }
-    }
-
-    if (paddedParquetFields.length != catalystType.length) {
-      throw new UnsupportedOperationException(
-        "A Parquet file's schema has different number of fields with the table 
schema. " +
-          "Please enable schema merging by setting \"mergeSchema\" to true 
when load " +
-          "a Parquet dataset or set spark.sql.parquet.mergeSchema to true in 
SQLConf.")
-    }
-
-    paddedParquetFields.zip(catalystType).zipWithIndex.map {
+    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of 
`currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new 
RowUpdater(currentRow, ordinal))

http://git-wip-us.apache.org/repos/asf/spark/blob/fca16c59/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 71161f8..9c53913 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -55,16 +55,10 @@ import org.apache.spark.sql.{AnalysisException, SQLConf}
  *        to old style non-standard behaviors.
  */
 private[parquet] class CatalystSchemaConverter(
-    private val assumeBinaryIsString: Boolean,
-    private val assumeInt96IsTimestamp: Boolean,
-    private val followParquetFormatSpec: Boolean) {
-
-  // Only used when constructing converter for converting Spark SQL schema to 
Parquet schema, in
-  // which case `assumeInt96IsTimestamp` and `assumeBinaryIsString` are 
irrelevant.
-  def this() = this(
-    assumeBinaryIsString = SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
-    assumeInt96IsTimestamp = 
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
-    followParquetFormatSpec = 
SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)
+    assumeBinaryIsString: Boolean = 
SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get,
+    assumeInt96IsTimestamp: Boolean = 
SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get,
+    followParquetFormatSpec: Boolean = 
SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get
+) {
 
   def this(conf: SQLConf) = this(
     assumeBinaryIsString = conf.isParquetBinaryAsString,
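
The constructor change above replaces the old no-argument auxiliary constructor with
default parameter values, so call sites that only convert Catalyst schemas to Parquet
can name just the flag they care about (as `clipParquetGroupFields` does above). A
minimal, hypothetical sketch of such a call site:

  import org.apache.spark.sql.types.{IntegerType, StructField}

  // assumeBinaryIsString and assumeInt96IsTimestamp keep their SQLConf defaults;
  // they are irrelevant when only converting from Catalyst to Parquet.
  val toParquet = new CatalystSchemaConverter(followParquetFormatSpec = true)
  val parquetField = toParquet.convertField(StructField("f1", IntegerType, nullable = true))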

http://git-wip-us.apache.org/repos/asf/spark/blob/fca16c59/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala
new file mode 100644
index 0000000..d05c609
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/DirectParquetWriter.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.hadoop.conf
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetWriter
+import org.apache.parquet.hadoop.api.WriteSupport
+import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
+import org.apache.parquet.io.api.RecordConsumer
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+private[sql] object DirectParquetWriter {
+  type RecordBuilder = RecordConsumer => Unit
+
+  /**
+   * A testing Parquet [[WriteSupport]] implementation used to write manually 
constructed Parquet
+   * records with arbitrary structures.
+   */
+  private class DirectWriteSupport(schema: MessageType, metadata: Map[String, 
String])
+    extends WriteSupport[RecordBuilder] {
+
+    private var recordConsumer: RecordConsumer = _
+
+    override def init(configuration: conf.Configuration): WriteContext = {
+      new WriteContext(schema, metadata.asJava)
+    }
+
+    override def write(buildRecord: RecordBuilder): Unit = {
+      recordConsumer.startMessage()
+      buildRecord(recordConsumer)
+      recordConsumer.endMessage()
+    }
+
+    override def prepareForWrite(recordConsumer: RecordConsumer): Unit = {
+      this.recordConsumer = recordConsumer
+    }
+  }
+
+  def writeDirect
+      (path: String, schema: String, metadata: Map[String, String] = Map.empty)
+      (f: ParquetWriter[RecordBuilder] => Unit): Unit = {
+    val messageType = MessageTypeParser.parseMessageType(schema)
+    val writeSupport = new DirectWriteSupport(messageType, metadata)
+    val parquetWriter = new ParquetWriter[RecordBuilder](new Path(path), 
writeSupport)
+    try f(parquetWriter) finally parquetWriter.close()
+  }
+
+  def message(writer: ParquetWriter[RecordBuilder])(builder: RecordBuilder): 
Unit = {
+    writer.write(builder)
+  }
+
+  def group(consumer: RecordConsumer)(f: => Unit): Unit = {
+    consumer.startGroup()
+    f
+    consumer.endGroup()
+  }
+
+  def field(consumer: RecordConsumer, name: String, index: Int = 0)(f: => 
Unit): Unit = {
+    consumer.startField(name, index)
+    f
+    consumer.endField(name, index)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fca16c59/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
new file mode 100644
index 0000000..d17d930
--- /dev/null
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.io.File
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.SharedSQLContext
+
+class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with 
SharedSQLContext {
+  test("parquet files with different physical schemas but share the same 
logical schema") {
+    // This test case writes two Parquet files, both representing the 
following Catalyst schema
+    //
+    //   StructType(
+    //     StructField(
+    //       "f",
+    //       ArrayType(IntegerType, containsNull = false),
+    //       nullable = false))
+    //
+    // The first Parquet file comes with parquet-avro style 2-level 
LIST-annotated repeated group,
+    // while the other one comes with parquet-protobuf style 1-level 
unannotated repeated primitive
+    // field.
+    withTempDir { dir =>
+      import DirectParquetWriter._
+
+      val avroStylePath = new File(dir, "avro-style").getCanonicalPath
+      val protobufStylePath = new File(dir, "protobuf-style").getCanonicalPath
+
+      val avroStyleSchema =
+        """message avro_style {
+          |  required group f (LIST) {
+          |    repeated int32 array;
+          |  }
+          |}
+        """.stripMargin
+
+      writeDirect(avroStylePath, avroStyleSchema) { writer =>
+        message(writer) { rc =>
+          field(rc, "f") {
+            group(rc) {
+              field(rc, "array") {
+                rc.addInteger(0)
+                rc.addInteger(1)
+              }
+            }
+          }
+        }
+
+        message(writer) { rc =>
+          field(rc, "f") {
+            group(rc) {
+              field(rc, "array") {
+                rc.addInteger(2)
+                rc.addInteger(3)
+              }
+            }
+          }
+        }
+      }
+
+      logParquetSchema(avroStylePath)
+
+      val protobufStyleSchema =
+        """message protobuf_style {
+          |  repeated int32 f;
+          |}
+        """.stripMargin
+
+      writeDirect(protobufStylePath, protobufStyleSchema) { writer =>
+        message(writer) { rc =>
+          field(rc, "f") {
+            rc.addInteger(4)
+            rc.addInteger(5)
+          }
+        }
+
+        message(writer) { rc =>
+          field(rc, "f") {
+            rc.addInteger(6)
+            rc.addInteger(7)
+          }
+        }
+      }
+
+      logParquetSchema(protobufStylePath)
+
+      checkAnswer(
+        sqlContext.read.parquet(dir.getCanonicalPath),
+        Seq(
+          Row(Seq(0, 1)),
+          Row(Seq(2, 3)),
+          Row(Seq(4, 5)),
+          Row(Seq(6, 7))))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/fca16c59/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index b7b70c2..2cfa42d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -22,6 +22,9 @@ import java.io.File
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.SpecificMutableRow
+import 
org.apache.spark.sql.execution.datasources.parquet.TestingUDT.{NestedStructUDT, 
NestedStruct}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -229,4 +232,323 @@ class ParquetQuerySuite extends QueryTest with 
ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-10301 requested schema clipping - same schema") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 
1) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, 1L)))
+    }
+  }
+
+  // This test case is ignored because of parquet-mr bug PARQUET-370
+  ignore("SPARK-10301 requested schema clipping - schemas with disjoint sets 
of fields") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 
1) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(null, null)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - requested schema contains 
physical schema") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'b', id + 
1) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true)
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, 1L, null, null)))
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext.range(1).selectExpr("NAMED_STRUCT('a', id, 'd', id + 
3) AS s").coalesce(1)
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true)
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, null, null, 3L)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - physical schema contains 
requested schema") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 
3) AS s")
+        .coalesce(1)
+
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("b", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, 1L)))
+    }
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2, 'd', id + 
3) AS s")
+        .coalesce(1)
+
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("a", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(0L, 3L)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - schemas overlap but don't 
contain each other") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      df.write.parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("b", LongType, nullable = true)
+              .add("c", LongType, nullable = true)
+              .add("d", LongType, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(1L, 2L, null)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - deeply nested struct") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', ARRAY(NAMED_STRUCT('b', id, 'c', id))) 
AS s")
+        .coalesce(1)
+
+      df.write.parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s",
+          new StructType()
+            .add(
+              "a",
+              ArrayType(
+                new StructType()
+                  .add("b", LongType, nullable = true)
+                  .add("d", StringType, nullable = true),
+                containsNull = true),
+              nullable = true),
+          nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(Seq(Row(0, null)))))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - out of order") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df1 = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      val df2 = sqlContext
+        .range(1, 2)
+        .selectExpr("NAMED_STRUCT('c', id + 2, 'b', id + 1, 'd', id + 3) AS s")
+        .coalesce(1)
+
+      df1.write.parquet(path)
+      df2.write.mode(SaveMode.Append).parquet(path)
+
+      val userDefinedSchema = new StructType()
+        .add("s",
+          new StructType()
+            .add("a", LongType, nullable = true)
+            .add("b", LongType, nullable = true)
+            .add("d", LongType, nullable = true),
+          nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Seq(
+          Row(Row(0, 1, null)),
+          Row(Row(null, 2, 4))))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - schema merging") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df1 = sqlContext
+        .range(1)
+        .selectExpr("NAMED_STRUCT('a', id, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      val df2 = sqlContext
+        .range(1, 2)
+        .selectExpr("NAMED_STRUCT('a', id, 'b', id + 1, 'c', id + 2) AS s")
+        .coalesce(1)
+
+      df1.write.mode(SaveMode.Append).parquet(path)
+      df2.write.mode(SaveMode.Append).parquet(path)
+
+      checkAnswer(
+        sqlContext
+          .read
+          .option("mergeSchema", "true")
+          .parquet(path)
+          .selectExpr("s.a", "s.b", "s.c"),
+        Seq(
+          Row(0, null, 2),
+          Row(1, 2, 3)))
+    }
+  }
+
+  test("SPARK-10301 requested schema clipping - UDT") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      val df = sqlContext
+        .range(1)
+        .selectExpr(
+          """NAMED_STRUCT(
+            |  'f0', CAST(id AS STRING),
+            |  'f1', NAMED_STRUCT(
+            |    'a', CAST(id + 1 AS INT),
+            |    'b', CAST(id + 2 AS LONG),
+            |    'c', CAST(id + 3.5 AS DOUBLE)
+            |  )
+            |) AS s
+          """.stripMargin)
+        .coalesce(1)
+
+      df.write.mode(SaveMode.Append).parquet(path)
+
+      val userDefinedSchema =
+        new StructType()
+          .add(
+            "s",
+            new StructType()
+              .add("f1", new NestedStructUDT, nullable = true),
+            nullable = true)
+
+      checkAnswer(
+        sqlContext.read.schema(userDefinedSchema).parquet(path),
+        Row(Row(NestedStruct(1, 2L, 3.5D))))
+    }
+  }
+}
+
+object TestingUDT {
+  @SQLUserDefinedType(udt = classOf[NestedStructUDT])
+  case class NestedStruct(a: Integer, b: Long, c: Double)
+
+  class NestedStructUDT extends UserDefinedType[NestedStruct] {
+    override def sqlType: DataType =
+      new StructType()
+        .add("a", IntegerType, nullable = true)
+        .add("b", LongType, nullable = false)
+        .add("c", DoubleType, nullable = false)
+
+    override def serialize(obj: Any): Any = {
+      val row = new 
SpecificMutableRow(sqlType.asInstanceOf[StructType].map(_.dataType))
+      obj match {
+        case n: NestedStruct =>
+          row.setInt(0, n.a)
+          row.setLong(1, n.b)
+          row.setDouble(2, n.c)
+      }
+    }
+
+    override def userClass: Class[NestedStruct] = classOf[NestedStruct]
+
+    override def deserialize(datum: Any): NestedStruct = {
+      datum match {
+        case row: InternalRow =>
+          NestedStruct(row.getInt(0), row.getLong(1), row.getDouble(2))
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/fca16c59/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index b344616..eb7192d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -941,4 +941,531 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |  optional fixed_len_byte_array(8) f1 (DECIMAL(18, 3));
       |}
     """.stripMargin)
+
+  private def testSchemaClipping(
+      testName: String,
+      parquetSchema: String,
+      catalystSchema: StructType,
+      expectedSchema: String): Unit = {
+    test(s"Clipping - $testName") {
+      val expected = MessageTypeParser.parseMessageType(expectedSchema)
+      val actual = CatalystReadSupport.clipParquetSchema(
+        MessageTypeParser.parseMessageType(parquetSchema), catalystSchema)
+
+      try {
+        expected.checkContains(actual)
+        actual.checkContains(expected)
+      } catch { case cause: Throwable =>
+        fail(
+          s"""Expected clipped schema:
+             |$expected
+             |Actual clipped schema:
+             |$actual
+          """.stripMargin,
+          cause)
+      }
+    }
+  }
+
+  testSchemaClipping(
+    "simple nested struct",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional int32 f00;
+        |    optional int32 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f0Type = new StructType().add("f00", IntegerType, nullable = true)
+      new StructType()
+        .add("f0", f0Type, nullable = false)
+        .add("f1", IntegerType, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional int32 f00;
+        |  }
+        |  optional int32 f1;
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-protobuf style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    repeated binary f00 (UTF8);
+        |    repeated group f01 {
+        |      optional int32 f010;
+        |      optional double f011;
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f00Type = ArrayType(StringType, containsNull = false)
+      val f01Type = ArrayType(
+        new StructType()
+          .add("f011", DoubleType, nullable = true),
+        containsNull = false)
+
+      val f0Type = new StructType()
+        .add("f00", f00Type, nullable = false)
+        .add("f01", f01Type, nullable = false)
+      val f1Type = ArrayType(IntegerType, containsNull = true)
+
+      new StructType()
+        .add("f0", f0Type, nullable = false)
+        .add("f1", f1Type, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    repeated binary f00 (UTF8);
+        |    repeated group f01 {
+        |      optional double f011;
+        |    }
+        |  }
+        |
+        |  optional group f1 (LIST) {
+        |    repeated group list {
+        |      optional int32 element;
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-thrift style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated binary f00_tuple (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group f01_tuple {
+        |        optional int32 f010;
+        |        optional double f011;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = 
true)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable 
= true)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated binary f00_tuple (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group f01_tuple {
+        |        optional double f011;
+        |        optional int64 f012;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-avro style array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated binary array (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group array {
+        |        optional int32 f010;
+        |        optional double f011;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = 
true)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable 
= true)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated binary array (UTF8);
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group array {
+        |        optional double f011;
+        |        optional int64 f012;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-hive style array",
+
+    parquetSchema =
+      """message root {
+        |  optional group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group bag {
+        |        optional binary array_element;
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group bag {
+        |        optional group array_element {
+        |          optional int32 f010;
+        |          optional double f011;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = true), nullable = 
true)
+        .add("f01", ArrayType(f01ElementType, containsNull = true), nullable = 
true)
+
+      new StructType().add("f0", f0Type, nullable = true)
+    },
+
+    expectedSchema =
+      """message root {
+        |  optional group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group bag {
+        |        optional binary array_element;
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group bag {
+        |        optional group array_element {
+        |          optional double f011;
+        |          optional int64 f012;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "2-level list of required struct",
+
+    parquetSchema =
+      s"""message root {
+         |  required group f0 {
+         |    required group f00 (LIST) {
+         |      repeated group element {
+         |        required int32 f000;
+         |        optional int64 f001;
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin,
+
+    catalystSchema = {
+      val f00ElementType =
+        new StructType()
+          .add("f001", LongType, nullable = true)
+          .add("f002", DoubleType, nullable = false)
+
+      val f00Type = ArrayType(f00ElementType, containsNull = false)
+      val f0Type = new StructType().add("f00", f00Type, nullable = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      s"""message root {
+         |  required group f0 {
+         |    required group f00 (LIST) {
+         |      repeated group element {
+         |        optional int64 f001;
+         |        required double f002;
+         |      }
+         |    }
+         |  }
+         |}
+       """.stripMargin)
+
+  testSchemaClipping(
+    "standard array",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group list {
+        |        required binary element (UTF8);
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group list {
+        |        required group element {
+        |          optional int32 f010;
+        |          optional double f011;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val f01ElementType = new StructType()
+        .add("f011", DoubleType, nullable = true)
+        .add("f012", LongType, nullable = true)
+
+      val f0Type = new StructType()
+        .add("f00", ArrayType(StringType, containsNull = false), nullable = 
true)
+        .add("f01", ArrayType(f01ElementType, containsNull = false), nullable 
= true)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional group f00 (LIST) {
+        |      repeated group list {
+        |        required binary element (UTF8);
+        |      }
+        |    }
+        |
+        |    optional group f01 (LIST) {
+        |      repeated group list {
+        |        required group element {
+        |          optional double f011;
+        |          optional int64 f012;
+        |        }
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "empty requested schema",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    required int32 f00;
+        |    required int64 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = new StructType(),
+
+    expectedSchema = "message root {}")
+
+  testSchemaClipping(
+    "disjoint field sets",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 {
+        |    required int32 f00;
+        |    required int64 f01;
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema =
+      new StructType()
+        .add(
+          "f0",
+          new StructType()
+            .add("f02", FloatType, nullable = true)
+            .add("f03", DoubleType, nullable = true),
+          nullable = true),
+
+    expectedSchema =
+      """message root {
+        |  required group f0 {
+        |    optional float f02;
+        |    optional double f03;
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "parquet-avro style map",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group map (MAP_KEY_VALUE) {
+        |      required int32 key;
+        |      required group value {
+        |        required int32 value_f0;
+        |        required int64 value_f1;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val valueType =
+        new StructType()
+          .add("value_f1", LongType, nullable = false)
+          .add("value_f2", DoubleType, nullable = false)
+
+      val f0Type = MapType(IntegerType, valueType, valueContainsNull = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group map (MAP_KEY_VALUE) {
+        |      required int32 key;
+        |      required group value {
+        |        required int64 value_f1;
+        |        required double value_f2;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "standard map",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required int32 key;
+        |      required group value {
+        |        required int32 value_f0;
+        |        required int64 value_f1;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val valueType =
+        new StructType()
+          .add("value_f1", LongType, nullable = false)
+          .add("value_f2", DoubleType, nullable = false)
+
+      val f0Type = MapType(IntegerType, valueType, valueContainsNull = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required int32 key;
+        |      required group value {
+        |        required int64 value_f1;
+        |        required double value_f2;
+        |      }
+        |    }
+        |  }
+        |}
+      """.stripMargin)
+
+  testSchemaClipping(
+    "standard map with complex key",
+
+    parquetSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required group key {
+        |        required int32 value_f0;
+        |        required int64 value_f1;
+        |      }
+        |      required int32 value;
+        |    }
+        |  }
+        |}
+      """.stripMargin,
+
+    catalystSchema = {
+      val keyType =
+        new StructType()
+          .add("value_f1", LongType, nullable = false)
+          .add("value_f2", DoubleType, nullable = false)
+
+      val f0Type = MapType(keyType, IntegerType, valueContainsNull = false)
+
+      new StructType().add("f0", f0Type, nullable = false)
+    },
+
+    expectedSchema =
+      """message root {
+        |  required group f0 (MAP) {
+        |    repeated group key_value {
+        |      required group key {
+        |        required int64 value_f1;
+        |        required double value_f2;
+        |      }
+        |      required int32 value;
+        |    }
+        |  }
+        |}
+      """.stripMargin)
 }

