Repository: spark Updated Branches: refs/heads/branch-1.5 e2c6ef810 -> 90245f65c
[SPARK-10005] [SQL] Fixes schema merging for nested structs Previously, in case of schema merging, we only handled first-level fields when converting Parquet groups to `InternalRow`s. Nested struct fields were not properly handled. For example, the schema of a Parquet file to be read can be: ``` message individual { required group f1 { optional binary f11 (utf8); } } ``` while the global schema is: ``` message global { required group f1 { optional binary f11 (utf8); optional int32 f12; } } ``` This PR fixes this issue by padding missing fields when creating actual converters. Author: Cheng Lian <l...@databricks.com> Closes #8228 from liancheng/spark-10005/nested-schema-merging. (cherry picked from commit ae2370e72f93db8a28b262e8252c55fe1fc9873c) Signed-off-by: Yin Huai <yh...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/90245f65 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/90245f65 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/90245f65 Branch: refs/heads/branch-1.5 Commit: 90245f65c94a40d3210207abaf6f136f5ce2861f Parents: e2c6ef8 Author: Cheng Lian <l...@databricks.com> Authored: Sun Aug 16 10:17:58 2015 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Sun Aug 16 10:18:08 2015 -0700 ---------------------------------------------------------------------- .../parquet/CatalystReadSupport.scala | 19 ++++-- .../parquet/CatalystRowConverter.scala | 70 ++++++++++++++++++-- .../parquet/CatalystSchemaConverter.scala | 15 +---- .../datasources/parquet/ParquetQuerySuite.scala | 30 ++++++++- 4 files changed, 112 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala ---------------------------------------------------------------------- diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 4049795..a4679bb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -32,6 +32,7 @@ import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.types.StructType private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with Logging { + // Called after `init()` when initializing Parquet record reader. override def prepareForRead( conf: Configuration, keyValueMetaData: JMap[String, String], @@ -51,19 +52,30 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with // available if the target file is written by Spark SQL. .orElse(metadata.get(CatalystReadSupport.SPARK_METADATA_KEY)) }.map(StructType.fromString).getOrElse { - logDebug("Catalyst schema not available, falling back to Parquet schema") + logInfo("Catalyst schema not available, falling back to Parquet schema") toCatalyst.convert(parquetRequestedSchema) } - logDebug(s"Catalyst schema used to read Parquet files: $catalystRequestedSchema") + logInfo { + s"""Going to read the following fields from the Parquet file: + | + |Parquet form: + |$parquetRequestedSchema + | + |Catalyst form: + |$catalystRequestedSchema + """.stripMargin + } + new CatalystRecordMaterializer(parquetRequestedSchema, catalystRequestedSchema) } + // Called before `prepareForRead()` when initializing Parquet record reader. override def init(context: InitContext): ReadContext = { val conf = context.getConfiguration // If the target file was written by Spark SQL, we should be able to find a serialized Catalyst - // schema of this file from its the metadata. + // schema of this file from its metadata. 
val maybeRowSchema = Option(conf.get(RowWriteSupport.SPARK_ROW_SCHEMA)) // Optional schema of requested columns, in the form of a string serialized from a Catalyst @@ -141,7 +153,6 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) - logInfo(s"Going to read Parquet file with these requested columns: $parquetRequestedSchema") new ReadContext(parquetRequestedSchema, metadata) } } http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index ab5a6dd..18c5b50 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -25,9 +25,10 @@ import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary import org.apache.parquet.io.api.{Binary, Converter, GroupConverter, PrimitiveConverter} -import org.apache.parquet.schema.OriginalType.LIST +import org.apache.parquet.schema.OriginalType.{LIST, INT_32, UTF8} +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.DOUBLE import org.apache.parquet.schema.Type.Repetition -import org.apache.parquet.schema.{GroupType, PrimitiveType, Type} +import org.apache.parquet.schema.{GroupType, MessageType, PrimitiveType, Type} import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ @@ -88,12 +89,54 @@ private[parquet] class 
CatalystPrimitiveConverter(val updater: ParentContainerUp } /** - * A [[CatalystRowConverter]] is used to convert Parquet "structs" into Spark SQL [[InternalRow]]s. - * Since any Parquet record is also a struct, this converter can also be used as root converter. + * A [[CatalystRowConverter]] is used to convert Parquet records into Catalyst [[InternalRow]]s. + * Since Catalyst `StructType` is also a Parquet record, this converter can be used as root + * converter. Take the following Parquet type as an example: + * {{{ + * message root { + * required int32 f1; + * optional group f2 { + * required double f21; + * optional binary f22 (utf8); + * } + * } + * }}} + * 5 converters will be created: + * + * - a root [[CatalystRowConverter]] for [[MessageType]] `root`, which contains: + * - a [[CatalystPrimitiveConverter]] for required [[INT_32]] field `f1`, and + * - a nested [[CatalystRowConverter]] for optional [[GroupType]] `f2`, which contains: + * - a [[CatalystPrimitiveConverter]] for required [[DOUBLE]] field `f21`, and + * - a [[CatalystStringConverter]] for optional [[UTF8]] string field `f22` * * When used as a root converter, [[NoopUpdater]] should be used since root converters don't have * any "parent" container. * + * @note Constructor argument [[parquetType]] refers to requested fields of the actual schema of the + * Parquet file being read, while constructor argument [[catalystType]] refers to requested + * fields of the global schema. The key difference is that, in case of schema merging, + * [[parquetType]] can be a subset of [[catalystType]]. 
For example, it's possible to have + * the following [[catalystType]]: + * {{{ + * new StructType() + * .add("f1", IntegerType, nullable = false) + * .add("f2", StringType, nullable = true) + * .add("f3", new StructType() + * .add("f31", DoubleType, nullable = false) + * .add("f32", IntegerType, nullable = true) + * .add("f33", StringType, nullable = true), nullable = false) + * }}} + * and the following [[parquetType]] (`f2` and `f32` are missing): + * {{{ + * message root { + * required int32 f1; + * required group f3 { + * required double f31; + * optional binary f33 (utf8); + * } + * } + * }}} + * * @param parquetType Parquet schema of Parquet records * @param catalystType Spark SQL schema that corresponds to the Parquet record type * @param updater An updater which propagates converted field values to the parent container @@ -126,7 +169,24 @@ private[parquet] class CatalystRowConverter( // Converters for each field. private val fieldConverters: Array[Converter with HasParentContainerUpdater] = { - parquetType.getFields.zip(catalystType).zipWithIndex.map { + // In case of schema merging, `parquetType` can be a subset of `catalystType`. We need to pad + // those missing fields and create converters for them, although values of these fields are + // always null. + val paddedParquetFields = { + val parquetFields = parquetType.getFields + val parquetFieldNames = parquetFields.map(_.getName).toSet + val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name)) + + // We don't need to worry about feature flag arguments like `assumeBinaryIsString` when + // creating the schema converter here, since values of missing fields are always null. 
+ val toParquet = new CatalystSchemaConverter() + + (parquetFields ++ missingFields.map(toParquet.convertField)).sortBy { f => + catalystType.indexWhere(_.name == f.getName) + } + } + + paddedParquetFields.zip(catalystType).zipWithIndex.map { case ((parquetFieldType, catalystField), ordinal) => // Converted field value should be set to the `ordinal`-th cell of `currentRow` newConverter(parquetFieldType, catalystField.dataType, new RowUpdater(currentRow, ordinal)) http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 275646e..535f068 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -72,18 +72,9 @@ private[parquet] class CatalystSchemaConverter( followParquetFormatSpec = conf.followParquetFormatSpec) def this(conf: Configuration) = this( - assumeBinaryIsString = - conf.getBoolean( - SQLConf.PARQUET_BINARY_AS_STRING.key, - SQLConf.PARQUET_BINARY_AS_STRING.defaultValue.get), - assumeInt96IsTimestamp = - conf.getBoolean( - SQLConf.PARQUET_INT96_AS_TIMESTAMP.key, - SQLConf.PARQUET_INT96_AS_TIMESTAMP.defaultValue.get), - followParquetFormatSpec = - conf.getBoolean( - SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key, - SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.defaultValue.get)) + assumeBinaryIsString = conf.get(SQLConf.PARQUET_BINARY_AS_STRING.key).toBoolean, + assumeInt96IsTimestamp = conf.get(SQLConf.PARQUET_INT96_AS_TIMESTAMP.key).toBoolean, + followParquetFormatSpec = 
conf.get(SQLConf.PARQUET_FOLLOW_PARQUET_FORMAT_SPEC.key).toBoolean) /** * Converts Parquet [[MessageType]] `parquetSchema` to a Spark SQL [[StructType]]. http://git-wip-us.apache.org/repos/asf/spark/blob/90245f65/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala index e2f2a8c..b7b70c2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetQuerySuite.scala @@ -21,7 +21,7 @@ import java.io.File import org.apache.hadoop.fs.Path -import org.apache.spark.sql.{QueryTest, Row, SQLConf} +import org.apache.spark.sql._ import org.apache.spark.sql.test.SharedSQLContext import org.apache.spark.sql.types._ import org.apache.spark.util.Utils @@ -201,4 +201,32 @@ class ParquetQuerySuite extends QueryTest with ParquetTest with SharedSQLContext assert(Decimal("67123.45") === Decimal(decimal)) } } + + test("SPARK-10005 Schema merging for nested struct") { + val sqlContext = _sqlContext + import sqlContext.implicits._ + + withTempPath { dir => + val path = dir.getCanonicalPath + + def append(df: DataFrame): Unit = { + df.write.mode(SaveMode.Append).parquet(path) + } + + // Note that both the following two DataFrames contain a single struct column with multiple + // nested fields. 
+ append((1 to 2).map(i => Tuple1((i, i))).toDF()) + append((1 to 2).map(i => Tuple1((i, i, i))).toDF()) + + withSQLConf(SQLConf.PARQUET_BINARY_AS_STRING.key -> "true") { + checkAnswer( + sqlContext.read.option("mergeSchema", "true").parquet(path), + Seq( + Row(Row(1, 1, null)), + Row(Row(2, 2, null)), + Row(Row(1, 1, 1)), + Row(Row(2, 2, 2)))) + } + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org