Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1887fa228 -> f14fb291d


[SPARK-11044][SQL] Parquet writer version fixed as version1

https://issues.apache.org/jira/browse/SPARK-11044

Spark always writes Parquet files with writer version1, ignoring the writer 
version specified by the user.

So, in this PR, the user-supplied writer version is kept if one is given, 
and version1 is set as the default otherwise.
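
As a usage sketch (the DataFrame `df` and output path are hypothetical, the 
Parquet and Spark APIs are real), a user can now opt into the version2 
writer through the Hadoop configuration without it being silently 
overwritten:

    import org.apache.parquet.column.ParquetProperties
    import org.apache.parquet.hadoop.ParquetOutputFormat

    // Request the version2 writer; before this patch the value was always
    // reset to PARQUET_1_0 when the schema was set.
    sqlContext.sparkContext.hadoopConfiguration.set(
      ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_2_0.toString)

    df.write.parquet("/tmp/example-output")  // hypothetical DataFrame and path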

Author: hyukjinkwon <gurwls...@gmail.com>
Author: HyukjinKwon <gurwls...@gmail.com>

Closes #9060 from HyukjinKwon/SPARK-11044.

(cherry picked from commit 7f8eb3bf6ed64eefc5472f5c5fb02e2db1e3f618)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f14fb291
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f14fb291
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f14fb291

Branch: refs/heads/branch-1.6
Commit: f14fb291d822720d8b578db0bdb656fb6c9ce590
Parents: 1887fa2
Author: hyukjinkwon <gurwls...@gmail.com>
Authored: Mon Nov 16 21:30:10 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Tue Nov 17 03:09:33 2015 +0800

----------------------------------------------------------------------
 .../parquet/CatalystWriteSupport.scala          |  2 +-
 .../datasources/parquet/ParquetIOSuite.scala    | 34 ++++++++++++++++++++
 2 files changed, 35 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f14fb291/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
index 483363d..6862dea 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystWriteSupport.scala
@@ -429,7 +429,7 @@ private[parquet] object CatalystWriteSupport {
   def setSchema(schema: StructType, configuration: Configuration): Unit = {
     schema.map(_.name).foreach(CatalystSchemaConverter.checkFieldName)
     configuration.set(SPARK_ROW_SCHEMA, schema.json)
-    configuration.set(
+    configuration.setIfUnset(
       ParquetOutputFormat.WRITER_VERSION,
       ParquetProperties.WriterVersion.PARQUET_1_0.toString)
   }
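
The one-line fix hinges on the difference between Configuration.set and 
Configuration.setIfUnset: the latter only writes the default when the key is 
absent, so a user-supplied writer version survives. A minimal standalone 
sketch of the semantics (depends only on hadoop-common and parquet-hadoop):

    import org.apache.hadoop.conf.Configuration
    import org.apache.parquet.column.ParquetProperties
    import org.apache.parquet.hadoop.ParquetOutputFormat

    val conf = new Configuration()
    // Simulate a user explicitly requesting the version2 writer.
    conf.set(ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_2_0.toString)
    // The default-setting call used by the patch no longer clobbers it.
    conf.setIfUnset(ParquetOutputFormat.WRITER_VERSION,
      ParquetProperties.WriterVersion.PARQUET_1_0.toString)
    assert(conf.get(ParquetOutputFormat.WRITER_VERSION) == "PARQUET_2_0")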

http://git-wip-us.apache.org/repos/asf/spark/blob/f14fb291/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 78df363..2aa5dca 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.util.Collections
 
+import org.apache.parquet.column.{Encoding, ParquetProperties}
+
 import scala.collection.JavaConverters._
 import scala.reflect.ClassTag
 import scala.reflect.runtime.universe.TypeTag
@@ -534,6 +536,38 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
     }
   }
 
+  test("SPARK-11044 Parquet writer version fixed as version1 ") {
+    // For dictionary encoding, Parquet changes the encoding types according 
to its writer
+    // version. So, this test checks one of the encoding types in order to 
ensure that
+    // the file is written with writer version2.
+    withTempPath { dir =>
+      val clonedConf = new Configuration(hadoopConfiguration)
+      try {
+        // Write a Parquet file with writer version2.
+        hadoopConfiguration.set(ParquetOutputFormat.WRITER_VERSION,
+          ParquetProperties.WriterVersion.PARQUET_2_0.toString)
+
+        // By default, dictionary encoding is enabled from Parquet 1.2.0 but
+        // it is enabled just in case.
+        hadoopConfiguration.setBoolean(ParquetOutputFormat.ENABLE_DICTIONARY, 
true)
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        sqlContext.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.mode("overwrite").parquet(path)
+
+        val blockMetadata = readFooter(new Path(path), 
hadoopConfiguration).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is 
Encoding.PLAIN_DICTIONARY
+        
assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+      } finally {
+        // Manually clear the hadoop configuration for other tests.
+        hadoopConfiguration.clear()
+        clonedConf.asScala.foreach(entry => 
hadoopConfiguration.set(entry.getKey, entry.getValue))
+      }
+    }
+  }
+
   test("read dictionary encoded decimals written as INT32") {
     checkAnswer(
       // Decimal column in this file is encoded using plain dictionary


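As a complementary check (a sketch, not part of this patch; it reuses `path` 
and `hadoopConfiguration` from the test context above), leaving 
WRITER_VERSION unset should still produce a version1 file, which surfaces as 
Encoding.PLAIN_DICTIONARY in the column chunk metadata:

    import org.apache.hadoop.fs.Path
    import org.apache.parquet.column.Encoding
    import org.apache.parquet.hadoop.ParquetFileReader
    import scala.collection.JavaConverters._

    // Read back the footer of a file written with the default configuration.
    val footer = ParquetFileReader.readFooter(hadoopConfiguration, new Path(path))
    val encodings = footer.getBlocks.asScala.head.getColumns.asScala.head.getEncodings
    // The version1 writer encodes dictionary pages as PLAIN_DICTIONARY.
    assert(encodings.contains(Encoding.PLAIN_DICTIONARY))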