GitHub user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21696#discussion_r199999002
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala ---
    @@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet
     
     import java.sql.Date
     
    +import scala.collection.JavaConverters._
    +
     import org.apache.parquet.filter2.predicate._
     import org.apache.parquet.filter2.predicate.FilterApi._
     import org.apache.parquet.io.api.Binary
    -import org.apache.parquet.schema.PrimitiveComparator
    +import org.apache.parquet.schema._
    +import org.apache.parquet.schema.OriginalType._
    +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
     
     import org.apache.spark.sql.catalyst.util.DateTimeUtils
     import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
     import org.apache.spark.sql.sources
    -import org.apache.spark.sql.types._
     import org.apache.spark.unsafe.types.UTF8String
     
     /**
      * Some utility function to convert Spark data source filters to Parquet filters.
      */
     private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) {
     
    +  case class ParquetSchemaType(
    +      originalType: OriginalType,
    +      primitiveTypeName: PrimitiveType.PrimitiveTypeName,
    +      decimalMetadata: DecimalMetadata)
    +
       private def dateToDays(date: Date): SQLDate = {
         DateTimeUtils.fromJavaDate(date)
       }
     
    -  private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
    -    case BooleanType =>
    +  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
    +    // BooleanType
    +    case ParquetSchemaType(null, BOOLEAN, null) =>
           (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
    -    case IntegerType =>
    +    // IntegerType
    +    case ParquetSchemaType(null, INT32, null) =>
           (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer])
    -    case LongType =>
    +    // LongType
    +    case ParquetSchemaType(null, INT64, null) =>
           (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long])
    -    case FloatType =>
    +    // FloatType
    +    case ParquetSchemaType(null, FLOAT, null) =>
           (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float])
    -    case DoubleType =>
    +    // DoubleType
    +    case ParquetSchemaType(null, DOUBLE, null) =>
           (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
     
    +    // StringType
         // Binary.fromString and Binary.fromByteArray don't accept null values
    -    case StringType =>
    +    case ParquetSchemaType(UTF8, BINARY, null) =>
           (n: String, v: Any) => FilterApi.eq(
             binaryColumn(n),
             Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
    -    case BinaryType =>
    +    // BinaryType
    +    case ParquetSchemaType(null, BINARY, null) =>
           (n: String, v: Any) => FilterApi.eq(
             binaryColumn(n),
             Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    -    case DateType if pushDownDate =>
    +    // DateType
    +    case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
           (n: String, v: Any) => FilterApi.eq(
             intColumn(n),
             Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
       }
     
    -  private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
    -    case BooleanType =>
    +  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
    +    case ParquetSchemaType(null, BOOLEAN, null) =>
           (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean])
    -    case IntegerType =>
    +    case ParquetSchemaType(null, INT32, null) =>
           (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer])
    -    case LongType =>
    +    case ParquetSchemaType(null, INT64, null) =>
           (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long])
    -    case FloatType =>
    +    case ParquetSchemaType(null, FLOAT, null) =>
           (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float])
    -    case DoubleType =>
    +    case ParquetSchemaType(null, DOUBLE, null) =>
           (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double])
     
    -    case StringType =>
    +    case ParquetSchemaType(UTF8, BINARY, null) =>
           (n: String, v: Any) => FilterApi.notEq(
             binaryColumn(n),
             Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
    -    case BinaryType =>
    +    case ParquetSchemaType(null, BINARY, null) =>
           (n: String, v: Any) => FilterApi.notEq(
             binaryColumn(n),
             Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    -    case DateType if pushDownDate =>
    +    case ParquetSchemaType(DATE, INT32, null) if pushDownDate =>
           (n: String, v: Any) => FilterApi.notEq(
             intColumn(n),
             Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
       }
     
    -  private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = {
    -    case IntegerType =>
    +  private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = {
    +    case ParquetSchemaType(null, INT32, null) =>
           (n: String, v: Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer])
    -    case LongType =>
    +    case ParquetSchemaType(null, INT64, null) =>
           (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long])
    -    case FloatType =>
    +    case ParquetSchemaType(null, FLOAT, null) =>
           (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float])
    -    case DoubleType =>
    +    case ParquetSchemaType(null, DOUBLE, null) =>
           (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double])
     
    -    case StringType =>
    -      (n: String, v: Any) =>
    -        FilterApi.lt(binaryColumn(n),
    -          Binary.fromString(v.asInstanceOf[String]))
    -    case BinaryType =>
    -      (n: String, v: Any) =>
    -        FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    -    case DateType if pushDownDate =>
    +    case ParquetSchemaType(UTF8, BINARY, null) =>
    +      (n: String, v: Any) => FilterApi.lt(
    +        binaryColumn(n),
    +        Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
    --- End diff --
    
    Oh, sorry. I copied `Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull` from `makeEq`.
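    
    For context, the `Option(v)` guard only matters where a null value can legitimately reach the builder: Spark translates `IsNull`/`IsNotNull` source filters into `eq(column, null)`/`notEq(column, null)`, while ordering filters such as `Lt` are not pushed down with a null literal. A minimal standalone sketch of that difference using the Parquet `FilterApi` directly (the object and method names below are made up for illustration and are not part of this PR):
    
        import org.apache.parquet.filter2.predicate.{FilterApi, FilterPredicate}
        import org.apache.parquet.io.api.Binary
    
        // Hypothetical helpers, not from ParquetFilters.scala.
        object NullGuardSketch {
          // eq must tolerate null: IsNull(col) is translated to makeEq(col, null),
          // and FilterApi.eq(column, null) means "column is null" in Parquet.
          def eqString(name: String, v: Any): FilterPredicate =
            FilterApi.eq(
              FilterApi.binaryColumn(name),
              Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)
    
          // lt should never see a null value (Catalyst simplifies comparisons with
          // null literals away), so the Option(v) guard copied from makeEq is
          // redundant here and a plain conversion is enough.
          def ltString(name: String, v: Any): FilterPredicate =
            FilterApi.lt(
              FilterApi.binaryColumn(name),
              Binary.fromString(v.asInstanceOf[String]))
    
          def main(args: Array[String]): Unit = {
            println(eqString("name", null))    // behaves like "name is null"
            println(ltString("name", "spark")) // name < "spark"
          }
        }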


---
