Repository: spark
Updated Branches:
  refs/heads/branch-1.5 94692bb14 -> 01efa4f27


[SPARK-9340] [SQL] Fixes converting unannotated Parquet lists

This PR is inspired by #8063, authored by dguy. In particular, all the test 
Parquet files added here are taken from that PR.

**The committer who merges this PR should attribute it to "Damian Guy 
<damian.guy@gmail.com>".**

----

SPARK-6776 and SPARK-6777 followed `parquet-avro` in implementing the 
backwards-compatibility rules defined in the `parquet-format` spec. However, 
both Spark SQL and `parquet-avro` neglected the following statement in 
`parquet-format`:

> This does not affect repeated fields that are not annotated: A repeated field 
> that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated 
> by `LIST` or `MAP` should be interpreted as a required list of required 
> elements where the element type is the type of the field.

One consequence is that Parquet files generated by `parquet-protobuf` that 
contain unannotated repeated fields are not correctly converted to Catalyst 
arrays.
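
For example (lifted from the `ParquetSchemaSuite` cases added in this commit), 
a bare repeated primitive field should map to a required array of required 
elements:

    import org.apache.spark.sql.types._

    // Parquet schema written by parquet-protobuf, with no LIST annotation:
    //
    //   message root {
    //     repeated int32 f1;
    //   }
    //
    // Catalyst schema expected after this fix:
    val expected = new StructType()
      .add("f1", ArrayType(IntegerType, containsNull = false), nullable = false)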

This PR fixes this issue by

1. Handling unannotated repeated fields in `CatalystSchemaConverter`.
2. Converting these special repeated fields to Catalyst arrays in 
`CatalystRowConverter`.

   Two special converters, `RepeatedPrimitiveConverter` and 
`RepeatedGroupConverter`, are added. They delegate the actual conversion work 
to a child `elementConverter` and accumulate elements in an `ArrayBuffer`.

   Two extra methods, `start()` and `end()`, are added to 
`ParentContainerUpdater`, so that they can be used to initialize new 
`ArrayBuffer`s for unannotated repeated fields and to propagate converted 
array values upstream.
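
   A simplified, self-contained sketch of that lifecycle (names are shortened 
here; the real updater stores elements in an `ArrayBuffer` and wraps the 
finished array in a `GenericArrayData` before handing it upstream):

    import scala.collection.mutable.ArrayBuffer

    // start() resets state before a record field is converted, set()
    // accumulates each repeated value, and end() hands the finished array
    // to the parent container.
    trait Updater {
      def start(): Unit = ()
      def end(): Unit = ()
      def set(value: Any): Unit = ()
    }

    object ArrayUpdaterSketch {
      def newArrayUpdater(parent: Updater): Updater = new Updater {
        private val buffer = ArrayBuffer.empty[Any]
        override def start(): Unit = buffer.clear()
        override def end(): Unit = parent.set(buffer.toArray)
        override def set(value: Any): Unit = buffer += value
      }
    }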

Author: Cheng Lian <l...@databricks.com>

Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes 
the following commits:

ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite
f1c7bfd [Cheng Lian] Updates .rat-excludes
420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists

(cherry picked from commit 071bbad5db1096a548c886762b611a8484a52753)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01efa4f2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01efa4f2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01efa4f2

Branch: refs/heads/branch-1.5
Commit: 01efa4f27db1eefba9cb0fe2dec790556f3280de
Parents: 94692bb
Author: Damian Guy <damian....@gmail.com>
Authored: Tue Aug 11 12:46:33 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Aug 11 12:46:54 2015 +0800

----------------------------------------------------------------------
 .rat-excludes                                   |   1 +
 .../parquet/CatalystRowConverter.scala          | 151 +++++++++++++++----
 .../parquet/CatalystSchemaConverter.scala       |   7 +-
 .../test/resources/nested-array-struct.parquet  | Bin 0 -> 775 bytes
 .../src/test/resources/old-repeated-int.parquet | Bin 0 -> 389 bytes
 .../test/resources/old-repeated-message.parquet | Bin 0 -> 600 bytes
 .../src/test/resources/old-repeated.parquet     | Bin 0 -> 432 bytes
 .../parquet-thrift-compat.snappy.parquet        | Bin
 .../resources/proto-repeated-string.parquet     | Bin 0 -> 411 bytes
 .../resources/proto-repeated-struct.parquet     | Bin 0 -> 608 bytes
 .../proto-struct-with-array-many.parquet        | Bin 0 -> 802 bytes
 .../resources/proto-struct-with-array.parquet   | Bin 0 -> 1576 bytes
 .../ParquetProtobufCompatibilitySuite.scala     |  91 +++++++++++
 .../parquet/ParquetSchemaSuite.scala            |  30 ++++
 14 files changed, 247 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/.rat-excludes
----------------------------------------------------------------------
diff --git a/.rat-excludes b/.rat-excludes
index 7277146..9165872 100644
--- a/.rat-excludes
+++ b/.rat-excludes
@@ -94,3 +94,4 @@ INDEX
 gen-java.*
 .*avpr
 org.apache.spark.sql.sources.DataSourceRegister
+.*parquet

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index 3542dfb..ab5a6dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger}
 import java.nio.ByteOrder
 
 import scala.collection.JavaConversions._
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
+import org.apache.parquet.schema.OriginalType.LIST
 import org.apache.parquet.schema.Type.Repetition
 import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
 
@@ -42,6 +42,12 @@ import org.apache.spark.unsafe.types.UTF8String
  * values to an [[ArrayBuffer]].
  */
 private[parquet] trait ParentContainerUpdater {
+  /** Called before a record field is converted */
+  def start(): Unit = ()
+
+  /** Called after a record field has been converted */
+  def end(): Unit = ()
+
   def set(value: Any): Unit = ()
   def setBoolean(value: Boolean): Unit = set(value)
   def setByte(value: Byte): Unit = set(value)
@@ -55,6 +61,32 @@ private[parquet] trait ParentContainerUpdater {
 /** A no-op updater used for root converter (who doesn't have a parent). */
 private[parquet] object NoopUpdater extends ParentContainerUpdater
 
+private[parquet] trait HasParentContainerUpdater {
+  def updater: ParentContainerUpdater
+}
+
+/**
+ * A convenient converter class for Parquet group types with a [[HasParentContainerUpdater]].
+ */
+private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater)
+  extends GroupConverter with HasParentContainerUpdater
+
+/**
+ * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
+ * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
+ * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
+ */
+private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater)
+  extends PrimitiveConverter with HasParentContainerUpdater {
+
+  override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
+  override def addInt(value: Int): Unit = updater.setInt(value)
+  override def addLong(value: Long): Unit = updater.setLong(value)
+  override def addFloat(value: Float): Unit = updater.setFloat(value)
+  override def addDouble(value: Double): Unit = updater.setDouble(value)
+  override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
+}
+
 /**
 * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
 * Since any Parquet record is also a struct, this converter can also be used as root converter.
@@ -70,7 +102,7 @@ private[parquet] class CatalystRowConverter(
     parquetType: GroupType,
     catalystType: StructType,
     updater: ParentContainerUpdater)
-  extends GroupConverter {
+  extends CatalystGroupConverter(updater) {
 
   /**
   * Updater used together with field converters within a [[CatalystRowConverter]].  It propagates
@@ -89,13 +121,11 @@ private[parquet] class CatalystRowConverter(
 
   /**
   * Represents the converted row object once an entire Parquet record is converted.
-   *
-   * @todo Uses [[UnsafeRow]] for better performance.
    */
   val currentRow = new SpecificMutableRow(catalystType.map(_.dataType))
 
   // Converters for each field.
-  private val fieldConverters: Array[Converter] = {
+  private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     parquetType.getFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
        // Converted field value should be set to the `ordinal`-th cell of `currentRow`
@@ -105,11 +135,19 @@ private[parquet] class CatalystRowConverter(
 
   override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex)
 
-  override def end(): Unit = updater.set(currentRow)
+  override def end(): Unit = {
+    var i = 0
+    while (i < currentRow.numFields) {
+      fieldConverters(i).updater.end()
+      i += 1
+    }
+    updater.set(currentRow)
+  }
 
   override def start(): Unit = {
     var i = 0
     while (i < currentRow.numFields) {
+      fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
@@ -122,20 +160,20 @@ private[parquet] class CatalystRowConverter(
   private def newConverter(
       parquetType: Type,
       catalystType: DataType,
-      updater: ParentContainerUpdater): Converter = {
+      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {
 
     catalystType match {
       case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType =>
         new CatalystPrimitiveConverter(updater)
 
       case ByteType =>
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit =
             updater.setByte(value.asInstanceOf[ByteType#InternalType])
         }
 
       case ShortType =>
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit =
             updater.setShort(value.asInstanceOf[ShortType#InternalType])
         }
@@ -148,7 +186,7 @@ private[parquet] class CatalystRowConverter(
 
       case TimestampType =>
         // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that.
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           // Converts nanosecond timestamps stored as INT96
           override def addBinary(value: Binary): Unit = {
             assert(
@@ -164,13 +202,23 @@ private[parquet] class CatalystRowConverter(
         }
 
       case DateType =>
-        new PrimitiveConverter {
+        new CatalystPrimitiveConverter(updater) {
           override def addInt(value: Int): Unit = {
            // DateType is not specialized in `SpecificMutableRow`, have to box it here.
             updater.set(value.asInstanceOf[DateType#InternalType])
           }
         }
 
+      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+      // elements where the element type is the type of the field.
+      case t: ArrayType if parquetType.getOriginalType != LIST =>
+        if (parquetType.isPrimitive) {
+          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
+        } else {
+          new RepeatedGroupConverter(parquetType, t.elementType, updater)
+        }
+
       case t: ArrayType =>
         new CatalystArrayConverter(parquetType.asGroupType(), t, updater)
 
@@ -196,26 +244,10 @@ private[parquet] class CatalystRowConverter(
   }
 
   /**
-   * Parquet converter for Parquet primitive types.  Note that not all Spark SQL atomic types
-   * are handled by this converter.  Parquet primitive types are only a subset of those of Spark
-   * SQL.  For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet.
-   */
-  private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
-
-    override def addBoolean(value: Boolean): Unit = updater.setBoolean(value)
-    override def addInt(value: Int): Unit = updater.setInt(value)
-    override def addLong(value: Long): Unit = updater.setLong(value)
-    override def addFloat(value: Float): Unit = updater.setFloat(value)
-    override def addDouble(value: Double): Unit = updater.setDouble(value)
-    override def addBinary(value: Binary): Unit = updater.set(value.getBytes)
-  }
-
-  /**
   * Parquet converter for strings. A dictionary is used to minimize string decoding cost.
    */
   private final class CatalystStringConverter(updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
+    extends CatalystPrimitiveConverter(updater) {
 
     private var expandedDictionary: Array[UTF8String] = null
 
@@ -242,7 +274,7 @@ private[parquet] class CatalystRowConverter(
   private final class CatalystDecimalConverter(
       decimalType: DecimalType,
       updater: ParentContainerUpdater)
-    extends PrimitiveConverter {
+    extends CatalystPrimitiveConverter(updater) {
 
     // Converts decimals stored as INT32
     override def addInt(value: Int): Unit = {
@@ -306,7 +338,7 @@ private[parquet] class CatalystRowConverter(
       parquetSchema: GroupType,
       catalystSchema: ArrayType,
       updater: ParentContainerUpdater)
-    extends GroupConverter {
+    extends CatalystGroupConverter(updater) {
 
     private var currentArray: ArrayBuffer[Any] = _
 
@@ -383,7 +415,7 @@ private[parquet] class CatalystRowConverter(
       parquetType: GroupType,
       catalystType: MapType,
       updater: ParentContainerUpdater)
-    extends GroupConverter {
+    extends CatalystGroupConverter(updater) {
 
     private var currentKeys: ArrayBuffer[Any] = _
     private var currentValues: ArrayBuffer[Any] = _
@@ -446,4 +478,61 @@ private[parquet] class CatalystRowConverter(
       }
     }
   }
+
+  private trait RepeatedConverter {
+    private var currentArray: ArrayBuffer[Any] = _
+
+    protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater {
+      override def start(): Unit = currentArray = ArrayBuffer.empty[Any]
+      override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray))
+      override def set(value: Any): Unit = currentArray += value
+    }
+  }
+
+  /**
+   * A primitive converter for converting unannotated repeated primitive values to required
+   * arrays of required primitive values.
+   */
+  private final class RepeatedPrimitiveConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: PrimitiveConverter =
+      newConverter(parquetType, catalystType, updater).asPrimitiveConverter()
+
+    override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value)
+    override def addInt(value: Int): Unit = elementConverter.addInt(value)
+    override def addLong(value: Long): Unit = elementConverter.addLong(value)
+    override def addFloat(value: Float): Unit = elementConverter.addFloat(value)
+    override def addDouble(value: Double): Unit = elementConverter.addDouble(value)
+    override def addBinary(value: Binary): Unit = elementConverter.addBinary(value)
+
+    override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict)
+    override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport
+    override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id)
+  }
+
+  /**
+   * A group converter for converting unannotated repeated group values to required arrays of
+   * required struct values.
+   */
+  private final class RepeatedGroupConverter(
+      parquetType: Type,
+      catalystType: DataType,
+      parentUpdater: ParentContainerUpdater)
+    extends GroupConverter with HasParentContainerUpdater with RepeatedConverter {
+
+    val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater)
+
+    private val elementConverter: GroupConverter =
+      newConverter(parquetType, catalystType, updater).asGroupConverter()
+
+    override def getConverter(field: Int): Converter = elementConverter.getConverter(field)
+    override def end(): Unit = elementConverter.end()
+    override def start(): Unit = elementConverter.start()
+  }
 }
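
One design note on the `RepeatedPrimitiveConverter` added above: parquet-mr 
negotiates dictionary support with whatever converter sits at a column 
position, so the repeated wrapper must forward `setDictionary`, 
`hasDictionarySupport`, and `addValueFromDictionary` to its element converter; 
otherwise dictionary-encoded pages would bypass it. The delegation shape in 
isolation (a sketch with a hypothetical class name; only two of the value 
callbacks are shown, while the real class forwards all six):

    import org.apache.parquet.column.Dictionary
    import org.apache.parquet.io.api.{Binary, PrimitiveConverter}

    // Forwards both value and dictionary callbacks, so dictionary-encoded
    // columns decode through the element converter exactly like
    // plain-encoded ones.
    class ForwardingPrimitiveConverter(element: PrimitiveConverter)
      extends PrimitiveConverter {

      override def addInt(value: Int): Unit = element.addInt(value)
      override def addBinary(value: Binary): Unit = element.addBinary(value)

      override def setDictionary(dict: Dictionary): Unit = element.setDictionary(dict)
      override def hasDictionarySupport: Boolean = element.hasDictionarySupport
      override def addValueFromDictionary(id: Int): Unit = element.addValueFromDictionary(id)
    }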

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index a3fc74c..275646e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -100,8 +100,11 @@ private[parquet] class CatalystSchemaConverter(
           StructField(field.getName, convertField(field), nullable = false)
 
         case REPEATED =>
-          throw new AnalysisException(
-            s"REPEATED not supported outside LIST or MAP. Type: $field")
+          // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
+          // annotated by `LIST` or `MAP` should be interpreted as a required list of required
+          // elements where the element type is the type of the field.
+          val arrayType = ArrayType(convertField(field), containsNull = false)
+          StructField(field.getName, arrayType, nullable = false)
       }
     }
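
With both changes in place, files like the fixtures added under 
`sql/core/src/test/resources` load directly instead of failing analysis. A 
usage sketch against the Spark 1.5 API (the printed field name is 
illustrative, since it depends on the schema embedded in the writer's file):

    // Read a fixture added by this commit and inspect the inferred schema;
    // the unannotated repeated field now surfaces as a required array of
    // required elements.
    val df = sqlContext.read.parquet(
      "sql/core/src/test/resources/old-repeated-int.parquet")
    df.printSchema()
    // root
    //  |-- <fieldName>: array (nullable = false)
    //  |    |-- element: integer (containsNull = false)
    df.collect()  // Array([WrappedArray(1, 2, 3)]), per the new test suite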
 

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/nested-array-struct.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/nested-array-struct.parquet b/sql/core/src/test/resources/nested-array-struct.parquet
new file mode 100644
index 0000000..41a43fa
Binary files /dev/null and b/sql/core/src/test/resources/nested-array-struct.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/old-repeated-int.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/old-repeated-int.parquet b/sql/core/src/test/resources/old-repeated-int.parquet
new file mode 100644
index 0000000..520922f
Binary files /dev/null and b/sql/core/src/test/resources/old-repeated-int.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/old-repeated-message.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/old-repeated-message.parquet b/sql/core/src/test/resources/old-repeated-message.parquet
new file mode 100644
index 0000000..548db99
Binary files /dev/null and b/sql/core/src/test/resources/old-repeated-message.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/old-repeated.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/old-repeated.parquet b/sql/core/src/test/resources/old-repeated.parquet
new file mode 100644
index 0000000..213f1a9
Binary files /dev/null and b/sql/core/src/test/resources/old-repeated.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet b/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet
old mode 100755
new mode 100644

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/proto-repeated-string.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/proto-repeated-string.parquet b/sql/core/src/test/resources/proto-repeated-string.parquet
new file mode 100644
index 0000000..8a7eea6
Binary files /dev/null and b/sql/core/src/test/resources/proto-repeated-string.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/proto-repeated-struct.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/proto-repeated-struct.parquet b/sql/core/src/test/resources/proto-repeated-struct.parquet
new file mode 100644
index 0000000..c29eee3
Binary files /dev/null and b/sql/core/src/test/resources/proto-repeated-struct.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/proto-struct-with-array-many.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/proto-struct-with-array-many.parquet b/sql/core/src/test/resources/proto-struct-with-array-many.parquet
new file mode 100644
index 0000000..ff98096
Binary files /dev/null and b/sql/core/src/test/resources/proto-struct-with-array-many.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/resources/proto-struct-with-array.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/proto-struct-with-array.parquet b/sql/core/src/test/resources/proto-struct-with-array.parquet
new file mode 100644
index 0000000..325a837
Binary files /dev/null and b/sql/core/src/test/resources/proto-struct-with-array.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
new file mode 100644
index 0000000..981334c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{DataFrame, Row, SQLContext}
+
+class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest {
+  override def sqlContext: SQLContext = TestSQLContext
+
+  private def readParquetProtobufFile(name: String): DataFrame = {
+    val url = Thread.currentThread().getContextClassLoader.getResource(name)
+    sqlContext.read.parquet(url.toString)
+  }
+
+  test("unannotated array of primitive type") {
+    checkAnswer(readParquetProtobufFile("old-repeated-int.parquet"), Row(Seq(1, 2, 3)))
+  }
+
+  test("unannotated array of struct") {
+    checkAnswer(
+      readParquetProtobufFile("old-repeated-message.parquet"),
+      Row(
+        Seq(
+          Row("First inner", null, null),
+          Row(null, "Second inner", null),
+          Row(null, null, "Third inner"))))
+
+    checkAnswer(
+      readParquetProtobufFile("proto-repeated-struct.parquet"),
+      Row(
+        Seq(
+          Row("0 - 1", "0 - 2", "0 - 3"),
+          Row("1 - 1", "1 - 2", "1 - 3"))))
+
+    checkAnswer(
+      readParquetProtobufFile("proto-struct-with-array-many.parquet"),
+      Seq(
+        Row(
+          Seq(
+            Row("0 - 0 - 1", "0 - 0 - 2", "0 - 0 - 3"),
+            Row("0 - 1 - 1", "0 - 1 - 2", "0 - 1 - 3"))),
+        Row(
+          Seq(
+            Row("1 - 0 - 1", "1 - 0 - 2", "1 - 0 - 3"),
+            Row("1 - 1 - 1", "1 - 1 - 2", "1 - 1 - 3"))),
+        Row(
+          Seq(
+            Row("2 - 0 - 1", "2 - 0 - 2", "2 - 0 - 3"),
+            Row("2 - 1 - 1", "2 - 1 - 2", "2 - 1 - 3")))))
+  }
+
+  test("struct with unannotated array") {
+    checkAnswer(
+      readParquetProtobufFile("proto-struct-with-array.parquet"),
+      Row(10, 9, Seq.empty, null, Row(9), Seq(Row(9), Row(10))))
+  }
+
+  test("unannotated array of struct with unannotated array") {
+    checkAnswer(
+      readParquetProtobufFile("nested-array-struct.parquet"),
+      Seq(
+        Row(2, Seq(Row(1, Seq(Row(3))))),
+        Row(5, Seq(Row(4, Seq(Row(6))))),
+        Row(8, Seq(Row(7, Seq(Row(9)))))))
+  }
+
+  test("unannotated array of string") {
+    checkAnswer(
+      readParquetProtobufFile("proto-repeated-string.parquet"),
+      Seq(
+        Row(Seq("hello", "world")),
+        Row(Seq("good", "bye")),
+        Row(Seq("one", "two", "three"))))
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/01efa4f2/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 8f06de7..971f71e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -585,6 +585,36 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
       |}
     """.stripMargin)
 
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type 7 - " +
+      "parquet-protobuf primitive lists",
+    new StructType()
+      .add("f1", ArrayType(IntegerType, containsNull = false), nullable = 
false),
+    """message root {
+      |  repeated int32 f1;
+      |}
+    """.stripMargin)
+
+  testParquetToCatalyst(
+    "Backwards-compatibility: LIST with non-nullable element type 8 - " +
+      "parquet-protobuf non-primitive lists",
+    {
+      val elementType =
+        new StructType()
+          .add("c1", StringType, nullable = true)
+          .add("c2", IntegerType, nullable = false)
+
+      new StructType()
+        .add("f1", ArrayType(elementType, containsNull = false), nullable = 
false)
+    },
+    """message root {
+      |  repeated group f1 {
+      |    optional binary c1 (UTF8);
+      |    required int32 c2;
+      |  }
+      |}
+    """.stripMargin)
+
   // =======================================================
   // Tests for converting Catalyst ArrayType to Parquet LIST
   // =======================================================

