Repository: spark
Updated Branches:
  refs/heads/master bf02e3771 -> 6722aca80


[SPARK-8785] [SQL] Improve Parquet schema merging

JIRA: https://issues.apache.org/jira/browse/SPARK-8785

Currently, Parquet schema merging (`ParquetRelation2.readSchema`) may spend a lot of 
time merging duplicate schemas. We can select only the non-duplicate schemas and 
merge those afterwards.
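
As a rough illustration of the idea (not the actual patch), deduplicating the 
serialized schema strings with a hash set before parsing/merging avoids repeating the 
expensive work for identical footers. A minimal standalone sketch, with a hypothetical 
`footerSchemas` input standing in for the real Parquet footers:

```scala
import scala.collection.mutable

object SchemaDedupSketch {
  // Keep only the first occurrence of each serialized schema string, so the expensive
  // parse/merge step runs once per distinct schema rather than once per footer.
  def distinctSchemas(footerSchemas: Seq[String]): Seq[String] = {
    val seen = mutable.HashSet[String]()
    footerSchemas.flatMap { s =>
      if (seen.add(s)) Some(s) // add() returns true only for schemas not seen before
      else None                // duplicate: skip it
    }
  }

  def main(args: Array[String]): Unit = {
    val footers = Seq("schemaA", "schemaA", "schemaB", "schemaA")
    println(distinctSchemas(footers)) // List(schemaA, schemaB)
  }
}
```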

Author: Liang-Chi Hsieh <vii...@gmail.com>
Author: Liang-Chi Hsieh <vii...@appier.com>

Closes #7182 from viirya/improve_parquet_merging and squashes the following 
commits:

5cf934f [Liang-Chi Hsieh] Refactor it to make it faster.
f3411ea [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into improve_parquet_merging
a63c3ff [Liang-Chi Hsieh] Improve Parquet schema merging.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6722aca8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6722aca8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6722aca8

Branch: refs/heads/master
Commit: 6722aca809ddc28aa20abf3bbb2e0de8629a9903
Parents: bf02e37
Author: Liang-Chi Hsieh <vii...@gmail.com>
Authored: Wed Jul 8 10:09:50 2015 -0700
Committer: Cheng Lian <l...@databricks.com>
Committed: Wed Jul 8 10:09:50 2015 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/parquet/newParquet.scala   | 82 ++++++++++++--------
 1 file changed, 48 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/6722aca8/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index 6bc69c6..ce456e7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -21,6 +21,7 @@ import java.net.URI
 import java.util.{List => JList}
 
 import scala.collection.JavaConversions._
+import scala.collection.mutable
 import scala.util.Try
 
 import com.google.common.base.Objects
@@ -30,8 +31,9 @@ import org.apache.hadoop.mapreduce._
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.parquet.filter2.predicate.FilterApi
 import org.apache.parquet.hadoop._
-import org.apache.parquet.hadoop.metadata.CompressionCodecName
+import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName}
 import org.apache.parquet.hadoop.util.ContextUtil
+import org.apache.parquet.schema.MessageType
 
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -508,44 +510,56 @@ private[sql] object ParquetRelation2 extends Logging {
 
   private[parquet] def readSchema(
       footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = {
-    footers.map { footer =>
+
+    def parseParquetSchema(schema: MessageType): StructType = {
+      StructType.fromAttributes(
+        // TODO Really no need to use `Attribute` here, we only need to know the data type.
+        ParquetTypesConverter.convertToAttributes(
+          schema,
+          sqlContext.conf.isParquetBinaryAsString,
+          sqlContext.conf.isParquetINT96AsTimestamp))
+    }
+
+    val seen = mutable.HashSet[String]()
+    val finalSchemas: Seq[StructType] = footers.flatMap { footer =>
       val metadata = footer.getParquetMetadata.getFileMetaData
-      val parquetSchema = metadata.getSchema
-      val maybeSparkSchema = metadata
+      val serializedSchema = metadata
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .flatMap { serializedSchema =>
-          // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
-          // whatever is available.
-          Try(DataType.fromJson(serializedSchema))
-            .recover { case _: Throwable =>
-              logInfo(
-                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
-                  "falling back to the deprecated DataType.fromCaseClassString parser.")
-              DataType.fromCaseClassString(serializedSchema)
-            }
-            .recover { case cause: Throwable =>
-              logWarning(
-                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
-                   |\t$serializedSchema
-                 """.stripMargin,
-                cause)
-            }
-            .map(_.asInstanceOf[StructType])
-            .toOption
-        }
-
-      maybeSparkSchema.getOrElse {
-        // Falls back to Parquet schema if Spark SQL schema is absent.
-        StructType.fromAttributes(
-          // TODO Really no need to use `Attribute` here, we only need to know the data type.
-          ParquetTypesConverter.convertToAttributes(
-            parquetSchema,
-            sqlContext.conf.isParquetBinaryAsString,
-            sqlContext.conf.isParquetINT96AsTimestamp))
+      if (serializedSchema == None) {
+        // Falls back to Parquet schema if no Spark SQL schema found.
+        Some(parseParquetSchema(metadata.getSchema))
+      } else if (!seen.contains(serializedSchema.get)) {
+        seen += serializedSchema.get
+
+        // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to
+        // whatever is available.
+        Some(Try(DataType.fromJson(serializedSchema.get))
+          .recover { case _: Throwable =>
+            logInfo(
+              s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                "falling back to the deprecated DataType.fromCaseClassString parser.")
+            DataType.fromCaseClassString(serializedSchema.get)
+          }
+          .recover { case cause: Throwable =>
+            logWarning(
+              s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                 |\t$serializedSchema
+               """.stripMargin,
+              cause)
+          }
+          .map(_.asInstanceOf[StructType])
+          .getOrElse {
+            // Falls back to Parquet schema if Spark SQL schema can't be parsed.
+            parseParquetSchema(metadata.getSchema)
+          })
+      } else {
+        None
       }
-    }.reduceOption { (left, right) =>
+    }
+
+    finalSchemas.reduceOption { (left, right) =>
       try left.merge(right) catch { case e: Throwable =>
        throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e)
       }

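For context, the merge step at the bottom of the diff keeps the existing `reduceOption` 
shape: fold the (now deduplicated) schemas pairwise and wrap any incompatibility in a 
clearer error. A standalone sketch of that shape, using a plain `Map[String, String]` 
and a hypothetical `merge` helper in place of `StructType` and its merge logic:

```scala
object MergeSketch {
  type Schema = Map[String, String] // field name -> type name

  // Hypothetical merge: union of fields, failing on conflicting types.
  def merge(left: Schema, right: Schema): Schema =
    right.foldLeft(left) { case (acc, (name, tpe)) =>
      acc.get(name) match {
        case Some(existing) if existing != tpe =>
          throw new IllegalArgumentException(s"Incompatible types for '$name': $existing vs $tpe")
        case _ => acc + (name -> tpe)
      }
    }

  def main(args: Array[String]): Unit = {
    val schemas = Seq(Map("a" -> "int"), Map("a" -> "int", "b" -> "string"))
    // reduceOption yields None for an empty input instead of throwing.
    val merged = schemas.reduceOption { (left, right) =>
      try merge(left, right) catch {
        case e: Throwable =>
          throw new RuntimeException(s"Failed to merge incompatible schemas $left and $right", e)
      }
    }
    println(merged) // Some(Map(a -> int, b -> string))
  }
}
```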

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
