This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 24818bf4c9d [SPARK-40280][SQL] Add support for parquet push down for annotated int and long
24818bf4c9d is described below

commit 24818bf4c9d410259fab9ab63c407bedb7a932d1
Author: Robert (Bobby) Evans <bo...@apache.org>
AuthorDate: Thu Sep 8 08:54:28 2022 -0500

    [SPARK-40280][SQL] Add support for parquet push down for annotated int and long

    ### What changes were proposed in this pull request?

    This fixes SPARK-40280 by normalizing a parquet int/long that has optional
    metadata with it to look like the expected version that does not have the
    extra metadata.

    ### Why are the changes needed?

    This allows predicate push down in parquet to work when reading files that
    are compliant with the parquet specification, but different from what Spark
    writes.

    ### Does this PR introduce _any_ user-facing change?

    No

    ### How was this patch tested?

    I added unit tests that cover this use case. I also did some manual testing
    on some queries to verify that less data is actually read after this change.

    Closes #37747 from revans2/normalize_int_long_parquet_push.

    Authored-by: Robert (Bobby) Evans <bo...@apache.org>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
    (cherry picked from commit 24b3baf0177fc1446bf59bb34987296aefd4b318)
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../datasources/parquet/ParquetFilters.scala       |  16 +++-
 .../test/resources/test-data/tagged_int.parquet    | Bin 0 -> 305 bytes
 .../test/resources/test-data/tagged_long.parquet   | Bin 0 -> 313 bytes
 .../datasources/parquet/ParquetFilterSuite.scala   |  82 +++++++++++++++++++++
 4 files changed, 96 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index d1b4506a0e4..e2cc3698a47 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -30,7 +30,7 @@ import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.SparkFilterApi._
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, MessageType, PrimitiveComparator, PrimitiveType, Type}
-import org.apache.parquet.schema.LogicalTypeAnnotation.{DecimalLogicalTypeAnnotation, TimeUnit}
+import org.apache.parquet.schema.LogicalTypeAnnotation.{DecimalLogicalTypeAnnotation, IntLogicalTypeAnnotation, TimeUnit}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 import org.apache.parquet.schema.Type.Repetition
@@ -59,6 +59,18 @@ class ParquetFilters(
   // nested columns. If any part of the names contains `dots`, it is quoted to avoid confusion.
   // See `org.apache.spark.sql.connector.catalog.quote` for implementation details.
   private val nameToParquetField : Map[String, ParquetPrimitiveField] = {
+    def getNormalizedLogicalType(p: PrimitiveType): LogicalTypeAnnotation = {
+      // SPARK-40280: Signed 64 bits on an INT64 and signed 32 bits on an INT32 are optional, but
+      // the rest of the code here assumes they are not set, so normalize them to not being set.
+      (p.getPrimitiveTypeName, p.getLogicalTypeAnnotation) match {
+        case (INT32, intType: IntLogicalTypeAnnotation)
+            if intType.getBitWidth() == 32 && intType.isSigned() => null
+        case (INT64, intType: IntLogicalTypeAnnotation)
+            if intType.getBitWidth() == 64 && intType.isSigned() => null
+        case (_, otherType) => otherType
+      }
+    }
+
     // Recursively traverse the parquet schema to get primitive fields that can be pushed-down.
     // `parentFieldNames` is used to keep track of the current nested level when traversing.
     def getPrimitiveFields(
@@ -70,7 +82,7 @@ class ParquetFilters(
         // repeated columns (https://issues.apache.org/jira/browse/PARQUET-34)
         case p: PrimitiveType if p.getRepetition != Repetition.REPEATED =>
           Some(ParquetPrimitiveField(fieldNames = parentFieldNames :+ p.getName,
-            fieldType = ParquetSchemaType(p.getLogicalTypeAnnotation,
+            fieldType = ParquetSchemaType(getNormalizedLogicalType(p),
               p.getPrimitiveTypeName, p.getTypeLength)))
         // Note that when g is a `Struct`, `g.getOriginalType` is `null`.
         // When g is a `Map`, `g.getOriginalType` is `MAP`.
diff --git a/sql/core/src/test/resources/test-data/tagged_int.parquet b/sql/core/src/test/resources/test-data/tagged_int.parquet
new file mode 100644
index 00000000000..840f5dbdb93
Binary files /dev/null and b/sql/core/src/test/resources/test-data/tagged_int.parquet differ
diff --git a/sql/core/src/test/resources/test-data/tagged_long.parquet b/sql/core/src/test/resources/test-data/tagged_long.parquet
new file mode 100644
index 00000000000..3ff9450d834
Binary files /dev/null and b/sql/core/src/test/resources/test-data/tagged_long.parquet differ
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 307b4e848a8..f7b290da3fd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -364,6 +364,47 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
+  test("SPARK-40280: filter pushdown - int with annotation") {
+    implicit val df = readResourceParquetFile("test-data/tagged_int.parquet")
+
+    val intAttr = df("_c0").expr
+    assert(intAttr.dataType === IntegerType)
+
+    checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+    checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]],
+      (1 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(intAttr === 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]],
+      (2 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(intAttr < 2, classOf[Lt[_]], 1)
+    checkFilterPredicate(intAttr > 3, classOf[Gt[_]], 4)
+    checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], 1)
+    checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]], 1)
+    checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]], 4)
+    checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]], 1)
+    checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], 4)
+    checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or],
+      Seq(Row(1), Row(4)))
+
+    Seq(3, 20).foreach { threshold =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
+        checkFilterPredicate(
+          In(intAttr, Array(2, 3, 4, 5, 6, 7).map(Literal.apply)),
+          if (threshold == 3) classOf[FilterIn[_]] else classOf[Operators.Or],
+          Seq(Row(2), Row(3), Row(4)))
+      }
+    }
+  }
+
   test("filter pushdown - long") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toLong)))
     withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
@@ -408,6 +449,47 @@ abstract class ParquetFilterSuite extends QueryTest with ParquetTest with Shared
     }
   }
 
+  test("SPARK-40280: filter pushdown - long with annotation") {
+    implicit val df = readResourceParquetFile("test-data/tagged_long.parquet")
+
+    val longAttr = df("_c0").expr
+    assert(longAttr.dataType === LongType)
+
+    checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+    checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]],
+      (1 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(longAttr === 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]],
+      (2 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(longAttr < 2, classOf[Lt[_]], 1)
+    checkFilterPredicate(longAttr > 3, classOf[Gt[_]], 4)
+    checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], 1)
+    checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]], 1)
+    checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]], 4)
+    checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]], 1)
+    checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], 4)
+    checkFilterPredicate(longAttr < 2 || longAttr > 3, classOf[Operators.Or],
+      Seq(Row(1), Row(4)))
+
+    Seq(3, 20).foreach { threshold =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> s"$threshold") {
+        checkFilterPredicate(
+          In(longAttr, Array(2L, 3L, 4L, 5L, 6L, 7L).map(Literal.apply)),
+          if (threshold == 3) classOf[FilterIn[_]] else classOf[Operators.Or],
+          Seq(Row(2L), Row(3L), Row(4L)))
+      }
+    }
+  }
+
   test("filter pushdown - float") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toFloat)))
     withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
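
For readers following along outside the diff, below is a minimal standalone sketch of the normalization idea in the patch, using only parquet-mr schema APIs. The object name AnnotationNormalizationSketch, the helper normalize, and the column name "id" are illustrative and not part of the commit; it assumes a parquet-mr version whose Types builder accepts a LogicalTypeAnnotation. The point it demonstrates: a signed 32-bit INT annotation on an INT32 column (or signed 64-bit on INT64) carries no extra information, so it can be dropped before building the pushdown field map, while any genuinely different annotation is kept.

import org.apache.parquet.schema.{LogicalTypeAnnotation, PrimitiveType, Types}
import org.apache.parquet.schema.LogicalTypeAnnotation.IntLogicalTypeAnnotation
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{INT32, INT64}

object AnnotationNormalizationSketch {
  // Same idea as the patched getNormalizedLogicalType: drop the annotation when it
  // is just the default signed 32/64-bit tag on an INT32/INT64 column.
  def normalize(p: PrimitiveType): LogicalTypeAnnotation =
    (p.getPrimitiveTypeName, p.getLogicalTypeAnnotation) match {
      case (INT32, i: IntLogicalTypeAnnotation) if i.getBitWidth == 32 && i.isSigned => null
      case (INT64, i: IntLogicalTypeAnnotation) if i.getBitWidth == 64 && i.isSigned => null
      case (_, other) => other
    }

  def main(args: Array[String]): Unit = {
    // What Spark itself writes: an INT32 column with no logical type annotation.
    val plainInt: PrimitiveType =
      Types.required(INT32).named("id")
    // What another spec-compliant writer may produce: the same column tagged INT(32, true).
    val taggedInt: PrimitiveType =
      Types.required(INT32).as(LogicalTypeAnnotation.intType(32, true)).named("id")

    // Both normalize to "no annotation", so filter pushdown treats them identically.
    assert(normalize(plainInt) == null)
    assert(normalize(taggedInt) == null)

    // A genuinely different annotation (e.g. unsigned 32-bit) is preserved.
    val unsignedInt: PrimitiveType =
      Types.required(INT32).as(LogicalTypeAnnotation.intType(32, false)).named("id")
    assert(normalize(unsignedInt) != null)
  }
}

Before the patch, the tagged schema produced a ParquetSchemaType that did not match the unannotated one Spark expects, so no predicates were pushed down for such files; after normalization both forms map to the same pushdown field entry.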