Repository: spark
Updated Branches:
  refs/heads/master 662bb9667 -> a2f4cdceb


[SPARK-8580] [SQL] Refactors ParquetHiveCompatibilitySuite and adds more test cases

This PR refactors `ParquetHiveCompatibilitySuite` so that it's easier to add 
new test cases.

Hit two bugs, SPARK-10177 and HIVE-11625, while working on this; added test 
cases for them and marked them as ignored for now. SPARK-10177 will be 
addressed in a separate PR.
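
With the `testParquetHiveCompatibility(row, hiveTypes*)` helper introduced 
below, a new case reduces to one `Row` plus the matching Hive column types. 
As a purely hypothetical sketch (this case is not part of the PR), an 
array-of-structs test would look like:

    test("array of structs") {
      // Hypothetical case written against the refactored helper; each value
      // in the Row lines up positionally with one Hive type string.
      testParquetHiveCompatibility(
        Row(Seq(Row(1, "foo"), Row(2, null))),
        "ARRAY<STRUCT<f0: INT, f1: STRING>>")
    }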

Author: Cheng Lian <l...@databricks.com>

Closes #8392 from liancheng/spark-8580/parquet-hive-compat-tests.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2f4cdce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2f4cdce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2f4cdce

Branch: refs/heads/master
Commit: a2f4cdceba32aaa0df59df335ca0ce1ac73fc6c2
Parents: 662bb96
Author: Cheng Lian <l...@databricks.com>
Authored: Mon Aug 24 14:11:19 2015 -0700
Committer: Davies Liu <davies....@gmail.com>
Committed: Mon Aug 24 14:11:19 2015 -0700

----------------------------------------------------------------------
 .../hive/ParquetHiveCompatibilitySuite.scala    | 132 +++++++++++++------
 1 file changed, 93 insertions(+), 39 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a2f4cdce/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
index 13452e7..bc30180 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/ParquetHiveCompatibilitySuite.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.sql.hive
 
+import java.sql.Timestamp
+import java.util.{Locale, TimeZone}
+
 import org.apache.hadoop.hive.conf.HiveConf
+import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.execution.datasources.parquet.ParquetCompatibilityTest
+import org.apache.spark.sql.hive.test.TestHive
 import org.apache.spark.sql.{Row, SQLConf, SQLContext}
 
-class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
-  import ParquetCompatibilityTest.makeNullable
-
+class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest with BeforeAndAfterAll {
   override def _sqlContext: SQLContext = TestHive
   private val sqlContext = _sqlContext
 
@@ -35,69 +37,121 @@ class ParquetHiveCompatibilitySuite extends ParquetCompatibilityTest {
    */
   private val stagingDir = new HiveConf().getVar(HiveConf.ConfVars.STAGINGDIR)
 
-  test("Read Parquet file generated by parquet-hive") {
+  private val originalTimeZone = TimeZone.getDefault
+  private val originalLocale = Locale.getDefault
+
+  protected override def beforeAll(): Unit = {
+    TimeZone.setDefault(TimeZone.getTimeZone("America/Los_Angeles"))
+    Locale.setDefault(Locale.US)
+  }
+
+  override protected def afterAll(): Unit = {
+    TimeZone.setDefault(originalTimeZone)
+    Locale.setDefault(originalLocale)
+  }
+
+  override protected def logParquetSchema(path: String): Unit = {
+    val schema = readParquetSchema(path, { path =>
+      !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
+    })
+
+    logInfo(
+      s"""Schema of the Parquet file written by parquet-avro:
+         |$schema
+       """.stripMargin)
+  }
+
+  private def testParquetHiveCompatibility(row: Row, hiveTypes: String*): Unit = {
     withTable("parquet_compat") {
       withTempPath { dir =>
         val path = dir.getCanonicalPath
 
+        // Hive columns are always nullable, so here we append an all-null row.
+        val rows = row :: Row(Seq.fill(row.length)(null): _*) :: Nil
+
+        // Don't convert Hive metastore Parquet tables to let Hive write those Parquet files.
         withSQLConf(HiveContext.CONVERT_METASTORE_PARQUET.key -> "false") {
           withTempTable("data") {
-            sqlContext.sql(
+            val fields = hiveTypes.zipWithIndex.map { case (typ, index) => s"  col_$index $typ" }
+
+            val ddl =
               s"""CREATE TABLE parquet_compat(
-                 |  bool_column BOOLEAN,
-                 |  byte_column TINYINT,
-                 |  short_column SMALLINT,
-                 |  int_column INT,
-                 |  long_column BIGINT,
-                 |  float_column FLOAT,
-                 |  double_column DOUBLE,
-                 |
-                 |  strings_column ARRAY<STRING>,
-                 |  int_to_string_column MAP<INT, STRING>
+                 |${fields.mkString(",\n")}
                  |)
                  |STORED AS PARQUET
                  |LOCATION '$path'
+               """.stripMargin
+
+            logInfo(
+              s"""Creating testing Parquet table with the following DDL:
+                 |$ddl
                """.stripMargin)
 
+            sqlContext.sql(ddl)
+
             val schema = sqlContext.table("parquet_compat").schema
-            val rowRDD = sqlContext.sparkContext.parallelize(makeRows).coalesce(1)
+            val rowRDD = sqlContext.sparkContext.parallelize(rows).coalesce(1)
             sqlContext.createDataFrame(rowRDD, schema).registerTempTable("data")
             sqlContext.sql("INSERT INTO TABLE parquet_compat SELECT * FROM 
data")
           }
         }
 
-        val schema = readParquetSchema(path, { path =>
-          !path.getName.startsWith("_") && !path.getName.startsWith(stagingDir)
-        })
-
-        logInfo(
-          s"""Schema of the Parquet file written by parquet-hive:
-             |$schema
-           """.stripMargin)
+        logParquetSchema(path)
 
         // Unfortunately parquet-hive doesn't add `UTF8` annotation to BINARY when writing strings.
         // Have to assume all BINARY values are strings here.
         withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
-          checkAnswer(sqlContext.read.parquet(path), makeRows)
+          checkAnswer(sqlContext.read.parquet(path), rows)
         }
       }
     }
   }
 
-  def makeRows: Seq[Row] = {
-    (0 until 10).map { i =>
-      def nullable[T <: AnyRef]: ( => T) => T = makeNullable[T](i)
+  test("simple primitives") {
+    testParquetHiveCompatibility(
+      Row(true, 1.toByte, 2.toShort, 3, 4.toLong, 5.1f, 6.1d, "foo"),
+      "BOOLEAN", "TINYINT", "SMALLINT", "INT", "BIGINT", "FLOAT", "DOUBLE", 
"STRING")
+  }
 
+  ignore("SPARK-10177 timestamp") {
+    testParquetHiveCompatibility(Row(Timestamp.valueOf("2015-08-24 00:31:00")), "TIMESTAMP")
+  }
+
+  test("array") {
+    testParquetHiveCompatibility(
       Row(
-        nullable(i % 2 == 0: java.lang.Boolean),
-        nullable(i.toByte: java.lang.Byte),
-        nullable((i + 1).toShort: java.lang.Short),
-        nullable(i + 2: Integer),
-        nullable(i.toLong * 10: java.lang.Long),
-        nullable(i.toFloat + 0.1f: java.lang.Float),
-        nullable(i.toDouble + 0.2d: java.lang.Double),
-        nullable(Seq.tabulate(3)(n => s"arr_${i + n}")),
-        nullable(Seq.tabulate(3)(n => (i + n: Integer) -> s"val_${i + n}").toMap))
-    }
+        Seq[Integer](1: Integer, null, 2: Integer, null),
+        Seq[String]("foo", null, "bar", null),
+        Seq[Seq[Integer]](
+          Seq[Integer](1: Integer, null),
+          Seq[Integer](2: Integer, null))),
+      "ARRAY<INT>",
+      "ARRAY<STRING>",
+      "ARRAY<ARRAY<INT>>")
+  }
+
+  test("map") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          (1: Integer) -> "foo",
+          (2: Integer) -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  // HIVE-11625: Parquet map entries with null keys are dropped by Hive
+  ignore("map entries with null keys") {
+    testParquetHiveCompatibility(
+      Row(
+        Map[Integer, String](
+          null.asInstanceOf[Integer] -> "bar",
+          null.asInstanceOf[Integer] -> null)),
+      "MAP<INT, STRING>")
+  }
+
+  test("struct") {
+    testParquetHiveCompatibility(
+      Row(Row(1, Seq("foo", "bar", null))),
+      "STRUCT<f0: INT, f1: ARRAY<STRING>>")
   }
 }
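
A note on the helper above: since Hive columns are always nullable, it 
appends an all-null row built with `Row(Seq.fill(row.length)(null): _*)`. 
A minimal standalone sketch of that construction against Spark's Row API:

    import org.apache.spark.sql.Row

    val row = Row(true, 1, "foo")
    // Seq.fill produces row.length nulls; `: _*` expands them into Row's
    // varargs factory, yielding a row of the same arity with every field null.
    val allNull = Row(Seq.fill(row.length)(null): _*)
    assert(allNull.length == row.length)
    assert((0 until allNull.length).forall(allNull.isNullAt))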

