Repository: spark
Updated Branches:
  refs/heads/master 1fbd124b1 -> dba98bf69


[SPARK-4520] [SQL] This PR fixes the ArrayIndexOutOfBoundsException raised in SPARK-4520.

The exception is thrown only for thrift-generated parquet files. The array
element schema name is assumed to be "array", as in parquet-avro, but
thrift-generated parquet files use array_name + "_tuple" instead. As a
result, the child of the array group type cannot be found, and the
exception is thrown when the parquet rows are materialized.
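
For illustration, a Seq[Int] field named, say, `ints` (a hypothetical name) is laid out differently in the two cases. The default (parquet-avro style) element name that Spark assumed is:

    optional group ints (LIST) {
      repeated int32 array;
    }

whereas parquet-thrift names the element field name + "_tuple", which is what triggered the exception (this form matches the test added in this change):

    optional group ints (LIST) {
      repeated int32 ints_tuple;
    }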

Author: Sadhan Sood <sad...@tellapart.com>

Closes #4148 from sadhan/SPARK-4520 and squashes the following commits:

c5ccde8 [Sadhan Sood] [SPARK-4520] [SQL] This pr fixes the ArrayIndexOutOfBoundsException as raised in SPARK-4520.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dba98bf6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dba98bf6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dba98bf6

Branch: refs/heads/master
Commit: dba98bf6987ec39380f1a5b0ca2772b694452231
Parents: 1fbd124
Author: Sadhan Sood <sad...@tellapart.com>
Authored: Wed Feb 4 19:18:06 2015 -0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Feb 4 19:18:06 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetConverter.scala    |  5 +++
 .../spark/sql/parquet/ParquetTableSupport.scala |  6 +++-
 .../apache/spark/sql/parquet/ParquetTypes.scala | 35 ++++++++++++++------
 .../spark/sql/parquet/ParquetSchemaSuite.scala  | 28 ++++++++++++++--
 4 files changed, 60 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/dba98bf6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
index d87ddfe..7d62f37 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetConverter.scala
@@ -66,6 +66,11 @@ private[sql] object CatalystConverter {
   // Using a different value will result in Parquet silently dropping columns.
   val ARRAY_CONTAINS_NULL_BAG_SCHEMA_NAME = "bag"
   val ARRAY_ELEMENTS_SCHEMA_NAME = "array"
+  // SPARK-4520: Thrift generated parquet files have different array element
+  // schema names than avro. Thrift parquet uses array_schema_name + "_tuple"
+  // as opposed to "array" used by default. For more information, check
+  // TestThriftSchemaConverter.java in parquet.thrift.
+  val THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX = "_tuple"
   val MAP_KEY_SCHEMA_NAME = "key"
   val MAP_VALUE_SCHEMA_NAME = "value"
   val MAP_SCHEMA_NAME = "map"
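
As a quick sketch of how the new constant is meant to be used (the field name "ints" is hypothetical, not code from this commit):

    // Thrift-style element name: the array field's own name plus "_tuple".
    val thriftElementName = "ints" + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX
    // thriftElementName == "ints_tuple"; the default would instead be
    // CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME, i.e. "array".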

http://git-wip-us.apache.org/repos/asf/spark/blob/dba98bf6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
index 3fb1cc4..14c81ae 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTableSupport.scala
@@ -99,7 +99,11 @@ private[parquet] class RowReadSupport extends ReadSupport[Row] with Logging {
     val requestedAttributes = RowReadSupport.getRequestedSchema(configuration)
 
     if (requestedAttributes != null) {
-      parquetSchema = ParquetTypesConverter.convertFromAttributes(requestedAttributes)
+      // If the parquet file is thrift derived, there is a good chance that
+      // it will have the thrift class in metadata.
+      val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
+      parquetSchema = ParquetTypesConverter
+        .convertFromAttributes(requestedAttributes, isThriftDerived)
       metadata.put(
         RowReadSupport.SPARK_ROW_REQUESTED_SCHEMA,
         ParquetTypesConverter.convertToString(requestedAttributes))
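
In isolation, the detection added here amounts to the following sketch (`keyValueMetaData` is the footer key/value map available in `RowReadSupport.init`, as in the surrounding method):

    // parquet-thrift records the originating thrift class name in the file
    // footer metadata, so the presence of the "thrift.class" key is used
    // as the signal that the file is thrift-derived.
    val isThriftDerived = keyValueMetaData.keySet().contains("thrift.class")
    val parquetSchema =
      ParquetTypesConverter.convertFromAttributes(requestedAttributes, isThriftDerived)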

http://git-wip-us.apache.org/repos/asf/spark/blob/dba98bf6/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
index f1d4ff2..b646109 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetTypes.scala
@@ -285,13 +285,19 @@ private[parquet] object ParquetTypesConverter extends Logging {
       ctype: DataType,
       name: String,
       nullable: Boolean = true,
-      inArray: Boolean = false): ParquetType = {
+      inArray: Boolean = false, 
+      toThriftSchemaNames: Boolean = false): ParquetType = {
     val repetition =
       if (inArray) {
         Repetition.REPEATED
       } else {
         if (nullable) Repetition.OPTIONAL else Repetition.REQUIRED
       }
+    val arraySchemaName = if (toThriftSchemaNames) {
+      name + CatalystConverter.THRIFT_ARRAY_ELEMENTS_SCHEMA_NAME_SUFFIX
+    } else {
+      CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME
+    }
     val typeInfo = fromPrimitiveDataType(ctype)
     typeInfo.map {
      case ParquetTypeInfo(primitiveType, originalType, decimalMetadata, length) =>
@@ -306,22 +312,24 @@ private[parquet] object ParquetTypesConverter extends Logging {
     }.getOrElse {
       ctype match {
         case udt: UserDefinedType[_] => {
-          fromDataType(udt.sqlType, name, nullable, inArray)
+          fromDataType(udt.sqlType, name, nullable, inArray, toThriftSchemaNames)
         }
         case ArrayType(elementType, false) => {
           val parquetElementType = fromDataType(
             elementType,
-            CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+            arraySchemaName,
             nullable = false,
-            inArray = true)
+            inArray = true,
+            toThriftSchemaNames)
           ConversionPatterns.listType(repetition, name, parquetElementType)
         }
         case ArrayType(elementType, true) => {
           val parquetElementType = fromDataType(
             elementType,
-            CatalystConverter.ARRAY_ELEMENTS_SCHEMA_NAME,
+            arraySchemaName,
             nullable = true,
-            inArray = false)
+            inArray = false,
+            toThriftSchemaNames)
           ConversionPatterns.listType(
             repetition,
             name,
@@ -332,7 +340,8 @@ private[parquet] object ParquetTypesConverter extends Logging {
         }
         case StructType(structFields) => {
           val fields = structFields.map {
-            field => fromDataType(field.dataType, field.name, field.nullable, inArray = false)
+            field => fromDataType(field.dataType, field.name, field.nullable, 
+                                  inArray = false, toThriftSchemaNames)
           }
           new ParquetGroupType(repetition, name, fields.toSeq)
         }
@@ -342,13 +351,15 @@ private[parquet] object ParquetTypesConverter extends Logging {
               keyType,
               CatalystConverter.MAP_KEY_SCHEMA_NAME,
               nullable = false,
-              inArray = false)
+              inArray = false,
+              toThriftSchemaNames)
           val parquetValueType =
             fromDataType(
               valueType,
               CatalystConverter.MAP_VALUE_SCHEMA_NAME,
               nullable = valueContainsNull,
-              inArray = false)
+              inArray = false,
+              toThriftSchemaNames)
           ConversionPatterns.mapType(
             repetition,
             name,
@@ -374,10 +385,12 @@ private[parquet] object ParquetTypesConverter extends Logging {
             field.getRepetition != Repetition.REQUIRED)())
   }
 
-  def convertFromAttributes(attributes: Seq[Attribute]): MessageType = {
+  def convertFromAttributes(attributes: Seq[Attribute],
+                            toThriftSchemaNames: Boolean = false): MessageType = {
     val fields = attributes.map(
       attribute =>
-        fromDataType(attribute.dataType, attribute.name, attribute.nullable))
+        fromDataType(attribute.dataType, attribute.name, attribute.nullable,
+                     toThriftSchemaNames = toThriftSchemaNames))
     new MessageType("root", fields)
   }
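
A minimal usage sketch of the new flag (`attrs` is a hypothetical Seq[Attribute]; the call mirrors the test added below):

    // Default behaviour is unchanged: array elements are still named "array".
    val defaultSchema = ParquetTypesConverter.convertFromAttributes(attrs)
    // With the flag set, array elements are named <field name> + "_tuple",
    // matching what parquet-thrift writes.
    val thriftSchema =
      ParquetTypesConverter.convertFromAttributes(attrs, toThriftSchemaNames = true)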
 

http://git-wip-us.apache.org/repos/asf/spark/blob/dba98bf6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
index 6427495..5f7f31d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetSchemaSuite.scala
@@ -33,9 +33,10 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
   * Checks whether the reflected Parquet message type for product type `T` conforms `messageType`.
    */
   private def testSchema[T <: Product: ClassTag: TypeTag](
-      testName: String, messageType: String): Unit = {
+      testName: String, messageType: String, isThriftDerived: Boolean = false): Unit = {
     test(testName) {
-      val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T])
+      val actual = ParquetTypesConverter.convertFromAttributes(ScalaReflection.attributesFor[T],
+                                                               isThriftDerived)
       val expected = MessageTypeParser.parseMessageType(messageType)
       actual.checkContains(expected)
       expected.checkContains(actual)
@@ -146,6 +147,29 @@ class ParquetSchemaSuite extends FunSuite with ParquetTest {
       |}
     """.stripMargin)
 
+  // Test for SPARK-4520 -- ensure that thrift generated parquet schema is generated
+  // as expected from attributes
+  testSchema[(Array[Byte], Array[Byte], Array[Byte], Seq[Int], Map[Array[Byte], Seq[Int]])](
+    "thrift generated parquet schema",
+    """
+      |message root {
+      |  optional binary _1 (UTF8);
+      |  optional binary _2 (UTF8);
+      |  optional binary _3 (UTF8);
+      |  optional group _4 (LIST) {
+      |    repeated int32 _4_tuple;
+      |  }
+      |  optional group _5 (MAP) {
+      |    repeated group map (MAP_KEY_VALUE) {
+      |      required binary key (UTF8);
+      |      optional group value (LIST) {
+      |        repeated int32 value_tuple;
+      |      }
+      |    }
+      |  }
+      |}
+    """.stripMargin, isThriftDerived = true)
+
   test("DataType string parser compatibility") {
    // This is the generated string from previous versions of the Spark SQL, using the following:
     // val schema = StructType(List(

