spark git commit: [SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema

2015-03-20 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master bc37c9743 -> 937c1e550


[SPARK-6315] [SQL] Also tries the case class string parser while reading 
Parquet schema

When writing Parquet files, Spark 1.1.x persists the schema into Parquet
metadata as the result of `StructType.toString`. Spark 1.2 deprecated that
format in favor of a JSON schema string, but we still need to take the old
format into account while reading Parquet files.
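
For context, the two on-disk schema encodings look roughly as follows. This is
an illustrative snippet, not part of the patch: it targets the Spark 1.3-era
`DataType` API, the legacy string mirrors the shape used in the regression test
below, and the package declaration is only there because
`DataType.fromCaseClassString` is private[sql].

package org.apache.spark.sql  // fromCaseClassString is private[sql] in Spark 1.3

import scala.util.Try
import org.apache.spark.sql.types.DataType

object SchemaFormatDemo {
  def main(args: Array[String]): Unit = {
    // Spark 1.2+ persists the schema as JSON under the Spark metadata key:
    val jsonForm =
      """{"type":"struct","fields":[{"name":"a","type":"integer","nullable":true,"metadata":{}}]}"""

    // Spark 1.1.x persisted the result of StructType.toString instead
    // (illustrative shape; the exact string depends on the writing version):
    val legacyForm = "StructType(Seq(StructField(a,IntegerType,true)))"

    println(DataType.fromJson(jsonForm))                  // parses fine
    println(Try(DataType.fromJson(legacyForm)).isFailure) // true: the JSON parser rejects it
    println(DataType.fromCaseClassString(legacyForm))     // the legacy parser handles it
  }
}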


Author: Cheng Lian l...@databricks.com

Closes #5034 from liancheng/spark-6315 and squashes the following commits:

a182f58 [Cheng Lian] Adds a regression test
b9c6dbe [Cheng Lian] Also tries the case class string parser while reading 
Parquet schema


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/937c1e55
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/937c1e55
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/937c1e55

Branch: refs/heads/master
Commit: 937c1e5503963e67a5412be993d30dbec6fc9883
Parents: bc37c97
Author: Cheng Lian l...@databricks.com
Authored: Sat Mar 21 11:18:45 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Sat Mar 21 11:18:45 2015 +0800

--
 .../apache/spark/sql/parquet/newParquet.scala   | 23 ++-
 .../spark/sql/parquet/ParquetIOSuite.scala  | 42 ++--
 2 files changed, 60 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/937c1e55/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index fbe7a41..410600b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -681,7 +681,7 @@ private[sql] case class ParquetRelation2(
   }
 }
 
-private[sql] object ParquetRelation2 {
+private[sql] object ParquetRelation2 extends Logging {
   // Whether we should merge schemas collected from all Parquet part-files.
   val MERGE_SCHEMA = "mergeSchema"
 
@@ -701,7 +701,26 @@ private[sql] object ParquetRelation2 {
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .map(DataType.fromJson(_).asInstanceOf[StructType])
+        .flatMap { serializedSchema =>
+          // Don't throw even if we failed to parse the serialized Spark schema.  Just fallback to
+          // whatever is available.
+          Try(DataType.fromJson(serializedSchema))
+            .recover { case _: Throwable =>
+              logInfo(
+                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                  "falling back to the deprecated DataType.fromCaseClassString parser.")
+              DataType.fromCaseClassString(serializedSchema)
+            }
+            .recover { case cause: Throwable =>
+              logWarning(
+                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                   |\t$serializedSchema
+                 """.stripMargin,
+                cause)
+            }
+            .map(_.asInstanceOf[StructType])
+            .toOption
+        }

       maybeSparkSchema.getOrElse {
         // Falls back to Parquet schema if Spark SQL schema is absent.
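
One subtlety in the chain above: when both parsers fail, the second `recover`
block returns `Unit`, the subsequent `asInstanceOf[StructType]` then throws
inside `Try.map`, and `toOption` yields `None`, so the `getOrElse` below falls
back to the Parquet schema. A standalone sketch of that behavior in plain
Scala, with hypothetical `parseJson`/`parseCaseClassString` stand-ins for the
real parsers:

import scala.util.Try

object RecoverChainDemo extends App {
  // Hypothetical stand-ins for DataType.fromJson / DataType.fromCaseClassString.
  def parseJson(s: String): String =
    if (s.startsWith("{")) s"json:$s" else sys.error("not JSON")
  def parseCaseClassString(s: String): String =
    if (s.startsWith("StructType")) s"legacy:$s" else sys.error("not a case class string")

  def readSchema(serialized: String): Option[String] =
    Try(parseJson(serialized))
      .recover { case _: Throwable => parseCaseClassString(serialized) }    // first fallback
      .recover { case cause: Throwable => println(s"unparseable: $cause") } // logs; yields Unit
      .map(_.asInstanceOf[String]) // a Unit fails this cast, so a double failure stays a Failure
      .toOption                    // None when both parsers failed

  println(readSchema("""{"type":"struct"}"""))  // Some(json:...)
  println(readSchema("StructType(Seq())"))      // Some(legacy:...)
  println(readSchema("garbage"))                // prints "unparseable: ..."; then None
}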

http://git-wip-us.apache.org/repos/asf/spark/blob/937c1e55/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index a70b3c7..5438095 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -28,8 +28,8 @@ import parquet.example.data.simple.SimpleGroup
 import parquet.example.data.{Group, GroupWriter}
 import parquet.hadoop.api.WriteSupport
 import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
 import parquet.io.api.RecordConsumer
 import parquet.schema.{MessageType, MessageTypeParser}
 
@@ -38,7 +38,7 @@ import 
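
Judging from the imports added above (Footer, ParquetMetadata, FileMetaData),
the regression test writes a metadata-only Parquet footer whose key-value
metadata carries a Spark 1.1.x-style schema string, deliberately paired with a
different Parquet schema so the test can tell which one the reader used. A
rough sketch of that setup, assuming parquet-mr 1.6-era constructors (the
object and method names are placeholders, not the suite's actual code):

package org.apache.spark.sql.parquet  // same package as RowReadSupport

import scala.collection.JavaConversions._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import parquet.hadoop.metadata.{FileMetaData, ParquetMetadata}
import parquet.hadoop.{Footer, ParquetFileWriter}
import parquet.schema.MessageTypeParser

object LegacySchemaFixture {
  // Writes a footer whose key-value metadata holds a case class schema string,
  // deliberately different from the Parquet schema so a reader of this file
  // reveals which of the two schemas it actually parsed.
  def writeLegacyFooter(conf: Configuration, path: Path): Unit = {
    val legacySchema = "StructType(Seq(StructField(a,BooleanType,false)))"
    val parquetSchema = MessageTypeParser.parseMessageType(
      "message root { required int32 c; }")
    val fileMetadata = new FileMetaData(
      parquetSchema,
      Map(RowReadSupport.SPARK_METADATA_KEY -> legacySchema),
      "Spark")
    ParquetFileWriter.writeMetadataFile(
      conf, path, new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil)
  }
}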

spark git commit: [SPARK-6315] [SQL] Also tries the case class string parser while reading Parquet schema

2015-03-20 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-1.3 df83e2197 -> b75943f66


[SPARK-6315] [SQL] Also tries the case class string parser while reading 
Parquet schema

When writing Parquet files, Spark 1.1.x persists the schema into Parquet
metadata as the result of `StructType.toString`. Spark 1.2 deprecated that
format in favor of a JSON schema string, but we still need to take the old
format into account while reading Parquet files.


Author: Cheng Lian l...@databricks.com

Closes #5034 from liancheng/spark-6315 and squashes the following commits:

a182f58 [Cheng Lian] Adds a regression test
b9c6dbe [Cheng Lian] Also tries the case class string parser while reading 
Parquet schema

(cherry picked from commit 937c1e5503963e67a5412be993d30dbec6fc9883)
Signed-off-by: Cheng Lian l...@databricks.com


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b75943f6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b75943f6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b75943f6

Branch: refs/heads/branch-1.3
Commit: b75943f661b74c59c19847430663450bf57cfd8e
Parents: df83e21
Author: Cheng Lian l...@databricks.com
Authored: Sat Mar 21 11:18:45 2015 +0800
Committer: Cheng Lian l...@databricks.com
Committed: Sat Mar 21 11:19:41 2015 +0800

--
 .../apache/spark/sql/parquet/newParquet.scala   | 23 ++-
 .../spark/sql/parquet/ParquetIOSuite.scala  | 42 ++--
 2 files changed, 60 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b75943f6/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
index e82f7ad..3ee4015 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/parquet/newParquet.scala
@@ -662,7 +662,7 @@ private[sql] case class ParquetRelation2(
   }
 }
 
-private[sql] object ParquetRelation2 {
+private[sql] object ParquetRelation2 extends Logging {
   // Whether we should merge schemas collected from all Parquet part-files.
   val MERGE_SCHEMA = "mergeSchema"
 
@@ -682,7 +682,26 @@ private[sql] object ParquetRelation2 {
         .getKeyValueMetaData
         .toMap
         .get(RowReadSupport.SPARK_METADATA_KEY)
-        .map(DataType.fromJson(_).asInstanceOf[StructType])
+        .flatMap { serializedSchema =>
+          // Don't throw even if we failed to parse the serialized Spark schema.  Just fallback to
+          // whatever is available.
+          Try(DataType.fromJson(serializedSchema))
+            .recover { case _: Throwable =>
+              logInfo(
+                s"Serialized Spark schema in Parquet key-value metadata is not in JSON format, " +
+                  "falling back to the deprecated DataType.fromCaseClassString parser.")
+              DataType.fromCaseClassString(serializedSchema)
+            }
+            .recover { case cause: Throwable =>
+              logWarning(
+                s"""Failed to parse serialized Spark schema in Parquet key-value metadata:
+                   |\t$serializedSchema
+                 """.stripMargin,
+                cause)
+            }
+            .map(_.asInstanceOf[StructType])
+            .toOption
+        }

       maybeSparkSchema.getOrElse {
         // Falls back to Parquet schema if Spark SQL schema is absent.

http://git-wip-us.apache.org/repos/asf/spark/blob/b75943f6/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
index a70b3c7..5438095 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetIOSuite.scala
@@ -28,8 +28,8 @@ import parquet.example.data.simple.SimpleGroup
 import parquet.example.data.{Group, GroupWriter}
 import parquet.hadoop.api.WriteSupport
 import parquet.hadoop.api.WriteSupport.WriteContext
-import parquet.hadoop.metadata.CompressionCodecName
-import parquet.hadoop.{ParquetFileWriter, ParquetWriter}
+import parquet.hadoop.metadata.{ParquetMetadata, FileMetaData, CompressionCodecName}
+import parquet.hadoop.{Footer, ParquetFileWriter, ParquetWriter}
 import