Repository: spark
Updated Branches:
  refs/heads/branch-2.0 847ccf793 -> 810472a92


[SPARK-15719][SQL] Disables writing Parquet summary files by default

## What changes were proposed in this pull request?

This PR disables writing Parquet summary files by default (i.e., when the Hadoop configuration key "parquet.enable.summary-metadata" is not set explicitly).

Please refer to [SPARK-15719][1] for more details.
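
Users who still need the summary files can opt back in by setting the key explicitly before writing. A minimal sketch, assuming a Spark 2.x session named `spark` (the output path is illustrative):

```scala
// Re-enable Parquet summary files for this session by setting the Hadoop key explicitly.
spark.sparkContext.hadoopConfiguration
  .set("parquet.enable.summary-metadata", "true")

spark.range(10).write.parquet("/tmp/parquet-with-summary")  // illustrative path
// The output directory should now contain _metadata and _common_metadata again.
```

Passing `--conf spark.hadoop.parquet.enable.summary-metadata=true` at submit time should have the same effect, since `spark.hadoop.*` properties are copied into the Hadoop configuration.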

## How was this patch tested?

A new test case was added in `ParquetQuerySuite` to check that no summary files are written by default.
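
Beyond the suite, a quick manual spot check in spark-shell could look like the sketch below (the local output path and the listing logic are illustrative, not part of this patch):

```scala
// Hypothetical spark-shell check: write a small dataset with default settings
// and confirm the output directory contains no _metadata / _common_metadata files.
import java.io.File

val out = "/tmp/spark-15719-check"  // illustrative path
spark.range(3).write.mode("overwrite").parquet(out)

val summaryFiles = new File(out).listFiles().filter(_.getName.contains("_metadata"))
assert(summaryFiles.isEmpty, s"unexpected summary files: ${summaryFiles.mkString(", ")}")
```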

[1]: https://issues.apache.org/jira/browse/SPARK-15719

Author: Cheng Lian <l...@databricks.com>

Closes #13455 from liancheng/spark-15719-disable-parquet-summary-files.

(cherry picked from commit 431542765785304edb76a19885fbc5f9b8ae7d64)
Signed-off-by: Cheng Lian <l...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/810472a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/810472a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/810472a9

Branch: refs/heads/branch-2.0
Commit: 810472a928811f078ef42825d1cf16cf711fcefb
Parents: 847ccf7
Author: Cheng Lian <l...@databricks.com>
Authored: Thu Jun 2 16:16:27 2016 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jun 2 16:16:37 2016 -0700

----------------------------------------------------------------------
 .../datasources/parquet/ParquetFileFormat.scala |  7 ++++-
 .../datasources/parquet/ParquetIOSuite.scala    | 20 +++++++------
 .../datasources/parquet/ParquetQuerySuite.scala | 31 ++++++++++++++++----
 .../parquet/ParquetSchemaSuite.scala            | 19 ++----------
 .../sources/ParquetHadoopFsRelationSuite.scala  | 29 ++++++++++--------
 5 files changed, 62 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index ff7962d..ada9cd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -124,6 +124,11 @@ private[sql] class ParquetFileFormat
     // Sets compression scheme
     conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec)
 
+    // SPARK-15719: Disables writing Parquet summary files by default.
+    if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
+      conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
+    }
+
     new OutputWriterFactory {
       override def newInstance(
           path: String,
@@ -786,7 +791,7 @@ private[sql] object ParquetFileFormat extends Logging {
     //
     // Parquet requires `FileStatus`es to read footers.  Here we try to send cached `FileStatus`es
     // to executor side to avoid fetching them again.  However, `FileStatus` is not `Serializable`
-    // but only `Writable`.  What makes it worth, for some reason, `FileStatus` doesn't play well
+    // but only `Writable`.  What makes it worse, for some reason, `FileStatus` doesn't play well
     // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`.  These
     // facts virtually prevents us to serialize `FileStatus`es.
     //
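
As an aside on the change above: the new guard only supplies the `false` default when `parquet.enable.summary-metadata` is absent from the configuration, so an explicit user setting is left untouched. A minimal standalone sketch of that pattern, assuming `hadoop-common` and `parquet-hadoop` are on the classpath (the `Configuration` instances here are illustrative, not Spark's own):

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.parquet.hadoop.ParquetOutputFormat

object SummaryDefaultSketch {
  // Default the job-summary flag to false only when the user has not set it at all.
  def applySummaryDefault(conf: Configuration): Unit = {
    if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) {
      conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false)
    }
  }

  def main(args: Array[String]): Unit = {
    val untouched = new Configuration(false)
    applySummaryDefault(untouched)
    // Left unset by the user, so the new default (false) kicks in.
    assert(!untouched.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true))

    val explicit = new Configuration(false)
    explicit.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, true)
    applySummaryDefault(explicit)
    // An explicit user setting is preserved.
    assert(explicit.getBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false))
  }
}
```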

http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index d0107aa..92f2db3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -515,17 +515,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext {
 
     val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions)
 
-    withTempPath { dir =>
-      val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
-      spark.range(1 << 16).selectExpr("(id % 4) AS i")
-        .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
+    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+      withTempPath { dir =>
+        val path = s"${dir.getCanonicalPath}/part-r-0.parquet"
+        spark.range(1 << 16).selectExpr("(id % 4) AS i")
+          .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path)
 
-      val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
-      val columnChunkMetadata = blockMetadata.getColumns.asScala.head
+        val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head
+        val columnChunkMetadata = blockMetadata.getColumns.asScala.head
 
-      // If the file is written with version2, this should include
-      // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
-      assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+        // If the file is written with version2, this should include
+        // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY
+        assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY))
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index 0a2fb0e..78b97f6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.io.File
 
-import org.apache.hadoop.fs.Path
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
@@ -148,13 +149,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
 
-    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
+    withSQLConf(
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true",
+      ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true"
+    ) {
       testSchemaMerging(2)
     }
 
-    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
-      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
+    withSQLConf(
+      SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false"
+    ) {
       testSchemaMerging(3)
     }
   }
@@ -604,6 +610,21 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       }
     }
   }
+
+  test("SPARK-15719: disable writing summary files by default") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      spark.range(3).write.parquet(path)
+
+      val fs = FileSystem.get(sparkContext.hadoopConfiguration)
+      val files = fs.listFiles(new Path(path), true)
+
+      while (files.hasNext) {
+        val file = files.next
+        assert(!file.getPath.getName.contains("_metadata"))
+      }
+    }
+  }
 }
 
 object TestingUDT {

http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
index 6db6492..1bc6f70 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala
@@ -451,31 +451,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest {
   }
 
   test("schema merging failure error message") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      spark.range(3).write.parquet(s"$path/p=1")
-      spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
-
-      val message = intercept[SparkException] {
-        spark.read.option("mergeSchema", "true").parquet(path).schema
-      }.getMessage
+    import testImplicits._
 
-      assert(message.contains("Failed merging schema of file"))
-    }
-
-    // test for second merging (after read Parquet schema in parallel done)
     withTempPath { dir =>
       val path = dir.getCanonicalPath
       spark.range(3).write.parquet(s"$path/p=1")
-      spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2")
-
-      spark.sparkContext.conf.set("spark.default.parallelism", "20")
+      spark.range(3).select('id cast IntegerType as 'id).write.parquet(s"$path/p=2")
 
       val message = intercept[SparkException] {
         spark.read.option("mergeSchema", "true").parquet(path).schema
       }.getMessage
 
-      assert(message.contains("Failed merging schema:"))
+      assert(message.contains("Failed merging schema"))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
index f9a1d16..8aa018d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala
@@ -21,6 +21,7 @@ import java.io.File
 
 import com.google.common.io.Files
 import org.apache.hadoop.fs.Path
+import org.apache.parquet.hadoop.ParquetOutputFormat
 
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.sql._
@@ -124,23 +125,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest {
   }
 
   test("SPARK-8604: Parquet data source should write summary file while doing appending") {
-    withTempPath { dir =>
-      val path = dir.getCanonicalPath
-      val df = spark.range(0, 5).toDF()
-      df.write.mode(SaveMode.Overwrite).parquet(path)
+    withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") {
+      withTempPath { dir =>
+        val path = dir.getCanonicalPath
+        val df = spark.range(0, 5).toDF()
+        df.write.mode(SaveMode.Overwrite).parquet(path)
 
-      val summaryPath = new Path(path, "_metadata")
-      val commonSummaryPath = new Path(path, "_common_metadata")
+        val summaryPath = new Path(path, "_metadata")
+        val commonSummaryPath = new Path(path, "_common_metadata")
 
-      val fs = summaryPath.getFileSystem(spark.sessionState.newHadoopConf())
-      fs.delete(summaryPath, true)
-      fs.delete(commonSummaryPath, true)
+        val fs = summaryPath.getFileSystem(spark.sessionState.newHadoopConf())
+        fs.delete(summaryPath, true)
+        fs.delete(commonSummaryPath, true)
 
-      df.write.mode(SaveMode.Append).parquet(path)
-      checkAnswer(spark.read.parquet(path), df.union(df))
+        df.write.mode(SaveMode.Append).parquet(path)
+        checkAnswer(spark.read.parquet(path), df.union(df))
 
-      assert(fs.exists(summaryPath))
-      assert(fs.exists(commonSummaryPath))
+        assert(fs.exists(summaryPath))
+        assert(fs.exists(commonSummaryPath))
+      }
     }
   }
 

