GitHub user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21696#discussion_r200011264
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 ---
    @@ -19,187 +19,200 @@ package 
org.apache.spark.sql.execution.datasources.parquet
     
     import java.sql.Date
     
    +import scala.collection.JavaConverters.asScalaBufferConverter
    +
     import org.apache.parquet.filter2.predicate._
     import org.apache.parquet.filter2.predicate.FilterApi._
     import org.apache.parquet.io.api.Binary
    -import org.apache.parquet.schema.PrimitiveComparator
    +import org.apache.parquet.schema.{DecimalMetadata, MessageType, 
OriginalType, PrimitiveComparator, PrimitiveType}
    +import org.apache.parquet.schema.OriginalType._
    +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
    +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
     
     import org.apache.spark.sql.catalyst.util.DateTimeUtils
     import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate
     import org.apache.spark.sql.sources
    -import org.apache.spark.sql.types._
     import org.apache.spark.unsafe.types.UTF8String
     
     /**
      * Some utility function to convert Spark data source filters to Parquet 
filters.
      */
     private[parquet] class ParquetFilters(pushDownDate: Boolean, 
pushDownStartWith: Boolean) {
     
    +  private case class ParquetSchemaType(
    +      originalType: OriginalType,
    +      primitiveTypeName: PrimitiveTypeName,
    +      decimalMetadata: DecimalMetadata)
    +
    +  private val ParquetBooleanType = ParquetSchemaType(null, BOOLEAN, null)
    +  private val ParquetIntegerType = ParquetSchemaType(null, INT32, null)
    +  private val ParquetLongType = ParquetSchemaType(null, INT64, null)
    +  private val ParquetFloatType = ParquetSchemaType(null, FLOAT, null)
    +  private val ParquetDoubleType = ParquetSchemaType(null, DOUBLE, null)
    +  private val ParquetStringType = ParquetSchemaType(UTF8, BINARY, null)
    +  private val ParquetBinaryType = ParquetSchemaType(null, BINARY, null)
    +  private val ParquetDateType = ParquetSchemaType(DATE, INT32, null)
    +
       private def dateToDays(date: Date): SQLDate = {
         DateTimeUtils.fromJavaDate(date)
       }
     
    -  private val makeEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
    -    case BooleanType =>
    +  private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
    +    case ParquetBooleanType =>
           (n: String, v: Any) => FilterApi.eq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
    -    case IntegerType =>
    +    case ParquetIntegerType =>
           (n: String, v: Any) => FilterApi.eq(intColumn(n), 
v.asInstanceOf[Integer])
    -    case LongType =>
    +    case ParquetLongType =>
           (n: String, v: Any) => FilterApi.eq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
    -    case FloatType =>
    +    case ParquetFloatType =>
           (n: String, v: Any) => FilterApi.eq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
    -    case DoubleType =>
    +    case ParquetDoubleType =>
           (n: String, v: Any) => FilterApi.eq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
     
         // Binary.fromString and Binary.fromByteArray don't accept null values
    -    case StringType =>
    +    case ParquetStringType =>
           (n: String, v: Any) => FilterApi.eq(
             binaryColumn(n),
             Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
    -    case BinaryType =>
    +    case ParquetBinaryType =>
           (n: String, v: Any) => FilterApi.eq(
             binaryColumn(n),
             Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    -    case DateType if pushDownDate =>
    +    case ParquetDateType if pushDownDate =>
           (n: String, v: Any) => FilterApi.eq(
             intColumn(n),
             Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
       }
     
    -  private val makeNotEq: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
    -    case BooleanType =>
    +  private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) 
=> FilterPredicate] = {
    +    case ParquetBooleanType =>
           (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), 
v.asInstanceOf[java.lang.Boolean])
    -    case IntegerType =>
    +    case ParquetIntegerType =>
           (n: String, v: Any) => FilterApi.notEq(intColumn(n), 
v.asInstanceOf[Integer])
    -    case LongType =>
    +    case ParquetLongType =>
           (n: String, v: Any) => FilterApi.notEq(longColumn(n), 
v.asInstanceOf[java.lang.Long])
    -    case FloatType =>
    +    case ParquetFloatType =>
           (n: String, v: Any) => FilterApi.notEq(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
    -    case DoubleType =>
    +    case ParquetDoubleType =>
           (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
     
    -    case StringType =>
    +    case ParquetStringType =>
           (n: String, v: Any) => FilterApi.notEq(
             binaryColumn(n),
             Option(v).map(s => 
Binary.fromString(s.asInstanceOf[String])).orNull)
    -    case BinaryType =>
    +    case ParquetBinaryType =>
           (n: String, v: Any) => FilterApi.notEq(
             binaryColumn(n),
             Option(v).map(b => 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull)
    -    case DateType if pushDownDate =>
    +    case ParquetDateType if pushDownDate =>
           (n: String, v: Any) => FilterApi.notEq(
             intColumn(n),
             Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
       }
     
    -  private val makeLt: PartialFunction[DataType, (String, Any) => 
FilterPredicate] = {
    -    case IntegerType =>
    +  private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => 
FilterPredicate] = {
    +    case ParquetIntegerType =>
           (n: String, v: Any) => FilterApi.lt(intColumn(n), 
v.asInstanceOf[Integer])
    -    case LongType =>
    +    case ParquetLongType =>
           (n: String, v: Any) => FilterApi.lt(longColumn(n), 
v.asInstanceOf[java.lang.Long])
    -    case FloatType =>
    +    case ParquetFloatType =>
           (n: String, v: Any) => FilterApi.lt(floatColumn(n), 
v.asInstanceOf[java.lang.Float])
    -    case DoubleType =>
    +    case ParquetDoubleType =>
           (n: String, v: Any) => FilterApi.lt(doubleColumn(n), 
v.asInstanceOf[java.lang.Double])
     
    -    case StringType =>
    +    case ParquetStringType =>
           (n: String, v: Any) =>
    -        FilterApi.lt(binaryColumn(n),
    -          Binary.fromString(v.asInstanceOf[String]))
    -    case BinaryType =>
    +        FilterApi.lt(binaryColumn(n), 
Binary.fromString(v.asInstanceOf[String]))
    +    case ParquetBinaryType =>
           (n: String, v: Any) =>
             FilterApi.lt(binaryColumn(n), 
Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]]))
    -    case DateType if pushDownDate =>
    -      (n: String, v: Any) => FilterApi.lt(
    -        intColumn(n),
    -        Option(v).map(date => 
dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull)
    +    case ParquetDateType if pushDownDate =>
    +      (n: String, v: Any) =>
    +        FilterApi.lt(intColumn(n), 
dateToDays(v.asInstanceOf[Date]).asInstanceOf[Integer])
    --- End diff --
    
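    `v` can't be `null` here, so the `Option(v).map` wrapper can be removed — the `ParquetStringType` and `ParquetBinaryType` cases of `makeLt` already use `v` directly without a null check.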


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to