Repository: spark
Updated Branches:
  refs/heads/master 5ba2d4406 -> 6175d6cfe


[SPARK-8838] [SQL] Add config to enable/disable merging part-files when merging parquet schema

JIRA: https://issues.apache.org/jira/browse/SPARK-8838

Currently all part-files are merged when merging the parquet schema. However,
there may be many part-files, and we may be able to guarantee that all of them
have the same schema as their summary files. For that case, this patch adds a
configuration to disable merging part-files when merging the parquet schema.

In short, we still need to merge the parquet schema because different summary
files may contain different schemas; the part-files, however, are assumed to
have the same schema as their summary files.
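
As a usage illustration (not part of the patch), here is a minimal sketch
against the Spark 1.5-era SQLContext API; the application setup and the input
path are hypothetical, while the two config keys come from this patch and
from the existing spark.sql.parquet.mergeSchema option:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

object RespectSummariesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("respect-summaries").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)

    // Schema merging must be enabled for the new flag to have any effect.
    sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
    // Expert-only: trust the _metadata/_common_metadata summary files and
    // skip reading every part-file footer while merging the schema.
    sqlContext.setConf("spark.sql.parquet.respectSummaryFiles", "true")

    // Hypothetical path; the schema is now taken from summary files only.
    val df = sqlContext.read.parquet("/path/to/parquet/table")
    df.printSchema()
  }
}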

Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #7238 from viirya/option_partfile_merge and squashes the following commits:

71d5b5f [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
8816f44 [Liang-Chi Hsieh] For comments.
dbc8e6b [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
afc2fa1 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
d4ed7e6 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
df43027 [Liang-Chi Hsieh] Get dataStatuses' partitions based on all paths.
4eb2f00 [Liang-Chi Hsieh] Use given parameter.
ea8f6e5 [Liang-Chi Hsieh] Correct the code comments.
a57be0e [Liang-Chi Hsieh] Merge part-files if there are no summary files.
47df981 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
4caf293 [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into option_partfile_merge
0e734e0 [Liang-Chi Hsieh] Use correct API.
3b6be5b [Liang-Chi Hsieh] Fix key not found.
4bdd7e0 [Liang-Chi Hsieh] Don't read footer files if we can skip them.
8bbebcb [Liang-Chi Hsieh] Add config to enable/disable merging part-files when merging parquet schema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6175d6cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6175d6cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6175d6cf

Branch: refs/heads/master
Commit: 6175d6cfe795fbd88e3ee713fac375038a3993a8
Parents: 5ba2d44
Author: Liang-Chi Hsieh <vii...@appier.com>
Authored: Thu Jul 30 17:45:30 2015 +0800
Committer: Cheng Lian <l...@databricks.com>
Committed: Thu Jul 30 17:45:30 2015 +0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/sql/SQLConf.scala    |  7 +++++
 .../spark/sql/parquet/ParquetRelation.scala     | 19 +++++++++++++-
 .../spark/sql/parquet/ParquetQuerySuite.scala   | 27 ++++++++++++++++++++
 3 files changed, 52 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6175d6cf/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
index cdb0c7a..2564bbd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala
@@ -247,6 +247,13 @@ private[spark] object SQLConf {
           "otherwise the schema is picked from the summary file or a random 
data file " +
           "if no summary file is available.")
 
+  val PARQUET_SCHEMA_RESPECT_SUMMARIES = booleanConf("spark.sql.parquet.respectSummaryFiles",
+    defaultValue = Some(false),
+    doc = "When true, we assume that all part-files of Parquet are consistent with " +
+          "summary files and we will ignore them when merging schema. Otherwise, if this is " +
+          "false, which is the default, we will merge all part-files. This should be considered " +
+          "an expert-only option, and shouldn't be enabled before knowing what it means exactly.")
+
   val PARQUET_BINARY_AS_STRING = booleanConf("spark.sql.parquet.binaryAsString",
     defaultValue = Some(false),
     doc = "Some other Parquet-producing systems, in particular Impala and 
older versions of " +

http://git-wip-us.apache.org/repos/asf/spark/blob/6175d6cf/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
index 1a8176d..b4337a4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/ParquetRelation.scala
@@ -124,6 +124,9 @@ private[sql] class ParquetRelation(
       .map(_.toBoolean)
       .getOrElse(sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED))
 
+  private val mergeRespectSummaries =
+    sqlContext.conf.getConf(SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES)
+
   private val maybeMetastoreSchema = parameters
     .get(ParquetRelation.METASTORE_SCHEMA)
     .map(DataType.fromJson(_).asInstanceOf[StructType])
@@ -421,7 +424,21 @@ private[sql] class ParquetRelation(
       val filesToTouch =
         if (shouldMergeSchemas) {
           // Also includes summary files, 'cause there might be empty partition directories.
-          (metadataStatuses ++ commonMetadataStatuses ++ dataStatuses).toSeq
+
+          // If the mergeRespectSummaries config is true, we assume that all part-files have the
+          // same schema as their summary files, so we ignore the part-files when merging the
+          // schema; in that mode we only need to merge the schemas contained in the summary files.
+          // If the config is disabled, which is the default setting, we merge all part-files.
+          // You should enable this configuration only if you are very sure that for the parquet
+          // part-files to read there are corresponding summary files containing the correct schema.
+
+          val needMerged: Seq[FileStatus] =
+            if (mergeRespectSummaries) {
+              Seq()
+            } else {
+              dataStatuses
+            }
+          (metadataStatuses ++ commonMetadataStatuses ++ needMerged).toSeq
         } else {
           // Tries any "_common_metadata" first. Parquet files written by old 
versions or Parquet
           // don't have this.
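
For readers skimming the diff, here is a standalone Scala sketch (an
illustration, not the patch itself) of the file-selection rule implemented
above. The parameter names mirror the patch, FileStatus is the Hadoop class
used there, and the else branch condenses the fallback order described in the
comments (the visible diff truncates the real code for that branch):

import org.apache.hadoop.fs.FileStatus

def filesToTouch(
    shouldMergeSchemas: Boolean,
    mergeRespectSummaries: Boolean,
    metadataStatuses: Seq[FileStatus],
    commonMetadataStatuses: Seq[FileStatus],
    dataStatuses: Seq[FileStatus]): Seq[FileStatus] =
  if (shouldMergeSchemas) {
    // Summary files are always touched; part-files are touched only when we
    // cannot trust the summaries to describe every part-file's schema.
    val needMerged =
      if (mergeRespectSummaries) Seq.empty[FileStatus] else dataStatuses
    metadataStatuses ++ commonMetadataStatuses ++ needMerged
  } else {
    // Without schema merging a single representative file suffices:
    // prefer "_common_metadata", then "_metadata", then any data file.
    (commonMetadataStatuses ++ metadataStatuses ++ dataStatuses).take(1)
  }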

http://git-wip-us.apache.org/repos/asf/spark/blob/6175d6cf/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index c037faf..a95f70f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.parquet
 
+import java.io.File
+
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.util.Utils
 
 /**
  * A test suite that tests various Parquet queries.
@@ -123,6 +126,30 @@ class ParquetQuerySuite extends QueryTest with ParquetTest {
     }
   }
 
+  test("Enabling/disabling merging partfiles when merging parquet schema") {
+    def testSchemaMerging(expectedColumnNumber: Int): Unit = {
+      withTempDir { dir =>
+        val basePath = dir.getCanonicalPath
+        sqlContext.range(0, 10).toDF("a").write.parquet(new Path(basePath, 
"foo=1").toString)
+        sqlContext.range(0, 10).toDF("b").write.parquet(new Path(basePath, 
"foo=2").toString)
+        // delete summary files, so if we don't merge part-files, one column will not be included.
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_metadata"))
+        Utils.deleteRecursively(new File(basePath + "/foo=1/_common_metadata"))
+        assert(sqlContext.read.parquet(basePath).columns.length === expectedColumnNumber)
+      }
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "true") {
+      testSchemaMerging(2)
+    }
+
+    withSQLConf(SQLConf.PARQUET_SCHEMA_MERGING_ENABLED.key -> "true",
+      SQLConf.PARQUET_SCHEMA_RESPECT_SUMMARIES.key -> "false") {
+      testSchemaMerging(3)
+    }
+  }
+
   test("Enabling/disabling schema merging") {
     def testSchemaMerging(expectedColumnNumber: Int): Unit = {
       withTempDir { dir =>

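For context on the expected column counts in the new test, this is the layout
it creates (summary files as written by parquet-mr):

basePath/
  foo=1/  part-files with column "a"; _metadata and _common_metadata deleted
  foo=2/  part-files with column "b"; summary files intact

With respectSummaryFiles=true, only summary files are merged, and foo=1's are
gone, so the resulting schema is b plus the partition column foo: 2 columns.
With the flag off, the part-files under foo=1 are merged as well, giving a, b,
and foo: 3 columns.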
