Repository: spark
Updated Branches:
  refs/heads/branch-1.5 e2c6ef810 -> 90245f65c


[SPARK-10005] [SQL] Fixes schema merging for nested structs

When merging schemas, we only handled first-level fields while converting Parquet
groups to `InternalRow`s; nested struct fields were not handled properly.

For example, the schema of a Parquet file to be read can be:

```
message individual {
  required group f1 {
    optional binary f11 (utf8);
  }
}
```

while the global schema is:

```
message global {
  required group f1 {
    optional binary f11 (utf8);
    optional int32 f12;
  }
}
```

This PR fixes the issue by padding the missing fields (`f12` in the example above) when
creating the actual converters, so that their values are simply read as `null`.
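
To make the mechanism concrete, below is a minimal sketch of the padding step, condensed
from the `CatalystRowConverter` change in the diff further down. The wrapper method
`padParquetFields` is an invented name used only for illustration, and the snippet assumes
package-internal access to Spark's `CatalystSchemaConverter`; in the actual patch this
logic is inlined in `fieldConverters`.

```scala
import scala.collection.JavaConverters._

import org.apache.parquet.schema.{GroupType, Type}
import org.apache.spark.sql.types.StructType

// Given the requested Parquet schema of one physical file and the merged (global)
// Catalyst schema, return a Parquet field list that covers every Catalyst field.
// Fields missing from the file are padded with types derived from the Catalyst
// schema; their values simply stay null at read time.
def padParquetFields(
    parquetType: GroupType,
    catalystType: StructType,
    toParquet: CatalystSchemaConverter): Seq[Type] = {
  val parquetFields = parquetType.getFields.asScala
  val parquetFieldNames = parquetFields.map(_.getName).toSet
  val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))

  // Convert the missing Catalyst fields to Parquet types, then restore the field
  // order of the global schema so ordinals line up with `catalystType`.
  (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
    catalystType.indexWhere(_.name == f.getName)
  }
}
```

Sorting by each field's position in the global schema keeps the padded Parquet fields
aligned, ordinal by ordinal, with the Catalyst struct fields they are zipped against.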

Author: Cheng Lian <l...@databricks.com>

Closes #8228 from liancheng/spark-10005/nested-schema-merging.

(cherry picked from commit ae2370e72f93db8a28b262e8252c55fe1fc9873c)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90245f65
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90245f65
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90245f65

Branch: refs/heads/branch-1.5
Commit: 90245f65c94a40d3210207abaf6f136f5ce2861f
Parents: e2c6ef8
Author: Cheng Lian <l...@databricks.com>
Authored: Sun Aug 16 10:17:58 2015 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Sun Aug 16 10:18:08 2015 -0700

----------------------------------------------------------------------
 .../parquet/CatalystReadSupport.scala           | 19 ++++--
 .../parquet/CatalystRowConverter.scala          | 70 ++++++++++++++++++--
 .../parquet/CatalystSchemaConverter.scala       | 15 +----
 .../datasources/parquet/ParquetQuerySuite.scala | 30 ++++++++-
 4 files changed, 112 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
index 4049795..a4679bb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala
@@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.StructType
 
 private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging {
+  // Called after `init()` when initializing Parquet record reader.
   override def prepareForRead(
       conf: Configuration,
       keyValueMetaData: JMap[String, String],
@@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
           // available if the target file is written by Spark SQL.
           .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY))
       }.map(StructType.fromString).getOrElse {
-        logDebug("Catalyst schema not available, falling back to Parquet 
schema")
+        logInfo("Catalyst schema not available, falling back to Parquet 
schema")
         toCatalyst.convert(parquetRequestedSchema)
       }
 
-    logDebug(s"Catalyst schema used to read Parquet files: 
$catalystRequestedSchema")
+    logInfo {
+      s"""Going to read the following fields from the Parquet file:
+         |
+         |Parquet form:
+         |$parquetRequestedSchema
+         |
+         |Catalyst form:
+         |$catalystRequestedSchema
+       """.stripMargin
+    }
+
     new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema)
   }
 
+  // Called before `prepareForRead()` when initializing Parquet record reader.
   override def init(context: InitContext): ReadContext = {
     val conf = context.getConfiguration
 
     // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst
-    // schema of this file from its the metadata.
+    // schema of this file from its metadata.
     val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA))
 
     // Optional schema of requested columns, in the form of a string serialized from a Catalyst
@@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with
         maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++
         maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _)
 
-    logInfo(s"Going to read Parquet file with these requested columns: 
$parquetRequestedSchema")
     new ReadContext(parquetRequestedSchema, metadata)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
index ab5a6dd..18c5b50 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala
@@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer
 
 import org.apache.parquet.column.Dictionary
 import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter}
-import org.apache.parquet.schema.OriginalType.LIST
+import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8}
+import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE
 import org.apache.parquet.schema.Type.Repetition
-import org.apache.parquet.schema.{GroupType, PrimitiveType, Type}
+import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type}
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
@@ -88,12 +89,54 @@ private[parquet] class CatalystPrimitiveConverter(val updater: ParentContainerUp
 }
 
 /**
- * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s.
- * Since any Parquet record is also a struct, this converter can also be used as root converter.
+ * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s.
+ * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root
+ * converter.  Take the following Parquet type as an example:
+ * {{{
+ *   message root {
+ *     required int32 f1;
+ *     optional group f2 {
+ *       required double f21;
+ *       optional binary f22 (utf8);
+ *     }
+ *   }
+ * }}}
+ * 5 converters will be created:
+ *
+ * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains:
+ *   - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and
+ *   - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains:
+ *     - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and
+ *     - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22`
  *
  * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have
  * any "parent" container.
  *
+ * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the
+ *       Parquet file being read, while constructor argument [[catalystType]] refers to requested
+ *       fields of the global schema.  The key difference is that, in case of schema merging,
+ *       [[parquetType]] can be a subset of [[catalystType]].  For example, it's possible to have
+ *       the following [[catalystType]]:
+ *       {{{
+ *         new StructType()
+ *           .add("f1", IntegerType, nullable = false)
+ *           .add("f2", StringType, nullable = true)
+ *           .add("f3", new StructType()
+ *             .add("f31", DoubleType, nullable = false)
+ *             .add("f32", IntegerType, nullable = true)
+ *             .add("f33", StringType, nullable = true), nullable = false)
+ *       }}}
+ *       and the following [[parquetType]] (`f2` and `f32` are missing):
+ *       {{{
+ *         message root {
+ *           required int32 f1;
+ *           required group f3 {
+ *             required double f31;
+ *             optional binary f33 (utf8);
+ *           }
+ *         }
+ *       }}}
+ *
  * @param parquetType Parquet schema of Parquet records
  * @param catalystType Spark SQL schema that corresponds to the Parquet record type
  * @param updater An updater which propagates converted field values to the parent container
@@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
-    parquetType.getFields.zip(catalystType).zipWithIndex.map {
+    // In case of schema merging, `parquetType` can be a subset of `catalystType`.  We need to pad
+    // those missing fields and create converters for them, although values of these fields are
+    // always null.
+    val paddedParquetFields = {
+      val parquetFields = parquetType.getFields
+      val parquetFieldNames = parquetFields.map(_.getName).toSet
+      val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name))
+
+      // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when
+      // creating the schema converter here, since values of missing fields are always null.
+      val toParquet = new CatalystSchemaConverter()
+
+      (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f =>
+        catalystType.indexWhere(_.name == f.getName)
+      }
+    }
+
+    paddedParquetFields.zip(catalystType).zipWithIndex.map {
       case ((parquetFieldType, catalystField), ordinal) =>
         // Converted field value should be set to the `ordinal`-th cell of `currentRow`
         newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal))

http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
index 275646e..535f068 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala
@@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter(
     followParquetFormatSpec = conf.followParquetFormatSpec)
 
   def this(conf: Configuration) = this(
-    assumeBinaryIsString =
-      conf.getBoolean(
-        SQLConf.PARQUET_BINARY_AS_STRING.key,
-        SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get),
-    assumeInt96IsTimestamp =
-      conf.getBoolean(
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.key,
-        SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get),
-    followParquetFormatSpec =
-      conf.getBoolean(
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key,
-        SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get))
+    assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean,
+    assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean,
+    followParquetFormatSpec = conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean)
 
   /**
    * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]].

http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
index e2f2a8c..b7b70c2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala
@@ -21,7 +21,7 @@ import java.io.File
 
 import org.apache.hadoop.fs.Path
 
-import org.apache.spark.sql.{QueryTest, Row, SQLConf}
+import org.apache.spark.sql._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext
       assert(Decimal("67123.45") === Decimal(decimal))
     }
   }
+
+  test("SPARK-10005 Schema merging for nested struct") {
+    val sqlContext = _sqlContext
+    import sqlContext.implicits._
+
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+
+      def append(df: DataFrame): Unit = {
+        df.write.mode(SaveMode.Append).parquet(path)
+      }
+
+      // Note that both the following two DataFrames contain a single struct column with multiple
+      // nested fields.
+      append((1 to 2).map(i => Tuple1((i, i))).toDF())
+      append((1 to 2).map(i => Tuple1((i, i, i))).toDF())
+
+      withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") {
+        checkAnswer(
+          sqlContext.read.option("mergeSchema", "true").parquet(path),
+          Seq(
+            Row(Row(1, 1, null)),
+            Row(Row(2, 2, null)),
+            Row(Row(1, 1, 1)),
+            Row(Row(2, 2, 2))))
+      }
+    }
+  }
 }
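
Usage note (not part of the patch): the new test enables schema merging per read via the
`mergeSchema` data source option. A hedged sketch of the two ways an application would
typically opt in, assuming the option name shown in the test and the
`spark.sql.parquet.mergeSchema` configuration key from Spark 1.5's `SQLConf`:

```scala
// Per-read, exactly as exercised by the test above:
val merged = sqlContext.read.option("mergeSchema", "true").parquet(path)

// Or globally for the SQLContext (assumed configuration key; verify against your SQLConf):
sqlContext.setConf("spark.sql.parquet.mergeSchema", "true")
```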

