Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22880#discussion_r229530694
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala ---
    @@ -49,34 +49,82 @@ import org.apache.spark.sql.types._
      * Due to this reason, we no longer rely on [[ReadContext]] to pass requested schema from [[init()]]
      * to [[prepareForRead()]], but use a private `var` for simplicity.
      */
    -private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone])
    +private[parquet] class ParquetReadSupport(val convertTz: Option[TimeZone],
    +  usingVectorizedReader: Boolean)
         extends ReadSupport[UnsafeRow] with Logging {
       private var catalystRequestedSchema: StructType = _
     
       def this() {
         // We need a zero-arg constructor for SpecificParquetRecordReaderBase.  But that is only
         // used in the vectorized reader, where we get the convertTz value directly, and the value here
         // is ignored.
    -    this(None)
    +    this(None, usingVectorizedReader = true)
       }
     
       /**
        * Called on executor side before [[prepareForRead()]] and instantiating actual Parquet record
        * readers.  Responsible for figuring out Parquet requested schema used for column pruning.
        */
       override def init(context: InitContext): ReadContext = {
    +    val conf = context.getConfiguration
         catalystRequestedSchema = {
    -      val conf = context.getConfiguration
           val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
           assert(schemaString != null, "Parquet requested schema not set.")
           StructType.fromString(schemaString)
         }
     
    -    val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
    +    val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
    +      SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
    +    val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
           SQLConf.CASE_SENSITIVE.defaultValue.get)
    -    val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
    -      context.getFileSchema, catalystRequestedSchema, caseSensitive)
    -
    +    val parquetFileSchema = context.getFileSchema
    +    val parquetClippedSchema = ParquetReadSupport.clipParquetSchema(parquetFileSchema,
    +      catalystRequestedSchema, caseSensitive)
    +
    +    // As part of schema clipping, we add fields in catalystRequestedSchema which are missing
    +    // from parquetFileSchema to parquetClippedSchema. However, nested schema pruning requires
    +    // that we ignore unrequested field data when reading from a Parquet file. Therefore we pass
    +    // two schemas to ParquetRecordMaterializer: the schema of the file data we want to read
    +    // (parquetRequestedSchema), and the schema of the rows we want to return
    +    // (catalystRequestedSchema). The reader is responsible for reconciling the differences
    +    // between the two.
    +    //
    +    // Aside from checking whether schema pruning is enabled (schemaPruningEnabled), there
    +    // is an additional complication to constructing parquetRequestedSchema. The manner in which
    +    // Spark's two Parquet readers reconcile the differences between parquetRequestedSchema and
    +    // catalystRequestedSchema differs. Spark's vectorized reader does not (currently) support
    +    // reading Parquet files with complex types in their schema. Further, it assumes that
    +    // parquetRequestedSchema includes all fields requested in catalystRequestedSchema. It
    +    // includes logic in its read path to skip fields in parquetRequestedSchema which are not
    +    // present in the file.
    +    //
    +    // Spark's parquet-mr based reader supports reading Parquet files with any kind of complex
    +    // schema, and it supports nested schema pruning as well. Unlike the vectorized reader, the
    +    // parquet-mr reader requires that parquetRequestedSchema include only those fields present
    +    // in the underlying parquetFileSchema. Therefore, in the case where we use the parquet-mr
    +    // reader, we intersect the parquetClippedSchema with the parquetFileSchema to construct the
    +    // parquetRequestedSchema set in the ReadContext.
    --- End diff --
    
    Ok. I see.
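    
    For anyone following the thread, here is a rough, hypothetical sketch of the intersection idea the diff comment describes. It is not the code from this PR: names like `SchemaIntersectionSketch`, `intersect`, `requestedSchema`, and the `usingVectorizedReader` flag are illustrative, and it assumes parquet-mr's `GroupType`/`MessageType` API (`containsField`, `getType`, `withNewFields`); case sensitivity and empty-intersection handling are glossed over.
    
    import org.apache.parquet.schema.{GroupType, MessageType}
    
    import scala.collection.JavaConverters._
    
    object SchemaIntersectionSketch {
    
      // Recursively keep only the fields of `clipped` that also exist (matched by name) in `file`.
      // Case-sensitivity handling and type-mismatch checks are omitted for brevity.
      def intersect(clipped: GroupType, file: GroupType): Option[GroupType] = {
        val kept = clipped.getFields.asScala.flatMap { clippedField =>
          if (!file.containsField(clippedField.getName)) {
            None // the file does not contain this field, so drop it from the requested schema
          } else {
            val fileField = file.getType(clippedField.getName)
            if (clippedField.isPrimitive || fileField.isPrimitive) {
              Some(clippedField) // leaf field present in both schemas
            } else {
              // nested group: recurse, dropping the group entirely if nothing survives
              intersect(clippedField.asGroupType, fileField.asGroupType)
            }
          }
        }
        if (kept.nonEmpty) Some(clipped.withNewFields(kept.asJava)) else None
      }
    
      // The parquet-mr path intersects the clipped schema with the file schema; the vectorized
      // path keeps the clipped schema and skips file-absent fields in its own read path.
      def requestedSchema(
          clipped: MessageType,
          file: MessageType,
          schemaPruningEnabled: Boolean,
          usingVectorizedReader: Boolean): MessageType = {
        if (schemaPruningEnabled && !usingVectorizedReader) {
          intersect(clipped, file)
            .map(g => new MessageType(clipped.getName, g.getFields))
            .getOrElse(clipped) // fallback only; real code must handle an empty intersection
        } else {
          clipped
        }
      }
    }
    
    The point of the split is the one made in the diff comment: only the parquet-mr path drops file-absent fields up front, because that reader builds its record materializers strictly from the schema it is handed, whereas the vectorized reader tolerates requested fields that are missing from the file.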

