spark git commit: [SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema
Repository: spark Updated Branches: refs/heads/master bc37c9743 -> 937c1e550 [SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema When writing Parquet files, Spark 1.1.x persists the schema string into Parquet metadata with the result of `StructType.toString`, which was then deprecated in Spark 1.2 by a schema string in JSON format. But we still need to take the old schema format into account while reading Parquet files. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height="40" alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5034) <!-- Reviewable:end --> Author: Cheng Lian l...@databricks.com Closes #5034 from liancheng/spark-6315 and squashes the following commits: a182f58 [Cheng Lian] Adds a regression test b9c6dbe [Cheng Lian] Also tries the case class string parser while reading Parquet schema Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/937c1e55 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/937c1e55 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/937c1e55 Branch: refs/heads/master Commit: 937c1e5503963e67a5412be993d30dbec6fc9883 Parents: bc37c97 Author: Cheng Lian l...@databricks.com Authored: Sat Mar 21 11:18:45 2015 +0800 Committer: Cheng Lian l...@databricks.com Committed: Sat Mar 21 11:18:45 2015 +0800 -- .../apache/spark/sql/parquet/newParquet.scala | 23 ++- .../spark/sql/parquet/ParquetIOSuite.scala | 42 ++-- 2 files changed, 60 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/937c1e55/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index fbe7a41..410600b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -681,7 +681,7 @@ private[sql] case class ParquetRelation2( } } -private[sql] object ParquetRelation2 { +private[sql] object ParquetRelation2 extends Logging { // Whether we should merge schemas collected from all Parquet part-files. val MERGE_SCHEMA = "mergeSchema" @@ -701,7 +701,26 @@ private[sql] object ParquetRelation2 { .getKeyValueMetaData .toMap .get(RowReadSupport.SPARK_METADATA_KEY) -.map(DataType.fromJson(_).asInstanceOf[StructType]) +.flatMap { serializedSchema => + // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to + // whatever is available. + Try(DataType.fromJson(serializedSchema)) +.recover { case _: Throwable => + logInfo( +s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + + "falling back to the deprecated DataType.fromCaseClassString parser.") + DataType.fromCaseClassString(serializedSchema) +} +.recover { case cause: Throwable => + logWarning( +s"""Failed to parse serialized Spark schema in Parquet key-value metadata: + |\t$serializedSchema + """.stripMargin, +cause) +} +.map(_.asInstanceOf[StructType]) +.toOption +} maybeSparkSchema.getOrElse { // Falls back to Parquet schema if Spark SQL schema is absent. 
http://git-wip-us.apache.org/repos/asf/spark/blob/937c1e55/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index a70b3c7..5438095 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -28,8 +28,8 @@ import parquet.example.data.simple.SimpleGroup import parquet.example.data.{Group, GroupWriter} import parquet.hadoop.api.WriteSupport import parquet.hadoop.api.WriteSupport.WriteContext -import parquet.hadoop.metadata.CompressionCodecName -import parquet.hadoop.{ParquetFileWriter, ParquetWriter} +import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName} +import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter} import parquet.io.api.RecordConsumer import parquet.schema.{MessageType, MessageTypeParser} @@ -38,7 +38,7 @@ import
spark git commit: [SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema
Repository: spark Updated Branches: refs/heads/branch-1.3 df83e2197 -> b75943f66 [SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema When writing Parquet files, Spark 1.1.x persists the schema string into Parquet metadata with the result of `StructType.toString`, which was then deprecated in Spark 1.2 by a schema string in JSON format. But we still need to take the old schema format into account while reading Parquet files. <!-- Reviewable:start --> [<img src="https://reviewable.io/review_button.png" height="40" alt="Review on Reviewable"/>](https://reviewable.io/reviews/apache/spark/5034) <!-- Reviewable:end --> Author: Cheng Lian l...@databricks.com Closes #5034 from liancheng/spark-6315 and squashes the following commits: a182f58 [Cheng Lian] Adds a regression test b9c6dbe [Cheng Lian] Also tries the case class string parser while reading Parquet schema (cherry picked from commit 937c1e5503963e67a5412be993d30dbec6fc9883) Signed-off-by: Cheng Lian l...@databricks.com Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b75943f6 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b75943f6 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b75943f6 Branch: refs/heads/branch-1.3 Commit: b75943f661b74c59c19847430663450bf57cfd8e Parents: df83e21 Author: Cheng Lian l...@databricks.com Authored: Sat Mar 21 11:18:45 2015 +0800 Committer: Cheng Lian l...@databricks.com Committed: Sat Mar 21 11:19:41 2015 +0800 -- .../apache/spark/sql/parquet/newParquet.scala | 23 ++- .../spark/sql/parquet/ParquetIOSuite.scala | 42 ++-- 2 files changed, 60 insertions(+), 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/b75943f6/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala index e82f7ad..3ee4015 
100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala @@ -662,7 +662,7 @@ private[sql] case class ParquetRelation2( } } -private[sql] object ParquetRelation2 { +private[sql] object ParquetRelation2 extends Logging { // Whether we should merge schemas collected from all Parquet part-files. val MERGE_SCHEMA = "mergeSchema" @@ -682,7 +682,26 @@ private[sql] object ParquetRelation2 { .getKeyValueMetaData .toMap .get(RowReadSupport.SPARK_METADATA_KEY) -.map(DataType.fromJson(_).asInstanceOf[StructType]) +.flatMap { serializedSchema => + // Don't throw even if we failed to parse the serialized Spark schema. Just fallback to + // whatever is available. + Try(DataType.fromJson(serializedSchema)) +.recover { case _: Throwable => + logInfo( +s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " + + "falling back to the deprecated DataType.fromCaseClassString parser.") + DataType.fromCaseClassString(serializedSchema) +} +.recover { case cause: Throwable => + logWarning( +s"""Failed to parse serialized Spark schema in Parquet key-value metadata: + |\t$serializedSchema + """.stripMargin, +cause) +} +.map(_.asInstanceOf[StructType]) +.toOption +} maybeSparkSchema.getOrElse { // Falls back to Parquet schema if Spark SQL schema is absent. 
http://git-wip-us.apache.org/repos/asf/spark/blob/b75943f6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala -- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala index a70b3c7..5438095 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala @@ -28,8 +28,8 @@ import parquet.example.data.simple.SimpleGroup import parquet.example.data.{Group, GroupWriter} import parquet.hadoop.api.WriteSupport import parquet.hadoop.api.WriteSupport.WriteContext -import parquet.hadoop.metadata.CompressionCodecName -import parquet.hadoop.{ParquetFileWriter, ParquetWriter} +import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName} +import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter} import