Repository: spark Updated Branches: refs/heads/master 5ba2d4406 -> 6175d6cfe
[SPARK-8838] [SQL] Add config to enable/disable merging part-files when merging parquet schema JIRA: https://issues.apache.org/jira/browse/SPARK-8838 Currently all part-files are merged when merging parquet schema. However, in case there are many part-files and we can make sure that all the part-files have the same schema as their summary file. If so, we provide a configuration to disable merging part-files when merging parquet schema. In short, we need to merge parquet schema because different summary files may contain different schema. But the part-files are confirmed to have the same schema with summary files. Author: Liang-Chi Hsieh <vii...@appier.com> Closes #7238 from viirya/option_partfile_merge and squashes the following commits: 71d5b5f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge 8816f44 [Liang-Chi Hsieh] For comments. dbc8e6b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge afc2fa1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge d4ed7e6 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge df43027 [Liang-Chi Hsieh] Get dataStatuses' partitions based on all paths. 4eb2f00 [Liang-Chi Hsieh] Use given parameter. ea8f6e5 [Liang-Chi Hsieh] Correct the code comments. a57be0e [Liang-Chi Hsieh] Merge part-files if there are no summary files. 47df981 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge 4caf293 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge 0e734e0 [Liang-Chi Hsieh] Use correct API. 3b6be5b [Liang-Chi Hsieh] Fix key not found. 4bdd7e0 [Liang-Chi Hsieh] Don't read footer files if we can skip them. 8bbebcb [Liang-Chi Hsieh] Figure out how to test the config. bbd4ce7 [Liang-Chi Hsieh] Add config to enable/disable merging part-files when merging parquet schema. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6175d6cf Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6175d6cf Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6175d6cf Branch: refs/heads/master Commit: 6175d6cfe795fbd88e3ee713fac375038a3993a8 Parents: 5ba2d44 Author: Liang-Chi Hsieh <vii...@appier.com> Authored: Thu Jul 30 17:45:30 2015 +0800 Committer: Cheng Lian <l...@databricks.com> Committed: Thu Jul 30 17:45:30 2015 +0800 ---------------------------------------------------------------------- .../scala/org/apache/spark/sql/SQLConf.scala | 7 +++++ .../spark/sql/parquet/ParquetRelation.scala | 19 +++++++++++++- .../spark/sql/parquet/ParquetQuerySuite.scala | 27 ++++++++++++++++++++ 3 files changed, 52 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6175d6cf/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index cdb0c7a..2564bbd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -247,6 +247,13 @@ private[spark] object SQLConf { "otherwise the schema is picked from the summary file or a random data file " + "if no summary file is available.") + val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles", + defaultValue = Some(false), + doc = "When true, we make assumption that all part-files of Parquet are consistent with " + + "summary files and we will ignore them when merging schema. Otherwise, if this is " + + "false, which is the default, we will merge all part-files. This should be considered " + + "as expert-only option, and shouldn't be enabled before knowing what it means exactly.") + val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString", defaultValue = Some(false), doc = "Some other Parquet-producing systems, in particular Impala and older versions of " + http://git-wip-us.apache.org/repos/asf/spark/blob/6175d6cf/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala index 1a8176d..b4337a4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala @@ -124,6 +124,9 @@ private[sql] class ParquetRelation( .map(_.toBoolean) .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED)) + private val mergeRespectSummaries = + sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES) + private val maybeMetastoreSchema = parameters .get(ParquetRelation.METASTORE_SCHEMA) .map(DataType.fromJson(_).asInstanceOf[StructType]) @@ -421,7 +424,21 @@ private[sql] class ParquetRelation( val filesToTouch = if (shouldMergeSchemas) { // Also includes summary files, 'cause there might be empty partition directories. - (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq + + // If mergeRespectSummaries config is true, we assume that all part-files are the same for + // their schema with summary files, so we ignore them when merging schema. + // If the config is disabled, which is the default setting, we merge all part-files. + // In this mode, we only need to merge schemas contained in all those summary files. + // You should enable this configuration only if you are very sure that for the parquet + // part-files to read there are corresponding summary files containing correct schema. + + val needMerged: Seq[FileStatus] = + if (mergeRespectSummaries) { + Seq() + } else { + dataStatuses + } + (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq } else { // Tries any "_common_metadata" first. Parquet files written by old versions or Parquet // don't have this. http://git-wip-us.apache.org/repos/asf/spark/blob/6175d6cf/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala index c037faf..a95f70f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.parquet +import java.io.File + import org.apache.hadoop.fs.Path import org.apache.spark.sql.types._ import org.apache.spark.sql.{QueryTest, Row, SQLConf} +import org.apache.spark.util.Utils /** * A test suite that tests various Parquet queries. @@ -123,6 +126,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest { } } + test("Enabling/disabling merging partfiles when merging parquet schema") { + def testSchemaMerging(expectedColumnNumber: Int): Unit = { + withTempDir { dir => + val basePath = dir.getCanonicalPath + sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, "foo=1").toString) + sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, "foo=2").toString) + // delete summary files, so if we don't merge part-files, one column will not be included. + Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata")) + Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata")) + assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber) + } + } + + withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", + SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") { + testSchemaMerging(2) + } + + withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true", + SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") { + testSchemaMerging(3) + } + } + test("Enabling/disabling schema merging") { def testSchemaMerging(expectedColumnNumber: Int): Unit = { withTempDir { dir => --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org