Github user yucai commented on a diff in the pull request: https://github.com/apache/spark/pull/22197#discussion_r213551202 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -350,25 +356,38 @@ private[parquet] class ParquetFilters( } /** - * Returns a map from name of the column to the data type, if predicate push down applies. + * Returns a map, which contains parquet field name and data type, if predicate push down applies. */ - private def getFieldMap(dataType: MessageType): Map[String, ParquetSchemaType] = dataType match { - case m: MessageType => - // Here we don't flatten the fields in the nested schema but just look up through - // root fields. Currently, accessing to nested fields does not push down filters - // and it does not support to create filters for them. - m.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => - f.getName -> ParquetSchemaType( - f.getOriginalType, f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata) - }.toMap - case _ => Map.empty[String, ParquetSchemaType] + private def getFieldMap(dataType: MessageType): Map[String, ParquetField] = { + // Here we don't flatten the fields in the nested schema but just look up through + // root fields. Currently, accessing to nested fields does not push down filters + // and it does not support to create filters for them. + val primitiveFields = + dataType.getFields.asScala.filter(_.isPrimitive).map(_.asPrimitiveType()).map { f => + f.getName -> ParquetField(f.getName, + ParquetSchemaType(f.getOriginalType, + f.getPrimitiveTypeName, f.getTypeLength, f.getDecimalMetadata)) + } + if (caseSensitive) { + primitiveFields.toMap + } else { + // Don't consider ambiguity here, i.e. more than one field is matched in case insensitive + // mode, just skip pushdown for these fields, they will trigger Exception when reading, + // See: SPARK-25132. --- End diff -- @cloud-fan, it is a great idea, thanks! 
I think what we need is not really to "dedup" before pushdown and pruning. Maybe we should instead clip the Parquet schema before pushdown and pruning. If duplicated fields are detected, throw an exception. If not, pass the clipped Parquet schema to the Parquet library via the Hadoop configuration.

```
catalystRequestedSchema = {
  val conf = context.getConfiguration
  val schemaString = conf.get(ParquetReadSupport.SPARK_ROW_REQUESTED_SCHEMA)
  assert(schemaString != null, "Parquet requested schema not set.")
  StructType.fromString(schemaString)
}

val caseSensitive = context.getConfiguration.getBoolean(SQLConf.CASE_SENSITIVE.key,
  SQLConf.CASE_SENSITIVE.defaultValue.get)
val parquetRequestedSchema = ParquetReadSupport.clipParquetSchema(
  context.getFileSchema, catalystRequestedSchema, caseSensitive)
```

I am trying this approach and will update the PR soon.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org