Repository: spark
Updated Branches:
  refs/heads/master fbad920db -> 75d202073


[SPARK-11694][FOLLOW-UP] Clean up imports, use a common function for metadata and add a test for FIXED_LEN_BYTE_ARRAY

As discussed in https://github.com/apache/spark/pull/9660 and 
https://github.com/apache/spark/pull/9060, I cleaned up unused imports, added a 
test for fixed-length byte arrays, and switched to a common function for writing 
Parquet metadata.
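
The shared helper itself is not part of this diff, but based on the inline code 
removed below it is expected to do roughly the following (a sketch only; the 
wrapping object name and exact signature are assumptions for illustration):

```scala
import java.util.Collections

import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.{Footer, ParquetFileWriter}
import org.apache.parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import org.apache.parquet.schema.MessageType

// Sketch of what the common writeMetadata(parquetSchema, path, conf) call used by the
// tests is expected to do, reconstructed from the inline code this patch removes.
object ParquetMetadataWriter {
  def writeMetadata(parquetSchema: MessageType, path: Path, configuration: Configuration): Unit = {
    val extraMetadata = Map.empty[String, String].asJava
    val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
    val parquetMetadata = new ParquetMetadata(fileMetadata, Collections.emptyList())
    // Writes the Parquet summary (_metadata) file for the given path.
    ParquetFileWriter.writeMetadataFile(configuration, path, Seq(new Footer(path, parquetMetadata)).asJava)
  }
}
```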

For the fixed-length byte array test, I verified the encoding types with 
[parquet-tools](https://github.com/Parquet/parquet-mr/tree/master/parquet-tools).
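
The encodings can also be checked programmatically through parquet-mr's footer 
API; a minimal sketch, assuming the test resource is available at the local 
filesystem path given below:

```scala
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader

object PrintParquetEncodings {
  def main(args: Array[String]): Unit = {
    // Read the file footer and print the encodings of every column chunk. For the
    // decimal column in dec-in-fixed-len.parquet a dictionary encoding is expected,
    // matching the "plain dictionary" comment in the new test below.
    val footer = ParquetFileReader.readFooter(new Configuration(), new Path("dec-in-fixed-len.parquet"))
    footer.getBlocks.asScala.foreach { block =>
      block.getColumns.asScala.foreach { column =>
        println(s"${column.getPath}: ${column.getEncodings.asScala.mkString(", ")}")
      }
    }
  }
}
```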

Author: hyukjinkwon <gurwls...@gmail.com>

Closes #9754 from HyukjinKwon/SPARK-11694-followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/75d20207
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/75d20207
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/75d20207

Branch: refs/heads/master
Commit: 75d202073143d5a7f943890d8682b5b0cf9e3092
Parents: fbad920
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Tue Nov 17 14:35:00 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Nov 17 14:35:00 2015 +0800

----------------------------------------------------------------------
 .../src/test/resources/dec-in-fixed-len.parquet | Bin 0 -> 460 bytes
 .../datasources/parquet/ParquetIOSuite.scala    |  42 +++++++------------
 2 files changed, 15 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/75d20207/sql/core/src/test/resources/dec-in-fixed-len.parquet
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/dec-in-fixed-len.parquet b/sql/core/src/test/resources/dec-in-fixed-len.parquet
new file mode 100644
index 0000000..6ad37d5
Binary files /dev/null and b/sql/core/src/test/resources/dec-in-fixed-len.parquet differ

http://git-wip-us.apache.org/repos/asf/spark/blob/75d20207/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a148fac..177ab42 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -17,8 +17,6 @@
 
 package org.apache.spark.sql.execution.datasources.parquet
 
-import java.util.Collections
-
 import org.apache.parquet.column.{Encoding, ParquetProperties}
 
 import scala.collection.JavaConverters._
@@ -33,7 +31,7 @@ import org.apache.parquet.example.data.{Group, GroupWriter}
 import org.apache.parquet.hadoop._
 import org.apache.parquet.hadoop.api.WriteSupport
 import org.apache.parquet.hadoop.api.WriteSupport.WriteContext
-import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata}
+import org.apache.parquet.hadoop.metadata.CompressionCodecName
 import org.apache.parquet.io.api.RecordConsumer
 import org.apache.parquet.schema.{MessageType, MessageTypeParser}
 
@@ -243,15 +241,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       """.stripMargin)
 
     withTempPath { location =>
-      val extraMetadata = Map.empty[String, String].asJava
-      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
-      val footer = List(
-        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
-      ).asJava
-
-      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
+      val conf = sparkContext.hadoopConfiguration
+      writeMetadata(parquetSchema, path, conf)
       val errorMessage = intercept[Throwable] {
         sqlContext.read.parquet(path.toString).printSchema()
       }.toString
@@ -267,20 +259,14 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
         |}
       """.stripMargin)
 
+    val expectedSparkTypes = Seq(StringType, BinaryType)
+
     withTempPath { location =>
-      val extraMetadata = Map.empty[String, String].asJava
-      val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark")
       val path = new Path(location.getCanonicalPath)
-      val footer = List(
-        new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList()))
-      ).asJava
-
-      ParquetFileWriter.writeMetadataFile(sparkContext.hadoopConfiguration, path, footer)
-
-      val jsonDataType = sqlContext.read.parquet(path.toString).schema(0).dataType
-      assert(jsonDataType === StringType)
-      val bsonDataType = sqlContext.read.parquet(path.toString).schema(1).dataType
-      assert(bsonDataType === BinaryType)
+      val conf = sparkContext.hadoopConfiguration
+      writeMetadata(parquetSchema, path, conf)
+      val sparkTypes = sqlContext.read.parquet(path.toString).schema.map(_.dataType)
+      assert(sparkTypes === expectedSparkTypes)
     }
   }
 
@@ -607,10 +593,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
       sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'i64_dec))
   }
 
-  // TODO Adds test case for reading dictionary encoded decimals written as `FIXED_LEN_BYTE_ARRAY`
-  // The Parquet writer version Spark 1.6 and prior versions use is `PARQUET_1_0`, which doesn't
-  // provide dictionary encoding support for `FIXED_LEN_BYTE_ARRAY`.  Should add a test here once
-  // we upgrade to `PARQUET_2_0`.
+  test("read dictionary encoded decimals written as FIXED_LEN_BYTE_ARRAY") {
+    checkAnswer(
+      // Decimal column in this file is encoded using plain dictionary
+      readResourceParquetFile("dec-in-fixed-len.parquet"),
+      sqlContext.range(1 << 4).select('id % 10 cast DecimalType(10, 2) as 'fixed_len_dec))
+  }
 }
 
 class JobCommitFailureParquetOutputCommitter(outputPath: Path, context: TaskAttemptContext)

