Github user wangyum commented on a diff in the pull request: https://github.com/apache/spark/pull/21696#discussion_r199999002 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala --- @@ -19,166 +19,186 @@ package org.apache.spark.sql.execution.datasources.parquet import java.sql.Date +import scala.collection.JavaConverters._ + import org.apache.parquet.filter2.predicate._ import org.apache.parquet.filter2.predicate.FilterApi._ import org.apache.parquet.io.api.Binary -import org.apache.parquet.schema.PrimitiveComparator +import org.apache.parquet.schema._ +import org.apache.parquet.schema.OriginalType._ +import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLDate import org.apache.spark.sql.sources -import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String /** * Some utility function to convert Spark data source filters to Parquet filters. 
*/ private[parquet] class ParquetFilters(pushDownDate: Boolean, pushDownStartWith: Boolean) { + case class ParquetSchemaType( + originalType: OriginalType, + primitiveTypeName: PrimitiveType.PrimitiveTypeName, + decimalMetadata: DecimalMetadata) + private def dateToDays(date: Date): SQLDate = { DateTimeUtils.fromJavaDate(date) } - private val makeEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { - case BooleanType => + private val makeEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + // BooleanType + case ParquetSchemaType(null, BOOLEAN, null) => (n: String, v: Any) => FilterApi.eq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) - case IntegerType => + // IntegerType + case ParquetSchemaType(null, INT32, null) => (n: String, v: Any) => FilterApi.eq(intColumn(n), v.asInstanceOf[Integer]) - case LongType => + // LongType + case ParquetSchemaType(null, INT64, null) => (n: String, v: Any) => FilterApi.eq(longColumn(n), v.asInstanceOf[java.lang.Long]) - case FloatType => + // FloatType + case ParquetSchemaType(null, FLOAT, null) => (n: String, v: Any) => FilterApi.eq(floatColumn(n), v.asInstanceOf[java.lang.Float]) - case DoubleType => + // DoubleType + case ParquetSchemaType(null, DOUBLE, null) => (n: String, v: Any) => FilterApi.eq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) + // StringType // Binary.fromString and Binary.fromByteArray don't accept null values - case StringType => + case ParquetSchemaType(UTF8, BINARY, null) => (n: String, v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull) - case BinaryType => + // BinaryType + case ParquetSchemaType(null, BINARY, null) => (n: String, v: Any) => FilterApi.eq( binaryColumn(n), Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull) - case DateType if pushDownDate => + // DateType + case ParquetSchemaType(DATE, INT32, null) if pushDownDate => (n: String, v: Any) => 
FilterApi.eq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) } - private val makeNotEq: PartialFunction[DataType, (String, Any) => FilterPredicate] = { - case BooleanType => + private val makeNotEq: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + case ParquetSchemaType(null, BOOLEAN, null) => (n: String, v: Any) => FilterApi.notEq(booleanColumn(n), v.asInstanceOf[java.lang.Boolean]) - case IntegerType => + case ParquetSchemaType(null, INT32, null) => (n: String, v: Any) => FilterApi.notEq(intColumn(n), v.asInstanceOf[Integer]) - case LongType => + case ParquetSchemaType(null, INT64, null) => (n: String, v: Any) => FilterApi.notEq(longColumn(n), v.asInstanceOf[java.lang.Long]) - case FloatType => + case ParquetSchemaType(null, FLOAT, null) => (n: String, v: Any) => FilterApi.notEq(floatColumn(n), v.asInstanceOf[java.lang.Float]) - case DoubleType => + case ParquetSchemaType(null, DOUBLE, null) => (n: String, v: Any) => FilterApi.notEq(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - case StringType => + case ParquetSchemaType(UTF8, BINARY, null) => (n: String, v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull) - case BinaryType => + case ParquetSchemaType(null, BINARY, null) => (n: String, v: Any) => FilterApi.notEq( binaryColumn(n), Option(v).map(b => Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])).orNull) - case DateType if pushDownDate => + case ParquetSchemaType(DATE, INT32, null) if pushDownDate => (n: String, v: Any) => FilterApi.notEq( intColumn(n), Option(v).map(date => dateToDays(date.asInstanceOf[Date]).asInstanceOf[Integer]).orNull) } - private val makeLt: PartialFunction[DataType, (String, Any) => FilterPredicate] = { - case IntegerType => + private val makeLt: PartialFunction[ParquetSchemaType, (String, Any) => FilterPredicate] = { + case ParquetSchemaType(null, INT32, null) => (n: String, v: 
Any) => FilterApi.lt(intColumn(n), v.asInstanceOf[Integer]) - case LongType => + case ParquetSchemaType(null, INT64, null) => (n: String, v: Any) => FilterApi.lt(longColumn(n), v.asInstanceOf[java.lang.Long]) - case FloatType => + case ParquetSchemaType(null, FLOAT, null) => (n: String, v: Any) => FilterApi.lt(floatColumn(n), v.asInstanceOf[java.lang.Float]) - case DoubleType => + case ParquetSchemaType(null, DOUBLE, null) => (n: String, v: Any) => FilterApi.lt(doubleColumn(n), v.asInstanceOf[java.lang.Double]) - case StringType => - (n: String, v: Any) => - FilterApi.lt(binaryColumn(n), - Binary.fromString(v.asInstanceOf[String])) - case BinaryType => - (n: String, v: Any) => - FilterApi.lt(binaryColumn(n), Binary.fromReusedByteArray(v.asInstanceOf[Array[Byte]])) - case DateType if pushDownDate => + case ParquetSchemaType(UTF8, BINARY, null) => + (n: String, v: Any) => FilterApi.lt( + binaryColumn(n), + Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull) --- End diff -- Oh, sorry. I copied `Option(v).map(s => Binary.fromString(s.asInstanceOf[String])).orNull)` from `makeEq`.
--- --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org