Repository: spark
Updated Branches:
  refs/heads/master 5687f7655 -> 4ffc27caa


http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/thrift/Suit.java
----------------------------------------------------------------------
diff --git a/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/thrift/Suit.java b/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/thrift/Suit.java
new file mode 100644
index 0000000..5315c6a
--- /dev/null
+++ b/sql/core/src/test/gen-java/org/apache/spark/sql/parquet/test/thrift/Suit.java
@@ -0,0 +1,51 @@
+/**
+ * Autogenerated by Thrift Compiler (0.9.2)
+ *
+ * DO NOT EDIT UNLESS YOU ARE SURE THAT YOU KNOW WHAT YOU ARE DOING
+ *  @generated
+ */
+package org.apache.spark.sql.parquet.test.thrift;
+
+
+import java.util.Map;
+import java.util.HashMap;
+import org.apache.thrift.TEnum;
+
+public enum Suit implements org.apache.thrift.TEnum {
+  SPADES(0),
+  HEARTS(1),
+  DIAMONDS(2),
+  CLUBS(3);
+
+  private final int value;
+
+  private Suit(int value) {
+    this.value = value;
+  }
+
+  /**
+   * Get the integer value of this enum value, as defined in the Thrift IDL.
+   */
+  public int getValue() {
+    return value;
+  }
+
+  /**
+   * Find the enum type by its integer value, as defined in the Thrift IDL.
+   * @return null if the value is not found.
+   */
+  public static Suit findByValue(int value) { 
+    switch (value) {
+      case 0:
+        return SPADES;
+      case 1:
+        return HEARTS;
+      case 2:
+        return DIAMONDS;
+      case 3:
+        return CLUBS;
+      default:
+        return null;
+    }
+  }
+}
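
The generated findByValue above returns null for ordinals that are not defined in the
IDL instead of throwing. A minimal Scala sketch against the generated class (illustrative
only; assumes the generated test sources are on the classpath):

    import org.apache.spark.sql.parquet.test.thrift.Suit

    object SuitLookupDemo extends App {
      val known = Suit.findByValue(2)    // DIAMONDS
      val unknown = Suit.findByValue(7)  // null, since 7 is not a defined ordinal
      println(s"known = $known, unknown = $unknown")
    }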

http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala
new file mode 100644
index 0000000..bfa4273
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetAvroCompatibilitySuite.scala
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.nio.ByteBuffer
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.avro.AvroParquetWriter
+
+import org.apache.spark.sql.parquet.test.avro.{Nested, ParquetAvroCompat}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{Row, SQLContext}
+
+class ParquetAvroCompatibilitySuite extends ParquetCompatibilityTest {
+  import ParquetCompatibilityTest._
+
+  override val sqlContext: SQLContext = TestSQLContext
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val writer =
+      new AvroParquetWriter[ParquetAvroCompat](
+        new Path(parquetStore.getCanonicalPath),
+        ParquetAvroCompat.getClassSchema)
+
+    (0 until 10).foreach(i => writer.write(makeParquetAvroCompat(i)))
+    writer.close()
+  }
+
+  test("Read Parquet file generated by parquet-avro") {
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-avro:
+         |${readParquetSchema(parquetStore.getCanonicalPath)}
+       """.stripMargin)
+
+    checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
+      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+
+      Row(
+        i % 2 == 0,
+        i,
+        i.toLong * 10,
+        i.toFloat + 0.1f,
+        i.toDouble + 0.2d,
+        s"val_$i".getBytes,
+        s"val_$i",
+
+        nullable(i % 2 == 0: java.lang.Boolean),
+        nullable(i: Integer),
+        nullable(i.toLong: java.lang.Long),
+        nullable(i.toFloat + 0.1f: java.lang.Float),
+        nullable(i.toDouble + 0.2d: java.lang.Double),
+        nullable(s"val_$i".getBytes),
+        nullable(s"val_$i"),
+
+        Seq.tabulate(3)(n => s"arr_${i + n}"),
+        Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap,
+        Seq.tabulate(3) { n =>
+          (i + n).toString -> Seq.tabulate(3) { m =>
+            Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
+          }
+        }.toMap)
+    })
+  }
+
+  def makeParquetAvroCompat(i: Int): ParquetAvroCompat = {
+    def nullable[T <: AnyRef] = makeNullable[T](i) _
+
+    def makeComplexColumn(i: Int): JMap[String, JList[Nested]] = {
+      mapAsJavaMap(Seq.tabulate(3) { n =>
+        (i + n).toString -> seqAsJavaList(Seq.tabulate(3) { m =>
+          Nested
+            .newBuilder()
+            .setNestedIntsColumn(seqAsJavaList(Seq.tabulate(3)(j => i + j + m)))
+            .setNestedStringColumn(s"val_${i + m}")
+            .build()
+        })
+      }.toMap)
+    }
+
+    ParquetAvroCompat
+      .newBuilder()
+      .setBoolColumn(i % 2 == 0)
+      .setIntColumn(i)
+      .setLongColumn(i.toLong * 10)
+      .setFloatColumn(i.toFloat + 0.1f)
+      .setDoubleColumn(i.toDouble + 0.2d)
+      .setBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes))
+      .setStringColumn(s"val_$i")
+
+      .setMaybeBoolColumn(nullable(i % 2 == 0: java.lang.Boolean))
+      .setMaybeIntColumn(nullable(i: Integer))
+      .setMaybeLongColumn(nullable(i.toLong: java.lang.Long))
+      .setMaybeFloatColumn(nullable(i.toFloat + 0.1f: java.lang.Float))
+      .setMaybeDoubleColumn(nullable(i.toDouble + 0.2d: java.lang.Double))
+      .setMaybeBinaryColumn(nullable(ByteBuffer.wrap(s"val_$i".getBytes)))
+      .setMaybeStringColumn(nullable(s"val_$i"))
+
+      .setStringsColumn(Seq.tabulate(3)(n => s"arr_${i + n}"))
+      .setStringToIntColumn(
+        mapAsJavaMap(Seq.tabulate(3)(n => n.toString -> (i + n: Integer)).toMap))
+      .setComplexColumn(makeComplexColumn(i))
+
+      .build()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala
new file mode 100644
index 0000000..b4cdfd9
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetCompatibilityTest.scala
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+import java.io.File
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetFileReader
+import org.apache.parquet.schema.MessageType
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.util.Utils
+
+abstract class ParquetCompatibilityTest extends QueryTest with ParquetTest with BeforeAndAfterAll {
+  protected var parquetStore: File = _
+
+  override protected def beforeAll(): Unit = {
+    parquetStore = Utils.createTempDir(namePrefix = "parquet-compat_")
+    parquetStore.delete()
+  }
+
+  override protected def afterAll(): Unit = {
+    Utils.deleteRecursively(parquetStore)
+  }
+
+  def readParquetSchema(path: String): MessageType = {
+    val fsPath = new Path(path)
+    val fs = fsPath.getFileSystem(configuration)
+    val parquetFiles = fs.listStatus(fsPath).toSeq.filterNot(_.getPath.getName.startsWith("_"))
+    val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true)
+    footers.head.getParquetMetadata.getFileMetaData.getSchema
+  }
+}
+
+object ParquetCompatibilityTest {
+  def makeNullable[T <: AnyRef](i: Int)(f: => T): T = {
+    if (i % 3 == 0) null.asInstanceOf[T] else f
+  }
+}
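
The makeNullable helper above partially applies the row index and passes the by-name
value through unchanged, except for every third row (i % 3 == 0) where it returns null;
the compatibility suites use it to interleave null and non-null expectations for the
optional columns. A minimal sketch of that behavior (the demo object is illustrative
and assumes the test sources are on the classpath):

    import org.apache.spark.sql.parquet.ParquetCompatibilityTest.makeNullable

    object MakeNullableDemo extends App {
      (0 until 6).foreach { i =>
        // Partially apply with the row index, as the suites do.
        def nullable[T <: AnyRef]: (=> T) => T = makeNullable[T](i)
        val value = nullable(s"val_$i")
        // Prints null for i = 0 and 3, and val_1, val_2, val_4, val_5 otherwise.
        println(s"$i -> $value")
      }
    }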

http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala
new file mode 100644
index 0000000..d22066c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetThriftCompatibilitySuite.scala
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.parquet
+
+import java.nio.ByteBuffer
+import java.util.{List => JList, Map => JMap}
+
+import scala.collection.JavaConversions._
+
+import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.thrift.ThriftParquetWriter
+
+import org.apache.spark.sql.parquet.test.thrift.{Nested, ParquetThriftCompat, Suit}
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{Row, SQLContext}
+
+class ParquetThriftCompatibilitySuite extends ParquetCompatibilityTest {
+  import ParquetCompatibilityTest._
+
+  override val sqlContext: SQLContext = TestSQLContext
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val writer =
+      new ThriftParquetWriter[ParquetThriftCompat](
+        new Path(parquetStore.getCanonicalPath),
+        classOf[ParquetThriftCompat],
+        CompressionCodecName.SNAPPY)
+
+    (0 until 10).foreach(i => writer.write(makeParquetThriftCompat(i)))
+    writer.close()
+  }
+
+  test("Read Parquet file generated by parquet-thrift") {
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-thrift:
+         |${readParquetSchema(parquetStore.getCanonicalPath)}
+       """.stripMargin)
+
+    checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), (0 until 10).map { i =>
+      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+
+      Row(
+        i % 2 == 0,
+        i.toByte,
+        (i + 1).toShort,
+        i + 2,
+        i.toLong * 10,
+        i.toDouble + 0.2d,
+        // Thrift `BINARY` values are actually unencoded `STRING` values, and thus are always
+        // treated as `BINARY (UTF8)` in parquet-thrift, since parquet-thrift always assumes
+        // Thrift `STRING`s are encoded using UTF-8.
+        s"val_$i",
+        s"val_$i",
+        // Thrift ENUM values are converted to Parquet binaries containing UTF-8 strings
+        Suit.values()(i % 4).name(),
+
+        nullable(i % 2 == 0: java.lang.Boolean),
+        nullable(i.toByte: java.lang.Byte),
+        nullable((i + 1).toShort: java.lang.Short),
+        nullable(i + 2: Integer),
+        nullable((i * 10).toLong: java.lang.Long),
+        nullable(i.toDouble + 0.2d: java.lang.Double),
+        nullable(s"val_$i"),
+        nullable(s"val_$i"),
+        nullable(Suit.values()(i % 4).name()),
+
+        Seq.tabulate(3)(n => s"arr_${i + n}"),
+        // Thrift `SET`s are converted to Parquet `LIST`s
+        Seq(i),
+        Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap,
+        Seq.tabulate(3) { n =>
+          (i + n) -> Seq.tabulate(3) { m =>
+            Row(Seq.tabulate(3)(j => i + j + m), s"val_${i + m}")
+          }
+        }.toMap)
+    })
+  }
+
+  def makeParquetThriftCompat(i: Int): ParquetThriftCompat = {
+    def makeComplexColumn(i: Int): JMap[Integer, JList[Nested]] = {
+      mapAsJavaMap(Seq.tabulate(3) { n =>
+        (i + n: Integer) -> seqAsJavaList(Seq.tabulate(3) { m =>
+          new Nested(
+            seqAsJavaList(Seq.tabulate(3)(j => i + j + m)),
+            s"val_${i + m}")
+        })
+      }.toMap)
+    }
+
+    val value =
+      new ParquetThriftCompat(
+        i % 2 == 0,
+        i.toByte,
+        (i + 1).toShort,
+        i + 2,
+        i.toLong * 10,
+        i.toDouble + 0.2d,
+        ByteBuffer.wrap(s"val_$i".getBytes),
+        s"val_$i",
+        Suit.values()(i % 4),
+
+        seqAsJavaList(Seq.tabulate(3)(n => s"arr_${i + n}")),
+        setAsJavaSet(Set(i)),
+        mapAsJavaMap(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap),
+        makeComplexColumn(i))
+
+    if (i % 3 == 0) {
+      value
+    } else {
+      value
+        .setMaybeBoolColumn(i % 2 == 0)
+        .setMaybeByteColumn(i.toByte)
+        .setMaybeShortColumn((i + 1).toShort)
+        .setMaybeIntColumn(i + 2)
+        .setMaybeLongColumn(i.toLong * 10)
+        .setMaybeDoubleColumn(i.toDouble + 0.2d)
+        .setMaybeBinaryColumn(ByteBuffer.wrap(s"val_$i".getBytes))
+        .setMaybeStringColumn(s"val_$i")
+        .setMaybeEnumColumn(Suit.values()(i % 4))
+    }
+  }
+}
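
The comments in the expected rows above note that parquet-thrift stores both Thrift
binary and enum values as BINARY (UTF8), so they read back as plain strings. A short,
hedged sketch of inspecting such a file (the path is a placeholder for a directory
written by ThriftParquetWriter as in beforeAll, and sqlContext is assumed to exist,
e.g. in a spark-shell of this era):

    val df = sqlContext.read.parquet("/tmp/parquet-thrift-compat")
    df.printSchema()  // binaryColumn and enumColumn both appear as string columns
    // enumColumn holds the enum name, e.g. "SPADES", per the Suit ordinal used when writing.
    df.select("binaryColumn", "enumColumn").show()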

http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/core/src/test/scripts/gen-code.sh
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scripts/gen-code.sh b/sql/core/src/test/scripts/gen-code.sh
new file mode 100755
index 0000000..5d8d8ad
--- /dev/null
+++ b/sql/core/src/test/scripts/gen-code.sh
@@ -0,0 +1,31 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+cd $(dirname $0)/..
+BASEDIR=`pwd`
+cd -
+
+rm -rf $BASEDIR/gen-java
+mkdir -p $BASEDIR/gen-java
+
+thrift\
+    --gen java\
+    -out $BASEDIR/gen-java\
+    $BASEDIR/thrift/parquet-compat.thrift
+
+avro-tools idl $BASEDIR/avro/parquet-compat.avdl > $BASEDIR/avro/parquet-compat.avpr
+avro-tools compile -string protocol $BASEDIR/avro/parquet-compat.avpr $BASEDIR/gen-java

http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/core/src/test/thrift/parquet-compat.thrift
----------------------------------------------------------------------
diff --git a/sql/core/src/test/thrift/parquet-compat.thrift b/sql/core/src/test/thrift/parquet-compat.thrift
new file mode 100644
index 0000000..fa5ed8c
--- /dev/null
+++ b/sql/core/src/test/thrift/parquet-compat.thrift
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+namespace java org.apache.spark.sql.parquet.test.thrift
+
+enum Suit {
+    SPADES,
+    HEARTS,
+    DIAMONDS,
+    CLUBS
+}
+
+struct Nested {
+    1: required list<i32> nestedIntsColumn;
+    2: required string nestedStringColumn;
+}
+
+/**
+ * This is a test struct for testing parquet-thrift compatibility.
+ */
+struct ParquetThriftCompat {
+    1: required bool boolColumn;
+    2: required byte byteColumn;
+    3: required i16 shortColumn;
+    4: required i32 intColumn;
+    5: required i64 longColumn;
+    6: required double doubleColumn;
+    7: required binary binaryColumn;
+    8: required string stringColumn;
+    9: required Suit enumColumn
+
+    10: optional bool maybeBoolColumn;
+    11: optional byte maybeByteColumn;
+    12: optional i16 maybeShortColumn;
+    13: optional i32 maybeIntColumn;
+    14: optional i64 maybeLongColumn;
+    15: optional double maybeDoubleColumn;
+    16: optional binary maybeBinaryColumn;
+    17: optional string maybeStringColumn;
+    18: optional Suit maybeEnumColumn;
+
+    19: required list<string> stringsColumn;
+    20: required set<i32> intSetColumn;
+    21: required map<i32, string> intToStringColumn;
+    22: required map<i32, list<Nested>> complexColumn;
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
new file mode 100644
index 0000000..bb5f1fe
--- /dev/null
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.hive
+
+import org.apache.spark.sql.hive.test.TestHive
+import org.apache.spark.sql.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.{Row, SQLConf, SQLContext}
+
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
+  import ParquetCompatibilityTest.makeNullable
+
+  override val sqlContext: SQLContext = TestHive
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
+      withTempTable("data") {
+        sqlContext.sql(
+          s"""CREATE TABLE parquet_compat(
+             |  bool_column BOOLEAN,
+             |  byte_column TINYINT,
+             |  short_column SMALLINT,
+             |  int_column INT,
+             |  long_column BIGINT,
+             |  float_column FLOAT,
+             |  double_column DOUBLE,
+             |
+             |  strings_column ARRAY<STRING>,
+             |  int_to_string_column MAP<INT, STRING>
+             |)
+             |STORED AS PARQUET
+             |LOCATION '${parquetStore.getCanonicalPath}'
+           """.stripMargin)
+
+        val schema = sqlContext.table("parquet_compat").schema
+        val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+        sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
+        sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM data")
+      }
+    }
+  }
+
+  override protected def afterAll(): Unit = {
+    sqlContext.sql("DROP TABLE parquet_compat")
+  }
+
+  test("Read Parquet file generated by parquet-hive") {
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-hive:
+         |${readParquetSchema(parquetStore.getCanonicalPath)}
+       """.stripMargin)
+
+    // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
+    // Have to assume all BINARY values are strings here.
+    withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+      checkAnswer(sqlContext.read.parquet(parquetStore.getCanonicalPath), makeRows)
+    }
+  }
+
+  def makeRows: Seq[Row] = {
+    (0 until 10).map { i =>
+      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+
+      Row(
+        nullable(i % 2 == 0: java.lang.Boolean),
+        nullable(i.toByte: java.lang.Byte),
+        nullable((i + 1).toShort: java.lang.Short),
+        nullable(i + 2: Integer),
+        nullable(i.toLong * 10: java.lang.Long),
+        nullable(i.toFloat + 0.1f: java.lang.Float),
+        nullable(i.toDouble + 0.2d: java.lang.Double),
+        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
+        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
+    }
+  }
+}
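
Because parquet-hive leaves string data as un-annotated BINARY (as noted in the test
above), readers outside this suite need the same binary-as-string flag. A minimal,
hedged sketch of the workaround (the path is a placeholder; the key string is the one
behind SQLConf.PARQUET_BINARY_AS_STRING):

    // Assumes an existing SQLContext (or HiveContext) named sqlContext.
    sqlContext.setConf("spark.sql.parquet.binaryAsString", "true")
    val df = sqlContext.read.parquet("/path/written/by/parquet-hive")  // placeholder path
    df.printSchema()  // un-annotated BINARY columns now come back as StringType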

http://git-wip-us.apache.org/repos/asf/spark/blob/4ffc27ca/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
index c2e0980..9d79a4b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/parquetSuites.scala
@@ -21,14 +21,16 @@ import java.io.File
 
 import org.scalatest.BeforeAndAfterAll
 
+import org.apache.spark.sql._
 import org.apache.spark.sql.execution.{ExecutedCommand, PhysicalRDD}
 import org.apache.spark.sql.hive.execution.HiveTableScan
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.hive.test.TestHive._
 import org.apache.spark.sql.hive.test.TestHive.implicits._
 import org.apache.spark.sql.parquet.{ParquetRelation2, ParquetTableScan}
 import org.apache.spark.sql.sources.{InsertIntoDataSource, InsertIntoHadoopFsRelation, LogicalRelation}
+import org.apache.spark.sql.test.SQLTestUtils
 import org.apache.spark.sql.types._
-import org.apache.spark.sql.{DataFrame, QueryTest, Row, SQLConf, SaveMode}
 import org.apache.spark.util.Utils
 
 // The data where the partitioning key exists only in the directory structure.
@@ -685,6 +687,31 @@ class ParquetSourceSuiteBase extends ParquetPartitioningTest {
 
     sql("drop table spark_6016_fix")
   }
+
+  test("SPARK-8811: compatibility with array of struct in Hive") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      withTable("array_of_struct") {
+        val conf = Seq(
+          HiveContext.CONVERT_METASTORE_PARQUET.key -> "false",
+          SQLConf.PARQUET_BINARY_AS_STRING.key -> "true",
+          SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key -> "true")
+
+        withSQLConf(conf: _*) {
+          sql(
+            s"""CREATE TABLE array_of_struct
+               |STORED AS PARQUET LOCATION '$path'
+               |AS SELECT '1st', '2nd', ARRAY(NAMED_STRUCT('a', 'val_a', 'b', 'val_b'))
+             """.stripMargin)
+
+          checkAnswer(
+            sqlContext.read.parquet(path),
+            Row("1st", "2nd", Seq(Row("val_a", "val_b"))))
+        }
+      }
+    }
+  }
 }
 
 class ParquetDataSourceOnSourceSuite extends ParquetSourceSuiteBase {
@@ -762,7 +789,9 @@ class ParquetDataSourceOffSourceSuite extends ParquetSourceSuiteBase {
 /**
  * A collection of tests for parquet data with various forms of partitioning.
  */
-abstract class ParquetPartitioningTest extends QueryTest with BeforeAndAfterAll {
+abstract class ParquetPartitioningTest extends QueryTest with SQLTestUtils with BeforeAndAfterAll {
+  override def sqlContext: SQLContext = TestHive
+
   var partitionedTableDir: File = null
   var normalTableDir: File = null
   var partitionedTableDirWithKey: File = null

