This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 215609d  [SPARK-25407][SQL] Allow nested access for non-existent field 
for Parquet file when nested pruning is enabled
215609d is described below

commit 215609def22da14c464b37374ceae4f53a39a145
Author: Michael Allman <m...@allman.ms>
AuthorDate: Mon Apr 8 22:26:02 2019 +0900

    [SPARK-25407][SQL] Allow nested access for non-existent field for Parquet 
file when nested pruning is enabled
    
    ## What changes were proposed in this pull request?
    
    As part of schema clipping in `ParquetReadSupport.scala`, we add fields in 
the Catalyst requested schema which are missing from the Parquet file schema to 
the Parquet clipped schema. However, nested schema pruning requires we ignore 
unrequested field data when reading from a Parquet file. Therefore we pass two 
schema to `ParquetRecordMaterializer`: the schema of the file data we want to 
read and the schema of the rows we want to return. The reader is responsible 
for reconciling the di [...]
    
    Aside from checking whether schema pruning is enabled, there is an 
additional complication to constructing the Parquet requested schema. The 
manner in which Spark's two Parquet readers reconcile the differences between 
the Parquet requested schema and the Catalyst requested schema differ. Spark's 
vectorized reader does not (currently) support reading Parquet files with 
complex types in their schema. Further, it assumes that the Parquet requested 
schema includes all fields requested in [...]
    
    Spark's parquet-mr based reader supports reading Parquet files of any kind 
of complex schema, and it supports nested schema pruning as well. Unlike the 
vectorized reader, the parquet-mr reader requires that the Parquet requested 
schema include only those fields present in the underlying Parquet file's 
schema. Therefore, in the case where we use the parquet-mr reader we intersect 
the Parquet clipped schema with the Parquet file's schema to construct the 
Parquet requested schema that's  [...]
    
    _Additional description (by HyukjinKwon):_
    
    Let's suppose that we have a Parquet schema as below:
    
    ```
    message spark_schema {
      required int32 id;
      optional group name {
        optional binary first (UTF8);
        optional binary last (UTF8);
      }
      optional binary address (UTF8);
    }
    ```
    
    Currently, the clipped schema as follows:
    
    ```
    message spark_schema {
      optional group name {
        optional binary middle (UTF8);
      }
      optional binary address (UTF8);
    }
    ```
    
    Parquet MR does not support access to the nested non-existent field 
(`name.middle`).
    
    To workaround this, this PR removes `name.middle` request at all to Parquet 
reader as below:
    
    ```
    Parquet requested schema:
    message spark_schema {
      optional binary address (UTF8);
    }
    ```
    
    and produces the record (`name.middle`) properly as the requested Catalyst 
schema.
    
    ```
    root
    -- name: struct (nullable = true)
        |-- middle: string (nullable = true)
    -- address: string (nullable = true)
    ```
    
    I think technically this is what Parquet library should support since 
Parquet library made a design decision to produce `null` for non-existent 
fields IIRC. This PR targets to work around it.
    
    ## How was this patch tested?
    
    A previously ignored test case which exercises the failure scenario this PR 
addresses has been enabled.
    
    This closes #22880
    
    Closes #24307 from dongjoon-hyun/SPARK-25407.
    
    Lead-authored-by: Michael Allman <m...@allman.ms>
    Co-authored-by: Dongjoon Hyun <dh...@apple.com>
    Signed-off-by: HyukjinKwon <gurwls...@apache.org>
---
 .../datasources/parquet/ParquetFileFormat.scala    |  8 +-
 .../datasources/parquet/ParquetReadSupport.scala   | 88 +++++++++++++++++-----
 .../datasources/parquet/ParquetRowConverter.scala  | 21 ++++--
 .../execution/datasources/SchemaPruningSuite.scala |  2 +-
 4 files changed, 88 insertions(+), 31 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
index efa4f3f..e37f228 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
@@ -311,6 +311,9 @@ class ParquetFileFormat
       SQLConf.SESSION_LOCAL_TIMEZONE.key,
       sparkSession.sessionState.conf.sessionLocalTimeZone)
     hadoopConf.setBoolean(
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      sparkSession.sessionState.conf.nestedSchemaPruningEnabled)
+    hadoopConf.setBoolean(
       SQLConf.CASE_SENSITIVE.key,
       sparkSession.sessionState.conf.caseSensitiveAnalysis)
 
@@ -424,11 +427,12 @@ class ParquetFileFormat
       } else {
         logDebug(s"Falling back to parquet-mr")
         // ParquetRecordReader returns UnsafeRow
+        val readSupport = new ParquetReadSupport(convertTz, 
enableVectorizedReader = false)
         val reader = if (pushed.isDefined && enableRecordFilter) {
           val parquetFilter = FilterCompat.get(pushed.get, null)
-          new ParquetRecordReader[UnsafeRow](new 
ParquetReadSupport(convertTz), parquetFilter)
+          new ParquetRecordReader[UnsafeRow](readSupport, parquetFilter)
         } else {
-          new ParquetRecordReader[UnsafeRow](new ParquetReadSupport(convertTz))
+          new ParquetRecordReader[UnsafeRow](readSupport)
         }
         val iter = new RecordReaderIterator(reader)
         // SPARK-23457 Register a task completion lister before 
`initialization`.
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
index 3319e73..df77665 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
@@ -49,15 +49,16 @@ import org.apache.spark.sql.types._
  * Due to this reason, we no longer rely on [[ReadContext]] to pass requested 
schema from [[init()]]
  * to [[prepareForRead()]], but use a private `var` for simplicity.
  */
-private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
-    extends ReadSupport[UnsafeRow] with Logging {
+private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
+    enableVectorizedReader: Boolean)
+  extends ReadSupport[UnsafeRow] with Logging {
   private var catalystRequestedSchema: StructType = _
 
   def this() {
     // We need a zero-arg constructor for SpecificParquetRecordReaderBase.  
But that is only
     // used in the vectorized reader, where we get the convertTz value 
directly, and the value here
     // is ignored.
-    this(None)
+    this(None, enableVectorizedReader = true)
   }
 
   /**
@@ -65,18 +66,48 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
    * readers.  Responsible for figuring out Parquet requested schema used for 
column pruning.
    */
   override def init(context: InitContext): ReadContext = {
+    val conf = context.getConfiguration
     catalystRequestedSchema = {
-      val conf = context.getConfiguration
       val schemaString = 
conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
       assert(schemaString != null, "Parquet requested schema not set.")
       StructType.fromString(schemaString)
     }
 
-    val caseSensitive = 
context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
+    val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
-    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
-      context.getFileSchema, catalystRequestedSchema, caseSensitive)
-
+    val schemaPruningEnabled = 
conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
+      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
+    val parquetFileSchema = context.getFileSchema
+    val parquetClippedSchema = 
ParquetReadSupport.clipParquetSchema(parquetFileSchema,
+      catalystRequestedSchema, caseSensitive)
+
+    // We pass two schema to ParquetRecordMaterializer:
+    // - parquetRequestedSchema: the schema of the file data we want to read
+    // - catalystRequestedSchema: the schema of the rows we want to return
+    // The reader is responsible for reconciling the differences between the 
two.
+    val parquetRequestedSchema = if (schemaPruningEnabled && 
!enableVectorizedReader) {
+      // Parquet-MR reader requires that parquetRequestedSchema include only 
those fields present
+      // in the underlying parquetFileSchema. Therefore, we intersect the 
parquetClippedSchema
+      // with the parquetFileSchema
+      ParquetReadSupport.intersectParquetGroups(parquetClippedSchema, 
parquetFileSchema)
+        .map(groupType => new MessageType(groupType.getName, 
groupType.getFields))
+        .getOrElse(ParquetSchemaConverter.EMPTY_MESSAGE)
+    } else {
+      // Spark's vectorized reader only support atomic types currently. It 
also skip fields
+      // in parquetRequestedSchema which are not present in the file.
+      parquetClippedSchema
+    }
+    logDebug(
+      s"""Going to read the following fields from the Parquet file with the 
following schema:
+         |Parquet file schema:
+         |$parquetFileSchema
+         |Parquet clipped schema:
+         |$parquetClippedSchema
+         |Parquet requested schema:
+         |$parquetRequestedSchema
+         |Catalyst requested schema:
+         |${catalystRequestedSchema.treeString}
+       """.stripMargin)
     new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
   }
 
@@ -90,19 +121,7 @@ private[parquet] class ParquetReadSupport(val convertTz: 
Option[TimeZone])
       keyValueMetaData: JMap[String, String],
       fileSchema: MessageType,
       readContext: ReadContext): RecordMaterializer[UnsafeRow] = {
-    log.debug(s"Preparing for read Parquet file with message type: 
$fileSchema")
     val parquetRequestedSchema = readContext.getRequestedSchema
-
-    logInfo {
-      s"""Going to read the following fields from the Parquet file:
-         |
-         |Parquet form:
-         |$parquetRequestedSchema
-         |Catalyst form:
-         |$catalystRequestedSchema
-       """.stripMargin
-    }
-
     new ParquetRecordMaterializer(
       parquetRequestedSchema,
       ParquetReadSupport.expandUDT(catalystRequestedSchema),
@@ -322,6 +341,35 @@ private[parquet] object ParquetReadSupport {
     }
   }
 
+  /**
+   * Computes the structural intersection between two Parquet group types.
+   * This is used to create a requestedSchema for ReadContext of Parquet-MR 
reader.
+   * Parquet-MR reader does not support the nested field access to 
non-existent field
+   * while parquet library does support to read the non-existent field by 
regular field access.
+   */
+  private def intersectParquetGroups(
+      groupType1: GroupType, groupType2: GroupType): Option[GroupType] = {
+    val fields =
+      groupType1.getFields.asScala
+        .filter(field => groupType2.containsField(field.getName))
+        .flatMap {
+          case field1: GroupType =>
+            val field2 = groupType2.getType(field1.getName)
+            if (field2.isPrimitive) {
+              None
+            } else {
+              intersectParquetGroups(field1, field2.asGroupType)
+            }
+          case field1 => Some(field1)
+        }
+
+    if (fields.nonEmpty) {
+      Some(groupType1.withNewFields(fields.asJava))
+    } else {
+      None
+    }
+  }
+
   def expandUDT(schema: StructType): StructType = {
     def expand(dataType: DataType): DataType = {
       dataType match {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 004a96d..b772b6b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -130,8 +130,8 @@ private[parquet] class ParquetRowConverter(
   extends ParquetGroupConverter(updater) with Logging {
 
   assert(
-    parquetType.getFieldCount == catalystType.length,
-    s"""Field counts of the Parquet schema and the Catalyst schema don't match:
+    parquetType.getFieldCount <= catalystType.length,
+    s"""Field count of the Parquet schema is greater than the field count of 
the Catalyst schema:
        |
        |Parquet schema:
        |$parquetType
@@ -182,10 +182,11 @@ private[parquet] class ParquetRowConverter(
 
   // Converters for each field.
   private val fieldConverters: Array[Converter with HasParentContainerUpdater] 
= {
-    parquetType.getFields.asScala.zip(catalystType).zipWithIndex.map {
-      case ((parquetFieldType, catalystField), ordinal) =>
-        // Converted field value should be set to the `ordinal`-th cell of 
`currentRow`
-        newConverter(parquetFieldType, catalystField.dataType, new 
RowUpdater(currentRow, ordinal))
+    parquetType.getFields.asScala.map { parquetField =>
+      val fieldIndex = catalystType.fieldIndex(parquetField.getName)
+      val catalystField = catalystType(fieldIndex)
+      // Converted field value should be set to the `fieldIndex`-th cell of 
`currentRow`
+      newConverter(parquetField, catalystField.dataType, new 
RowUpdater(currentRow, fieldIndex))
     }.toArray
   }
 
@@ -193,7 +194,7 @@ private[parquet] class ParquetRowConverter(
 
   override def end(): Unit = {
     var i = 0
-    while (i < currentRow.numFields) {
+    while (i < fieldConverters.length) {
       fieldConverters(i).updater.end()
       i += 1
     }
@@ -203,10 +204,14 @@ private[parquet] class ParquetRowConverter(
   override def start(): Unit = {
     var i = 0
     while (i < currentRow.numFields) {
-      fieldConverters(i).updater.start()
       currentRow.setNullAt(i)
       i += 1
     }
+    i = 0
+    while (i < fieldConverters.length) {
+      fieldConverters(i).updater.start()
+      i += 1
+    }
   }
 
   /**
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index b0314e6..22317fe 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -135,7 +135,7 @@ abstract class SchemaPruningSuite
       Row("X.", 1) :: Row("Y.", 1) :: Row(null, 2) :: Row(null, 2) :: Nil)
   }
 
-  ignore("partial schema intersection - select missing subfield") {
+  testSchemaPruning("partial schema intersection - select missing subfield") {
     val query = sql("select name.middle, address from contacts where p=2")
     checkScan(query, "struct<name:struct<middle:string>,address:string>")
     checkAnswer(query.orderBy("id"),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to