Repository: spark Updated Branches: refs/heads/master 3c9802d94 -> 071bbad5d
[SPARK-9340] [SQL] Fixes converting unannotated Parquet lists This PR is inspired by #8063 authored by dguy. Especially, testing Parquet files added here are all taken from that PR. **Committer who merges this PR should attribute it to "Damian Guy <damian.guygmail.com>".** ---- SPARK-6776 and SPARK-6777 followed `parquet-avro` to implement backwards-compatibility rules defined in `parquet-format` spec. However, both Spark SQL and `parquet-avro` neglected the following statement in `parquet-format`: > This does not affect repeated fields that are not annotated: A repeated field > that is neither contained by a `LIST`- or `MAP`-annotated group nor annotated > by `LIST` or `MAP` should be interpreted as a required list of required > elements where the element type is the type of the field. One of the consequences is that, Parquet files generated by `parquet-protobuf` containing unannotated repeated fields are not correctly converted to Catalyst arrays. This PR fixes this issue by 1. Handling unannotated repeated fields in `CatalystSchemaConverter`. 2. Converting this kind of special repeated fields to Catalyst arrays in `CatalystRowConverter`. Two special converters, `RepeatedPrimitiveConverter` and `RepeatedGroupConverter`, are added. They delegate actual conversion work to a child `elementConverter` and accumulates elements in an `ArrayBuffer`. Two extra methods, `start()` and `end()`, are added to `ParentContainerUpdater`. So that they can be used to initialize new `ArrayBuffer`s for unannotated repeated fields, and propagate converted array values to upstream. Author: Cheng Lian <l...@databricks.com> Closes #8070 from liancheng/spark-9340/unannotated-parquet-list and squashes the following commits: ace6df7 [Cheng Lian] Moves ParquetProtobufCompatibilitySuite f1c7bfd [Cheng Lian] Updates .rat-excludes 420ad2b [Cheng Lian] Fixes converting unannotated Parquet lists Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/071bbad5 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/071bbad5 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/071bbad5 Branch: refs/heads/master Commit: 071bbad5db1096a548c886762b611a8484a52753 Parents: 3c9802d Author: Damian Guy <damian....@gmail.com> Authored: Tue Aug 11 12:46:33 2015 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Tue Aug 11 12:46:33 2015 +0800 ---------------------------------------------------------------------- .rat-excludes | 1 + .../parquet/CatalystRowConverter.scala | 151 +++++++++++++++---- .../parquet/CatalystSchemaConverter.scala | 7 +- .../test/resources/nested-array-struct.parquet | Bin 0 -> 775 bytes .../src/test/resources/old-repeated-int.parquet | Bin 0 -> 389 bytes .../test/resources/old-repeated-message.parquet | Bin 0 -> 600 bytes .../src/test/resources/old-repeated.parquet | Bin 0 -> 432 bytes .../parquet-thrift-compat.snappy.parquet | Bin .../resources/proto-repeated-string.parquet | Bin 0 -> 411 bytes .../resources/proto-repeated-struct.parquet | Bin 0 -> 608 bytes .../proto-struct-with-array-many.parquet | Bin 0 -> 802 bytes .../resources/proto-struct-with-array.parquet | Bin 0 -> 1576 bytes .../ParquetProtobufCompatibilitySuite.scala | 91 +++++++++++ .../parquet/ParquetSchemaSuite.scala | 30 ++++ 14 files changed, 247 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/.rat-excludes ---------------------------------------------------------------------- diff --git a/.rat-excludes b/.rat-excludes index 7277146..9165872 100644 --- a/.rat-excludes +++ b/.rat-excludes @@ -94,3 +94,4 @@ INDEX gen-java.* .*avpr org.apache.spark.sql.sources.DataSourceRegister +.*parquet http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index 3542dfb..ab5a6dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -21,11 +21,11 @@ import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder import scala.collection.JavaConversions._ -import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} +import org.apache.parquet.schema.OriginalType.LIST import org.apache.parquet.schema.Type.Repetition import org.apache.parquet.schema.{GroupType, PrimitiveType, Type} @@ -42,6 +42,12 @@ import org.apache.spark.unsafe.types.UTF8String * values to an [[ArrayBuffer]]. */ private[parquet] trait ParentContainerUpdater { + /** Called before a record field is being converted */ + def start(): Unit = () + + /** Called after a record field is being converted */ + def end(): Unit = () + def set(value: Any): Unit = () def setBoolean(value: Boolean): Unit = set(value) def setByte(value: Byte): Unit = set(value) @@ -55,6 +61,32 @@ private[parquet] trait ParentContainerUpdater { /** A no-op updater used for root converter (who doesn't have a parent). */ private[parquet] object NoopUpdater extends ParentContainerUpdater +private[parquet] trait HasParentContainerUpdater { + def updater: ParentContainerUpdater +} + +/** + * A convenient converter class for Parquet group types with an [[HasParentContainerUpdater]]. + */ +private[parquet] abstract class CatalystGroupConverter(val updater: ParentContainerUpdater) + extends GroupConverter with HasParentContainerUpdater + +/** + * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types + * are handled by this converter. Parquet primitive types are only a subset of those of Spark + * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet. + */ +private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUpdater) + extends PrimitiveConverter with HasParentContainerUpdater { + + override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) + override def addInt(value: Int): Unit = updater.setInt(value) + override def addLong(value: Long): Unit = updater.setLong(value) + override def addFloat(value: Float): Unit = updater.setFloat(value) + override def addDouble(value: Double): Unit = updater.setDouble(value) + override def addBinary(value: Binary): Unit = updater.set(value.getBytes) +} + /** * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s. * Since any Parquet record is also a struct, this converter can also be used as root converter. @@ -70,7 +102,7 @@ private[parquet] class CatalystRowConverter( parquetType: GroupType, catalystType: StructType, updater: ParentContainerUpdater) - extends GroupConverter { + extends CatalystGroupConverter(updater) { /** * Updater used together with field converters within a [[CatalystRowConverter]]. It propagates @@ -89,13 +121,11 @@ private[parquet] class CatalystRowConverter( /** * Represents the converted row object once an entire Parquet record is converted. - * - * @todo Uses [[UnsafeRow]] for better performance. */ val currentRow = new SpecificMutableRow(catalystType.map(_.dataType)) // Converters for each field. - private val fieldConverters: Array[Converter] = { + private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { parquetType.getFields.zip(catalystType).zipWithIndex.map { case ((parquetFieldType, catalystField), ordinal) => // Converted field value should be set to the `ordinal`-th cell of `currentRow` @@ -105,11 +135,19 @@ private[parquet] class CatalystRowConverter( override def getConverter(fieldIndex: Int): Converter = fieldConverters(fieldIndex) - override def end(): Unit = updater.set(currentRow) + override def end(): Unit = { + var i = 0 + while (i < currentRow.numFields) { + fieldConverters(i).updater.end() + i += 1 + } + updater.set(currentRow) + } override def start(): Unit = { var i = 0 while (i < currentRow.numFields) { + fieldConverters(i).updater.start() currentRow.setNullAt(i) i += 1 } @@ -122,20 +160,20 @@ private[parquet] class CatalystRowConverter( private def newConverter( parquetType: Type, catalystType: DataType, - updater: ParentContainerUpdater): Converter = { + updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = { catalystType match { case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType => new CatalystPrimitiveConverter(updater) case ByteType => - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { override def addInt(value: Int): Unit = updater.setByte(value.asInstanceOf[ByteType#InternalType]) } case ShortType => - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { override def addInt(value: Int): Unit = updater.setShort(value.asInstanceOf[ShortType#InternalType]) } @@ -148,7 +186,7 @@ private[parquet] class CatalystRowConverter( case TimestampType => // TODO Implements `TIMESTAMP_MICROS` once parquet-mr has that. - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { // Converts nanosecond timestamps stored as INT96 override def addBinary(value: Binary): Unit = { assert( @@ -164,13 +202,23 @@ private[parquet] class CatalystRowConverter( } case DateType => - new PrimitiveConverter { + new CatalystPrimitiveConverter(updater) { override def addInt(value: Int): Unit = { // DateType is not specialized in `SpecificMutableRow`, have to box it here. updater.set(value.asInstanceOf[DateType#InternalType]) } } + // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor + // annotated by `LIST` or `MAP` should be interpreted as a required list of required + // elements where the element type is the type of the field. + case t: ArrayType if parquetType.getOriginalType != LIST => + if (parquetType.isPrimitive) { + new RepeatedPrimitiveConverter(parquetType, t.elementType, updater) + } else { + new RepeatedGroupConverter(parquetType, t.elementType, updater) + } + case t: ArrayType => new CatalystArrayConverter(parquetType.asGroupType(), t, updater) @@ -196,26 +244,10 @@ private[parquet] class CatalystRowConverter( } /** - * Parquet converter for Parquet primitive types. Note that not all Spark SQL atomic types - * are handled by this converter. Parquet primitive types are only a subset of those of Spark - * SQL. For example, BYTE, SHORT, and INT in Spark SQL are all covered by INT32 in Parquet. - */ - private final class CatalystPrimitiveConverter(updater: ParentContainerUpdater) - extends PrimitiveConverter { - - override def addBoolean(value: Boolean): Unit = updater.setBoolean(value) - override def addInt(value: Int): Unit = updater.setInt(value) - override def addLong(value: Long): Unit = updater.setLong(value) - override def addFloat(value: Float): Unit = updater.setFloat(value) - override def addDouble(value: Double): Unit = updater.setDouble(value) - override def addBinary(value: Binary): Unit = updater.set(value.getBytes) - } - - /** * Parquet converter for strings. A dictionary is used to minimize string decoding cost. */ private final class CatalystStringConverter(updater: ParentContainerUpdater) - extends PrimitiveConverter { + extends CatalystPrimitiveConverter(updater) { private var expandedDictionary: Array[UTF8String] = null @@ -242,7 +274,7 @@ private[parquet] class CatalystRowConverter( private final class CatalystDecimalConverter( decimalType: DecimalType, updater: ParentContainerUpdater) - extends PrimitiveConverter { + extends CatalystPrimitiveConverter(updater) { // Converts decimals stored as INT32 override def addInt(value: Int): Unit = { @@ -306,7 +338,7 @@ private[parquet] class CatalystRowConverter( parquetSchema: GroupType, catalystSchema: ArrayType, updater: ParentContainerUpdater) - extends GroupConverter { + extends CatalystGroupConverter(updater) { private var currentArray: ArrayBuffer[Any] = _ @@ -383,7 +415,7 @@ private[parquet] class CatalystRowConverter( parquetType: GroupType, catalystType: MapType, updater: ParentContainerUpdater) - extends GroupConverter { + extends CatalystGroupConverter(updater) { private var currentKeys: ArrayBuffer[Any] = _ private var currentValues: ArrayBuffer[Any] = _ @@ -446,4 +478,61 @@ private[parquet] class CatalystRowConverter( } } } + + private trait RepeatedConverter { + private var currentArray: ArrayBuffer[Any] = _ + + protected def newArrayUpdater(updater: ParentContainerUpdater) = new ParentContainerUpdater { + override def start(): Unit = currentArray = ArrayBuffer.empty[Any] + override def end(): Unit = updater.set(new GenericArrayData(currentArray.toArray)) + override def set(value: Any): Unit = currentArray += value + } + } + + /** + * A primitive converter for converting unannotated repeated primitive values to required arrays + * of required primitives values. + */ + private final class RepeatedPrimitiveConverter( + parquetType: Type, + catalystType: DataType, + parentUpdater: ParentContainerUpdater) + extends PrimitiveConverter with RepeatedConverter with HasParentContainerUpdater { + + val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) + + private val elementConverter: PrimitiveConverter = + newConverter(parquetType, catalystType, updater).asPrimitiveConverter() + + override def addBoolean(value: Boolean): Unit = elementConverter.addBoolean(value) + override def addInt(value: Int): Unit = elementConverter.addInt(value) + override def addLong(value: Long): Unit = elementConverter.addLong(value) + override def addFloat(value: Float): Unit = elementConverter.addFloat(value) + override def addDouble(value: Double): Unit = elementConverter.addDouble(value) + override def addBinary(value: Binary): Unit = elementConverter.addBinary(value) + + override def setDictionary(dict: Dictionary): Unit = elementConverter.setDictionary(dict) + override def hasDictionarySupport: Boolean = elementConverter.hasDictionarySupport + override def addValueFromDictionary(id: Int): Unit = elementConverter.addValueFromDictionary(id) + } + + /** + * A group converter for converting unannotated repeated group values to required arrays of + * required struct values. + */ + private final class RepeatedGroupConverter( + parquetType: Type, + catalystType: DataType, + parentUpdater: ParentContainerUpdater) + extends GroupConverter with HasParentContainerUpdater with RepeatedConverter { + + val updater: ParentContainerUpdater = newArrayUpdater(parentUpdater) + + private val elementConverter: GroupConverter = + newConverter(parquetType, catalystType, updater).asGroupConverter() + + override def getConverter(field: Int): Converter = elementConverter.getConverter(field) + override def end(): Unit = elementConverter.end() + override def start(): Unit = elementConverter.start() + } } http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index a3fc74c..275646e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -100,8 +100,11 @@ private[parquet] class CatalystSchemaConverter( StructField(field.getName, convertField(field), nullable = false) case REPEATED => - throw new AnalysisException( - s"REPEATED not supported outside LIST or MAP. Type: $field") + // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor + // annotated by `LIST` or `MAP` should be interpreted as a required list of required + // elements where the element type is the type of the field. + val arrayType = ArrayType(convertField(field), containsNull = false) + StructField(field.getName, arrayType, nullable = false) } } http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/nested-array-struct.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/nested-array-struct.parquet b/sql/core/src/test/resources/nested-array-struct.parquet new file mode 100644 index 0000000..41a43fa Binary files /dev/null and b/sql/core/src/test/resources/nested-array-struct.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/old-repeated-int.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/old-repeated-int.parquet b/sql/core/src/test/resources/old-repeated-int.parquet new file mode 100644 index 0000000..520922f Binary files /dev/null and b/sql/core/src/test/resources/old-repeated-int.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/old-repeated-message.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/old-repeated-message.parquet b/sql/core/src/test/resources/old-repeated-message.parquet new file mode 100644 index 0000000..548db99 Binary files /dev/null and b/sql/core/src/test/resources/old-repeated-message.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/old-repeated.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/old-repeated.parquet b/sql/core/src/test/resources/old-repeated.parquet new file mode 100644 index 0000000..213f1a9 Binary files /dev/null and b/sql/core/src/test/resources/old-repeated.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet b/sql/core/src/test/resources/parquet-thrift-compat.snappy.parquet old mode 100755 new mode 100644 http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/proto-repeated-string.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/proto-repeated-string.parquet b/sql/core/src/test/resources/proto-repeated-string.parquet new file mode 100644 index 0000000..8a7eea6 Binary files /dev/null and b/sql/core/src/test/resources/proto-repeated-string.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/proto-repeated-struct.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/proto-repeated-struct.parquet b/sql/core/src/test/resources/proto-repeated-struct.parquet new file mode 100644 index 0000000..c29eee3 Binary files /dev/null and b/sql/core/src/test/resources/proto-repeated-struct.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/proto-struct-with-array-many.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/proto-struct-with-array-many.parquet b/sql/core/src/test/resources/proto-struct-with-array-many.parquet new file mode 100644 index 0000000..ff98096 Binary files /dev/null and b/sql/core/src/test/resources/proto-struct-with-array-many.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/resources/proto-struct-with-array.parquet ---------------------------------------------------------------------- diff --git a/sql/core/src/test/resources/proto-struct-with-array.parquet b/sql/core/src/test/resources/proto-struct-with-array.parquet new file mode 100644 index 0000000..325a837 Binary files /dev/null and b/sql/core/src/test/resources/proto-struct-with-array.parquet differ http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala new file mode 100644 index 0000000..981334c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetProtobufCompatibilitySuite.scala @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.datasources.parquet + +import org.apache.spark.sql.test.TestSQLContext +import org.apache.spark.sql.{DataFrame, Row, SQLContext} + +class ParquetProtobufCompatibilitySuite extends ParquetCompatibilityTest { + override def sqlContext: SQLContext = TestSQLContext + + private def readParquetProtobufFile(name: String): DataFrame = { + val url = Thread.currentThread().getContextClassLoader.getResource(name) + sqlContext.read.parquet(url.toString) + } + + test("unannotated array of primitive type") { + checkAnswer(readParquetProtobufFile("old-repeated-int.parquet"), Row(Seq(1, 2, 3))) + } + + test("unannotated array of struct") { + checkAnswer( + readParquetProtobufFile("old-repeated-message.parquet"), + Row( + Seq( + Row("First inner", null, null), + Row(null, "Second inner", null), + Row(null, null, "Third inner")))) + + checkAnswer( + readParquetProtobufFile("proto-repeated-struct.parquet"), + Row( + Seq( + Row("0 - 1", "0 - 2", "0 - 3"), + Row("1 - 1", "1 - 2", "1 - 3")))) + + checkAnswer( + readParquetProtobufFile("proto-struct-with-array-many.parquet"), + Seq( + Row( + Seq( + Row("0 - 0 - 1", "0 - 0 - 2", "0 - 0 - 3"), + Row("0 - 1 - 1", "0 - 1 - 2", "0 - 1 - 3"))), + Row( + Seq( + Row("1 - 0 - 1", "1 - 0 - 2", "1 - 0 - 3"), + Row("1 - 1 - 1", "1 - 1 - 2", "1 - 1 - 3"))), + Row( + Seq( + Row("2 - 0 - 1", "2 - 0 - 2", "2 - 0 - 3"), + Row("2 - 1 - 1", "2 - 1 - 2", "2 - 1 - 3"))))) + } + + test("struct with unannotated array") { + checkAnswer( + readParquetProtobufFile("proto-struct-with-array.parquet"), + Row(10, 9, Seq.empty, null, Row(9), Seq(Row(9), Row(10)))) + } + + test("unannotated array of struct with unannotated array") { + checkAnswer( + readParquetProtobufFile("nested-array-struct.parquet"), + Seq( + Row(2, Seq(Row(1, Seq(Row(3))))), + Row(5, Seq(Row(4, Seq(Row(6))))), + Row(8, Seq(Row(7, Seq(Row(9))))))) + } + + test("unannotated array of string") { + checkAnswer( + readParquetProtobufFile("proto-repeated-string.parquet"), + Seq( + Row(Seq("hello", "world")), + Row(Seq("good", "bye")), + Row(Seq("one", "two", "three")))) + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/071bbad5/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 8f06de7..971f71e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -585,6 +585,36 @@ class ParquetSchemaSuite extends ParquetSchemaTest { |} """.stripMargin) + testParquetToCatalyst( + "Backwards-compatibility: LIST with non-nullable element type 7 - " + + "parquet-protobuf primitive lists", + new StructType() + .add("f1", ArrayType(IntegerType, containsNull = false), nullable = false), + """message root { + | repeated int32 f1; + |} + """.stripMargin) + + testParquetToCatalyst( + "Backwards-compatibility: LIST with non-nullable element type 8 - " + + "parquet-protobuf non-primitive lists", + { + val elementType = + new StructType() + .add("c1", StringType, nullable = true) + .add("c2", IntegerType, nullable = false) + + new StructType() + .add("f1", ArrayType(elementType, containsNull = false), nullable = false) + }, + """message root { + | repeated group f1 { + | optional binary c1 (UTF8); + | required int32 c2; + | } + |} + """.stripMargin) + // ======================================================= // Tests for converting Catalyst ArrayType to Parquet LIST // ======================================================= --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org