Github user yucai commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22197#discussion_r213551202
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
    @@ -350,25 +356,38 @@ private[parquet] class ParquetFilters(
       }
     
       /**
    -   * Returns a map from name of the column to the data type, if predicate push down applies.
    +   * Returns a map, which contains parquet field name and data type, if predicate push down applies.
        */
    -  private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match {
    -    case m: MessageType =>
    -      // Here we don't flatten the fields in the nested schema but just look up through
    -      // root fields. Currently, accessing to nested fields does not push down filters
    -      // and it does not support to create filters for them.
    -      m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
    -        f.getName -> ParquetSchemaType(
    -          f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)
    -      }.toMap
    -    case _ => Map.empty[String, ParquetSchemaType]
    +  private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = {
    +    // Here we don't flatten the fields in the nested schema but just look up through
    +    // root fields. Currently, accessing to nested fields does not push down filters
    +    // and it does not support to create filters for them.
    +    val primitiveFields =
    +      dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f =>
    +        f.getName -> ParquetField(f.getName,
    +          ParquetSchemaType(f.getOriginalType,
    +            f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata))
    +      }
    +    if (caseSensitive) {
    +      primitiveFields.toMap
    +    } else {
    +      // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive
    +      // mode, just skip pushdown for these fields, they will trigger Exception when reading,
    +      // See: SPARK-25132.
    --- End diff --
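
    The quoted diff stops inside the case-insensitive branch. A minimal, self-contained sketch of the "skip ambiguous fields" idea described in the code comment above (the sample field names and plain-string type values are illustrative assumptions, not the actual patch):
    ```
    import java.util.Locale

    // Illustrative stand-in for the (parquet field name -> type) pairs built above.
    val primitiveFields: Seq[(String, String)] = Seq(
      ("id", "INT64"), ("ID", "INT64"), ("name", "BINARY"))

    // Case-insensitive mode: group by the lower-cased name and keep only names that
    // match exactly one field, so ambiguous fields like "id"/"ID" simply get no
    // filter pushed down for them.
    val dedupPrimitiveFields: Map[String, String] = primitiveFields
      .groupBy { case (name, _) => name.toLowerCase(Locale.ROOT) }
      .filter { case (_, fields) => fields.size == 1 }
      .map { case (lowerName, fields) => lowerName -> fields.head._2 }

    // dedupPrimitiveFields == Map("name" -> "BINARY")
    ```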
    
    @cloud-fan, that is a great idea, thanks!
    I think the point is not to "dedup" before pushdown and pruning.
    Maybe we should instead clip the Parquet schema before pushdown and pruning:
    if duplicated fields are detected, throw an exception;
    if not, pass the clipped Parquet schema to the Parquet library via hadoopConf.
    ```
        catalystRequestedSchema = {
          val conf = context.getConfiguration
          val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
          assert(schemaString != null, "Parquet requested schema not set.")
          StructType.fromString(schemaString)
        }
    
        val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
          SQLConf.CASE_SENSITIVE.defaultValue.get)
        val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
          context.getFileSchema, catalystRequestedSchema, caseSensitive)
    ```
    I am trying this way, will update soon.
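
    As a rough sketch of the "throw an exception on duplicated fields" part (the helper name `resolveFieldName` and the exception type are illustrative assumptions, not the actual change):
    ```
    // Hypothetical helper: resolve one requested column name against the file's
    // Parquet field names case-insensitively, failing when the name is ambiguous.
    def resolveFieldName(parquetFieldNames: Seq[String], requested: String): String = {
      parquetFieldNames.filter(_.equalsIgnoreCase(requested)) match {
        case Seq(single) => single
        case Seq() => requested // not present in the file; keep the requested name
        case multiple => throw new RuntimeException(
          s"""Found duplicate field(s) "$requested": ${multiple.mkString(", ")} in case-insensitive mode""")
      }
    }

    // Example: a file containing both "id" and "ID" makes the lookup ambiguous.
    // resolveFieldName(Seq("id", "ID", "name"), "id")  // throws RuntimeException
    ```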


---
