Repository: spark Updated Branches: refs/heads/master bf02e3771 -> 6722aca80
[SPARK-8785] [SQL] Improve Parquet schema merging JIRA: https://issues.apache.org/jira/browse/SPARK-8785 Currently, the parquet schema merging (`ParquetRelation2.readSchema`) may spend much time merging duplicate schemas. We can select only the non-duplicate schemas and merge them later. Author: Liang-Chi Hsieh <vii...@gmail.com> Author: Liang-Chi Hsieh <vii...@appier.com> Closes #7182 from viirya/improve_parquet_merging and squashes the following commits: 5cf934f [Liang-Chi Hsieh] Refactor it to make it faster. f3411ea [Liang-Chi Hsieh] Merge remote-tracking branch 'upstream/master' into improve_parquet_merging a63c3ff [Liang-Chi Hsieh] Improve Parquet schema merging. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6722aca8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6722aca8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6722aca8 Branch: refs/heads/master Commit: 6722aca809ddc28aa20abf3bbb2e0de8629a9903 Parents: bf02e37 Author: Liang-Chi Hsieh <vii...@gmail.com> Authored: Wed Jul 8 10:09:50 2015 -0700 Committer: Cheng Lian <l...@databricks.com> Committed: Wed Jul 8 10:09:50 2015 -0700 ---------------------------------------------------------------------- .../apache/spark/sql/parquet/newParquet.scala | 82 ++++++++++++-------- 1 file changed, 48 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6722aca8/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index 6bc69c6..ce456e7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ 
-21,6 +21,7 @@ import java.net.URI import java.util.{List => JList} import scala.collection.JavaConversions._ +import scala.collection.mutable import scala.util.Try import com.google.common.base.Objects @@ -30,8 +31,9 @@ import org.apache.hadoop.mapreduce._ import org.apache.hadoop.mapreduce.lib.input.FileInputFormat import org.apache.parquet.filter2.predicate.FilterApi import org.apache.parquet.hadoop._ -import org.apache.parquet.hadoop.metadata.CompressionCodecName +import org.apache.parquet.hadoop.metadata.{FileMetaData, CompressionCodecName} import org.apache.parquet.hadoop.util.ContextUtil +import org.apache.parquet.schema.MessageType import org.apache.spark.broadcast.Broadcast import org.apache.spark.deploy.SparkHadoopUtil @@ -508,44 +510,56 @@ private[sql] object ParquetRelation2 extends Logging { private[parquet] def readSchema( footers: Seq[Footer], sqlContext: SQLContext): Option[StructType] = { - footers.map { footer => + + def parseParquetSchema(schema: MessageType): StructType = { + StructType.fromAttributes( + // TODO Really no need to use `Attribute` here, we only need to know the data type. + ParquetTypesConverter.convertToAttributes( + schema, + sqlContext.conf.isParquetBinaryAsString, + sqlContext.conf.isParquetINT96AsTimestamp)) + } + + val seen = mutable.HashSet[String]() + val finalSchemas: Seq[StructType] = footers.flatMap { footer => val metadata = footer.getParquetMetadata.getFileMetaData - val parquetSchema = metadata.getSchema - val maybeSparkSchema = metadata + val serializedSchema = metadata .getKeyValueMetaData .toMap .get(RowReadSupport.SPARK_METADATA_KEY) - .flatMap { serializedSchema => - // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to - // whatever is available. 
- Try(DataType.fromJson(serializedSchema)) - .recover { case _: Throwable => - logInfo( - s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + - "falling back to the deprecated DataType.fromCaseClassString parser.") - DataType.fromCaseClassString(serializedSchema) - } - .recover { case cause: Throwable => - logWarning( - s"""Failed to parse serialized Spark schema in Parquet key-value metadata: - |\t$serializedSchema - """.stripMargin, - cause) - } - .map(_.asInstanceOf[StructType]) - .toOption - } - - maybeSparkSchema.getOrElse { - // Falls back to Parquet schema if Spark SQL schema is absent. - StructType.fromAttributes( - // TODO Really no need to use `Attribute` here, we only need to know the data type. - ParquetTypesConverter.convertToAttributes( - parquetSchema, - sqlContext.conf.isParquetBinaryAsString, - sqlContext.conf.isParquetINT96AsTimestamp)) + if (serializedSchema == None) { + // Falls back to Parquet schema if no Spark SQL schema found. + Some(parseParquetSchema(metadata.getSchema)) + } else if (!seen.contains(serializedSchema.get)) { + seen += serializedSchema.get + + // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to + // whatever is available. + Some(Try(DataType.fromJson(serializedSchema.get)) + .recover { case _: Throwable => + logInfo( + s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + + "falling back to the deprecated DataType.fromCaseClassString parser.") + DataType.fromCaseClassString(serializedSchema.get) + } + .recover { case cause: Throwable => + logWarning( + s"""Failed to parse serialized Spark schema in Parquet key-value metadata: + |\t$serializedSchema + """.stripMargin, + cause) + } + .map(_.asInstanceOf[StructType]) + .getOrElse { + // Falls back to Parquet schema if Spark SQL schema can't be parsed. 
+ parseParquetSchema(metadata.getSchema) + }) + } else { + None } - }.reduceOption { (left, right) => + } + + finalSchemas.reduceOption { (left, right) => try left.merge(right) catch { case e: Throwable => throw new SparkException(s"Failed to merge incompatible schemas $left and $right", e) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org