GitHub user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22197#discussion_r213906227
  
    --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala ---
    @@ -1021,6 +1022,113 @@ class ParquetFilterSuite extends QueryTest with ParquetTest with SharedSQLContex
           }
         }
       }
    +
    +  test("SPARK-25207: Case-insensitive field resolution for pushdown when 
reading parquet") {
    +    def createParquetFilter(caseSensitive: Boolean): ParquetFilters = {
    +      new ParquetFilters(conf.parquetFilterPushDownDate, conf.parquetFilterPushDownTimestamp,
    +        conf.parquetFilterPushDownDecimal, conf.parquetFilterPushDownStringStartWith,
    +        conf.parquetFilterPushDownInFilterThreshold, caseSensitive)
    +    }
    +    val caseSensitiveParquetFilters = createParquetFilter(caseSensitive = true)
    +    val caseInsensitiveParquetFilters = createParquetFilter(caseSensitive = false)
    +
    +    def testCaseInsensitiveResolution(
    +        schema: StructType,
    +        expected: FilterPredicate,
    +        filter: sources.Filter): Unit = {
    +      val parquetSchema = new SparkToParquetSchemaConverter(conf).convert(schema)
    +
    +      assertResult(Some(expected)) {
    +        caseInsensitiveParquetFilters.createFilter(parquetSchema, filter)
    +      }
    +      assertResult(None) {
    +        caseSensitiveParquetFilters.createFilter(parquetSchema, filter)
    +      }
    +    }
    +
    +    val schema = StructType(Seq(StructField("cint", IntegerType)))
    +
    +    testCaseInsensitiveResolution(
    +      schema, FilterApi.eq(intColumn("cint"), null.asInstanceOf[Integer]), 
sources.IsNull("CINT"))
    +
    +    testCaseInsensitiveResolution(
    +      schema,
    +      FilterApi.notEq(intColumn("cint"), null.asInstanceOf[Integer]),
    +      sources.IsNotNull("CINT"))
    +
    +    testCaseInsensitiveResolution(
    +      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualTo("CINT", 1000))
    +
    +    testCaseInsensitiveResolution(
    +      schema,
    +      FilterApi.notEq(intColumn("cint"), 1000: Integer),
    +      sources.Not(sources.EqualTo("CINT", 1000)))
    +
    +    testCaseInsensitiveResolution(
    +      schema, FilterApi.eq(intColumn("cint"), 1000: Integer), sources.EqualNullSafe("CINT", 1000))
    +
    +    testCaseInsensitiveResolution(
    +      schema,
    +      FilterApi.notEq(intColumn("cint"), 1000: Integer),
    +      sources.Not(sources.EqualNullSafe("CINT", 1000)))
    +
    +    testCaseInsensitiveResolution(
    +      schema,
    +      FilterApi.lt(intColumn("cint"), 1000: Integer), sources.LessThan("CINT", 1000))
    +
    +    testCaseInsensitiveResolution(
    +      schema,
    +      FilterApi.ltEq(intColumn("cint"), 1000: Integer),
    +      sources.LessThanOrEqual("CINT", 1000))
    +
    +    testCaseInsensitiveResolution(
    +      schema, FilterApi.gt(intColumn("cint"), 1000: Integer), sources.GreaterThan("CINT", 1000))
    +
    +    testCaseInsensitiveResolution(
    +      schema,
    +      FilterApi.gtEq(intColumn("cint"), 1000: Integer),
    +      sources.GreaterThanOrEqual("CINT", 1000))
    +
    +    testCaseInsensitiveResolution(
    +      schema,
    +      FilterApi.or(
    +        FilterApi.eq(intColumn("cint"), 10: Integer),
    +        FilterApi.eq(intColumn("cint"), 20: Integer)),
    +      sources.In("CINT", Array(10, 20)))
    +
    +    val dupFieldSchema = StructType(
    +      Seq(StructField("cint", IntegerType), StructField("cINT", 
IntegerType)))
    +    val dupParquetSchema = new SparkToParquetSchemaConverter(conf).convert(dupFieldSchema)
    +    assertResult(None) {
    +      caseInsensitiveParquetFilters.createFilter(
    +        dupParquetSchema, sources.EqualTo("CINT", 1000))
    +    }
    +  }
    +
    +  test("SPARK-25207: exception when duplicate fields in case-insensitive 
mode") {
    +    withTempPath { dir =>
    +      val tableName = "spark_25207"
    +      val tableDir = dir.getAbsoluteFile + "/table"
    +      withTable(tableName) {
    +        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
    +          spark.range(10).selectExpr("id as A", "id as B", "id as b")
    +            .write.mode("overwrite").parquet(tableDir)
    +        }
    +        sql(
    +          s"""
    +             |CREATE TABLE $tableName (A LONG, B LONG) USING PARQUET LOCATION '$tableDir'
    +           """.stripMargin)
    +
    +        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
    +          val e = intercept[SparkException] {
    +            sql(s"select a from $tableName where b > 0").collect()
    --- End diff ---
    
    Can we read this table in case-sensitive mode?
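    
    A minimal sketch of what that check could look like (not part of the diff above), reusing `tableName` and the suite helpers `withSQLConf`/`checkAnswer`, and assuming `Row` is in scope and the `spark.range(10)` data written earlier:
    
        // Assumption: in case-sensitive mode the declared columns A and B resolve
        // only to the physical parquet columns "A" and "B", so the duplicate
        // lower-case "b" column should never cause ambiguity.
        withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
          checkAnswer(
            // B > 0 drops the row where id == 0, leaving A = 1..9 (LONG values).
            sql(s"select A from $tableName where B > 0"),
            (1L until 10L).map(Row(_)))
        }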

