Repository: spark Updated Branches: refs/heads/branch-2.0 847ccf793 -> 810472a92
[SPARK-15719][SQL] Disables writing Parquet summary files by default ## What changes were proposed in this pull request? This PR disables writing Parquet summary files by default (i.e., when Hadoop configuration "parquet.enable.summary-metadata" is not set). Please refer to [SPARK-15719][1] for more details. ## How was this patch tested? New test case added in `ParquetQuerySuite` to check no summary files are written by default. [1]: https://issues.apache.org/jira/browse/SPARK-15719 Author: Cheng Lian <l...@databricks.com> Closes #13455 from liancheng/spark-15719-disable-parquet-summary-files. (cherry picked from commit 431542765785304edb76a19885fbc5f9b8ae7d64) Signed-off-by: Cheng Lian <l...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/810472a9 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/810472a9 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/810472a9 Branch: refs/heads/branch-2.0 Commit: 810472a928811f078ef42825d1cf16cf711fcefb Parents: 847ccf7 Author: Cheng Lian <l...@databricks.com> Authored: Thu Jun 2 16:16:27 2016 -0700 Committer: Cheng Lian <l...@databricks.com> Committed: Thu Jun 2 16:16:37 2016 -0700 ---------------------------------------------------------------------- .../datasources/parquet/ParquetFileFormat.scala | 7 ++++- .../datasources/parquet/ParquetIOSuite.scala | 20 +++++++------ .../datasources/parquet/ParquetQuerySuite.scala | 31 ++++++++++++++++---- .../parquet/ParquetSchemaSuite.scala | 19 ++---------- .../sources/ParquetHadoopFsRelationSuite.scala | 29 ++++++++++-------- 5 files changed, 62 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala index ff7962d..ada9cd4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala @@ -124,6 +124,11 @@ private[sql] class ParquetFileFormat // Sets compression scheme conf.set(ParquetOutputFormat.COMPRESSION, parquetOptions.compressionCodec) + // SPARK-15719: Disables writing Parquet summary files by default. + if (conf.get(ParquetOutputFormat.ENABLE_JOB_SUMMARY) == null) { + conf.setBoolean(ParquetOutputFormat.ENABLE_JOB_SUMMARY, false) + } + new OutputWriterFactory { override def newInstance( path: String, @@ -786,7 +791,7 @@ private[sql] object ParquetFileFormat extends Logging { // // Parquet requires `FileStatus`es to read footers. Here we try to send cached `FileStatus`es // to executor side to avoid fetching them again. However, `FileStatus` is not `Serializable` - // but only `Writable`. What makes it worth, for some reason, `FileStatus` doesn't play well + // but only `Writable`. What makes it worse, for some reason, `FileStatus` doesn't play well // with `SerializableWritable[T]` and always causes a weird `IllegalStateException`. These // facts virtually prevents us to serialize `FileStatus`es. // http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index d0107aa..92f2db3 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -515,17 +515,19 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { val hadoopConf = spark.sessionState.newHadoopConfWithOptions(extraOptions) - withTempPath { dir => - val path = s"${dir.getCanonicalPath}/part-r-0.parquet" - spark.range(1 << 16).selectExpr("(id % 4) AS i") - .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) + withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withTempPath { dir => + val path = s"${dir.getCanonicalPath}/part-r-0.parquet" + spark.range(1 << 16).selectExpr("(id % 4) AS i") + .coalesce(1).write.options(extraOptions).mode("overwrite").parquet(path) - val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head - val columnChunkMetadata = blockMetadata.getColumns.asScala.head + val blockMetadata = readFooter(new Path(path), hadoopConf).getBlocks.asScala.head + val columnChunkMetadata = blockMetadata.getColumns.asScala.head - // If the file is written with version2, this should include - // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY - assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + // If the file is written with version2, this should include + // Encoding.RLE_DICTIONARY type. For version1, it is Encoding.PLAIN_DICTIONARY + assert(columnChunkMetadata.getEncodings.contains(Encoding.RLE_DICTIONARY)) + } } } http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index 0a2fb0e..78b97f6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -19,7 +19,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.File -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier} @@ -148,13 +149,18 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } - withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") { + withSQLConf( + SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", + SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true", + ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true" + ) { testSchemaMerging(2) } - withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", - SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") { + withSQLConf( + SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", + SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false" + ) { testSchemaMerging(3) } } @@ -604,6 +610,21 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext } } } + + test("SPARK-15719: disable writing summary files by default") { + withTempPath { dir => + val path = dir.getCanonicalPath + spark.range(3).write.parquet(path) + + val fs = FileSystem.get(sparkContext.hadoopConfiguration) + val files = fs.listFiles(new Path(path), true) + + while (files.hasNext) { + val file = files.next + assert(!file.getPath.getName.contains("_metadata")) + } + } + } } object TestingUDT { http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala index 6db6492..1bc6f70 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaSuite.scala @@ -451,31 +451,18 @@ class ParquetSchemaSuite extends ParquetSchemaTest { } test("schema merging failure error message") { - withTempPath { dir => - val path = dir.getCanonicalPath - spark.range(3).write.parquet(s"$path/p=1") - spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2") - - val message = intercept[SparkException] { - spark.read.option("mergeSchema", "true").parquet(path).schema - }.getMessage + import testImplicits._ - assert(message.contains("Failed merging schema of file")) - } - - // test for second merging (after read Parquet schema in parallel done) withTempPath { dir => val path = dir.getCanonicalPath spark.range(3).write.parquet(s"$path/p=1") - spark.range(3).selectExpr("CAST(id AS INT) AS id").write.parquet(s"$path/p=2") - - spark.sparkContext.conf.set("spark.default.parallelism", "20") + spark.range(3).select('id cast IntegerType as 'id).write.parquet(s"$path/p=2") val message = intercept[SparkException] { spark.read.option("mergeSchema", "true").parquet(path).schema }.getMessage - assert(message.contains("Failed merging schema:")) + assert(message.contains("Failed merging schema")) } } http://git-wip-us.apache.org/repos/asf/spark/blob/810472a9/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala index f9a1d16..8aa018d 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/ParquetHadoopFsRelationSuite.scala @@ -21,6 +21,7 @@ import java.io.File import com.google.common.io.Files import org.apache.hadoop.fs.Path +import org.apache.parquet.hadoop.ParquetOutputFormat import org.apache.spark.deploy.SparkHadoopUtil import org.apache.spark.sql._ @@ -124,23 +125,25 @@ class ParquetHadoopFsRelationSuite extends HadoopFsRelationTest { } test("SPARK-8604: Parquet data source should write summary file while doing appending") { - withTempPath { dir => - val path = dir.getCanonicalPath - val df = spark.range(0, 5).toDF() - df.write.mode(SaveMode.Overwrite).parquet(path) + withSQLConf(ParquetOutputFormat.ENABLE_JOB_SUMMARY -> "true") { + withTempPath { dir => + val path = dir.getCanonicalPath + val df = spark.range(0, 5).toDF() + df.write.mode(SaveMode.Overwrite).parquet(path) - val summaryPath = new Path(path, "_metadata") - val commonSummaryPath = new Path(path, "_common_metadata") + val summaryPath = new Path(path, "_metadata") + val commonSummaryPath = new Path(path, "_common_metadata") - val fs = summaryPath.getFileSystem(spark.sessionState.newHadoopConf()) - fs.delete(summaryPath, true) - fs.delete(commonSummaryPath, true) + val fs = summaryPath.getFileSystem(spark.sessionState.newHadoopConf()) + fs.delete(summaryPath, true) + fs.delete(commonSummaryPath, true) - df.write.mode(SaveMode.Append).parquet(path) - checkAnswer(spark.read.parquet(path), df.union(df)) + df.write.mode(SaveMode.Append).parquet(path) + checkAnswer(spark.read.parquet(path), df.union(df)) - assert(fs.exists(summaryPath)) - assert(fs.exists(commonSummaryPath)) + assert(fs.exists(summaryPath)) + assert(fs.exists(commonSummaryPath)) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org