GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22148#discussion_r211149160
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala ---
    @@ -277,14 +291,35 @@ private[parquet] object ParquetReadSupport {
        * @return A list of clipped [[GroupType]] fields, which can be empty.
        */
       private def clipParquetGroupFields(
    -      parquetRecord: GroupType, structType: StructType): Seq[Type] = {
    -    val parquetFieldMap = parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
    +      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
         val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
    -    structType.map { f =>
    -      parquetFieldMap
    -        .get(f.name)
    -        .map(clipParquetType(_, f.dataType))
    -        .getOrElse(toParquet.convertField(f))
    +    if (caseSensitive) {
    +      val caseSensitiveParquetFieldMap =
    +        parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
    +      structType.map { f =>
    +        caseSensitiveParquetFieldMap
    +          .get(f.name)
    +          .map(clipParquetType(_, f.dataType, caseSensitive))
    +          .getOrElse(toParquet.convertField(f))
    +      }
    +    } else {
    +      // Do case-insensitive resolution only if in case-insensitive mode
    +      val caseInsensitiveParquetFieldMap =
    +        parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase)
    +      structType.map { f =>
    +        caseInsensitiveParquetFieldMap
    +          .get(f.name.toLowerCase)
    +          .map { parquetTypes =>
    +            if (parquetTypes.size > 1) {
    +              // Need to fail if there is ambiguity, i.e. more than one field is matched
    +              val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
    +              throw new AnalysisException(s"""Found duplicate field(s) "${f.name}": """ +
    --- End diff --
    
    This is triggered at runtime on the executor side, so we should probably use `RuntimeException` here.
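    
    For illustration, a minimal sketch of what that could look like, reusing the names from the diff above (the diff truncates the message continuation, so the exact wording below is hypothetical):
    
    ```scala
    if (parquetTypes.size > 1) {
      // Ambiguity: more than one Parquet field matches the lower-cased name.
      val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
      // Suggested: RuntimeException rather than AnalysisException, since this
      // branch is hit while reading files on executors, not during analysis
      // on the driver.
      throw new RuntimeException(
        s"""Found duplicate field(s) "${f.name}": $parquetTypesString""")
    }
    ```
    
    Note this can only happen in case-insensitive mode, since the `else` branch groups Parquet fields by `_.getName.toLowerCase`, e.g. a file containing both `A` and `a` matching a single Spark struct field `a`.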


---
