This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 24b3baf0177 [SPARK-40280][SQL] Add support for parquet push down for 
annotated int and long
24b3baf0177 is described below

commit 24b3baf0177fc1446bf59bb34987296aefd4b318
Author: Robert (Bobby) Evans <bo...@apache.org>
AuthorDate: Thu Sep 8 08:54:28 2022 -0500

    [SPARK-40280][SQL] Add support for parquet push down for annotated int and 
long
    
    ### What changes were proposed in this pull request?
    This fixes SPARK-40280 by normalizing a parquet int/long that has optional 
metadata with it to look like the expected version that does not have the extra 
metadata.
    
    ### Why are the changes needed?
    This allows predicate push down in parquet to work when reading files that 
are compliant with the parquet specification, but different from what Spark 
writes.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    I added unit tests that cover this use case. I also did some manual testing 
on some queries to verify that less data is actually read after this change.
    
    Closes #37747 from revans2/normalize_int_long_parquet_push.
    
    Authored-by: Robert (Bobby) Evans <bo...@apache.org>
    Signed-off-by: Thomas Graves <tgra...@apache.org>
---
 .../datasources/parquet/ParquetFilters.scala       |  16 +++-
 .../test/resources/test-data/tagged_int.parquet    | Bin 0 -> 305 bytes
 .../test/resources/test-data/tagged_long.parquet   | Bin 0 -> 313 bytes
 .../datasources/parquet/ParquetFilterSuite.scala   |  82 +++++++++++++++++++++
 4 files changed, 96 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index f93419412e6..c34f2827659 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -30,7 +30,7 @@ import org.apache.parquet.filter2.predicate._
 import org.apache.parquet.filter2.predicate.SparkFilterApi._
 import org.apache.parquet.io.api.Binary
 import org.apache.parquet.schema.{GroupType, LogicalTypeAnnotation, 
MessageType, PrimitiveComparator, PrimitiveType, Type}
-import 
org.apache.parquet.schema.LogicalTypeAnnotation.{DecimalLogicalTypeAnnotation, 
TimeUnit}
+import 
org.apache.parquet.schema.LogicalTypeAnnotation.{DecimalLogicalTypeAnnotation, 
IntLogicalTypeAnnotation, TimeUnit}
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
 import org.apache.parquet.schema.Type.Repetition
@@ -59,6 +59,18 @@ class ParquetFilters(
   // nested columns. If any part of the names contains `dots`, it is quoted to 
avoid confusion.
   // See `org.apache.spark.sql.connector.catalog.quote` for implementation 
details.
   private val nameToParquetField : Map[String, ParquetPrimitiveField] = {
+    def getNormalizedLogicalType(p: PrimitiveType): LogicalTypeAnnotation = {
+      // SPARK-40280: Signed 64 bits on an INT64 and signed 32 bits on an 
INT32 are optional, but
+      // the rest of the code here assumes they are not set, so normalize them 
to not being set.
+      (p.getPrimitiveTypeName, p.getLogicalTypeAnnotation) match {
+        case (INT32, intType: IntLogicalTypeAnnotation)
+          if intType.getBitWidth() == 32 && intType.isSigned() => null
+        case (INT64, intType: IntLogicalTypeAnnotation)
+          if intType.getBitWidth() == 64 && intType.isSigned() => null
+        case (_, otherType) => otherType
+      }
+    }
+
     // Recursively traverse the parquet schema to get primitive fields that 
can be pushed-down.
     // `parentFieldNames` is used to keep track of the current nested level 
when traversing.
     def getPrimitiveFields(
@@ -70,7 +82,7 @@ class ParquetFilters(
         //                    repeated columns 
(https://issues.apache.org/jira/browse/PARQUET-34)
         case p: PrimitiveType if p.getRepetition != Repetition.REPEATED =>
           Some(ParquetPrimitiveField(fieldNames = parentFieldNames :+ 
p.getName,
-            fieldType = ParquetSchemaType(p.getLogicalTypeAnnotation,
+            fieldType = ParquetSchemaType(getNormalizedLogicalType(p),
               p.getPrimitiveTypeName, p.getTypeLength)))
         // Note that when g is a `Struct`, `g.getOriginalType` is `null`.
         // When g is a `Map`, `g.getOriginalType` is `MAP`.
diff --git a/sql/core/src/test/resources/test-data/tagged_int.parquet 
b/sql/core/src/test/resources/test-data/tagged_int.parquet
new file mode 100644
index 00000000000..840f5dbdb93
Binary files /dev/null and 
b/sql/core/src/test/resources/test-data/tagged_int.parquet differ
diff --git a/sql/core/src/test/resources/test-data/tagged_long.parquet 
b/sql/core/src/test/resources/test-data/tagged_long.parquet
new file mode 100644
index 00000000000..3ff9450d834
Binary files /dev/null and 
b/sql/core/src/test/resources/test-data/tagged_long.parquet differ
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
index 55b6eaa95e8..9adcb43c838 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilterSuite.scala
@@ -370,6 +370,47 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
     }
   }
 
+  test("SPARK-40280: filter pushdown - int with annotation") {
+    implicit val df = readResourceParquetFile("test-data/tagged_int.parquet")
+
+    val intAttr = df("_c0").expr
+    assert(intAttr.dataType === IntegerType)
+
+    checkFilterPredicate(intAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+    checkFilterPredicate(intAttr.isNotNull, classOf[NotEq[_]],
+      (1 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(intAttr === 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(intAttr <=> 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(intAttr =!= 1, classOf[NotEq[_]],
+      (2 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(intAttr < 2, classOf[Lt[_]], 1)
+    checkFilterPredicate(intAttr > 3, classOf[Gt[_]], 4)
+    checkFilterPredicate(intAttr <= 1, classOf[LtEq[_]], 1)
+    checkFilterPredicate(intAttr >= 4, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(Literal(1) === intAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(1) <=> intAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(2) > intAttr, classOf[Lt[_]], 1)
+    checkFilterPredicate(Literal(3) < intAttr, classOf[Gt[_]], 4)
+    checkFilterPredicate(Literal(1) >= intAttr, classOf[LtEq[_]], 1)
+    checkFilterPredicate(Literal(4) <= intAttr, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(!(intAttr < 4), classOf[GtEq[_]], 4)
+    checkFilterPredicate(intAttr < 2 || intAttr > 3, classOf[Operators.Or],
+      Seq(Row(1), Row(4)))
+
+    Seq(3, 20).foreach { threshold =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> 
s"$threshold") {
+        checkFilterPredicate(
+          In(intAttr, Array(2, 3, 4, 5, 6, 7).map(Literal.apply)),
+          if (threshold == 3) classOf[FilterIn[_]] else classOf[Operators.Or],
+          Seq(Row(2), Row(3), Row(4)))
+      }
+    }
+  }
+
   test("filter pushdown - long") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toLong)))
     withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>
@@ -414,6 +455,47 @@ abstract class ParquetFilterSuite extends QueryTest with 
ParquetTest with Shared
     }
   }
 
+  test("SPARK-40280: filter pushdown - long with annotation") {
+    implicit val df = readResourceParquetFile("test-data/tagged_long.parquet")
+
+    val longAttr = df("_c0").expr
+    assert(longAttr.dataType === LongType)
+
+    checkFilterPredicate(longAttr.isNull, classOf[Eq[_]], Seq.empty[Row])
+    checkFilterPredicate(longAttr.isNotNull, classOf[NotEq[_]],
+      (1 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(longAttr === 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(longAttr <=> 1, classOf[Eq[_]], 1)
+    checkFilterPredicate(longAttr =!= 1, classOf[NotEq[_]],
+      (2 to 4).map(i => Row.apply(i)))
+
+    checkFilterPredicate(longAttr < 2, classOf[Lt[_]], 1)
+    checkFilterPredicate(longAttr > 3, classOf[Gt[_]], 4)
+    checkFilterPredicate(longAttr <= 1, classOf[LtEq[_]], 1)
+    checkFilterPredicate(longAttr >= 4, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(Literal(1) === longAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(1) <=> longAttr, classOf[Eq[_]], 1)
+    checkFilterPredicate(Literal(2) > longAttr, classOf[Lt[_]], 1)
+    checkFilterPredicate(Literal(3) < longAttr, classOf[Gt[_]], 4)
+    checkFilterPredicate(Literal(1) >= longAttr, classOf[LtEq[_]], 1)
+    checkFilterPredicate(Literal(4) <= longAttr, classOf[GtEq[_]], 4)
+
+    checkFilterPredicate(!(longAttr < 4), classOf[GtEq[_]], 4)
+    checkFilterPredicate(longAttr < 2 || longAttr > 3, classOf[Operators.Or],
+      Seq(Row(1), Row(4)))
+
+    Seq(3, 20).foreach { threshold =>
+      withSQLConf(SQLConf.PARQUET_FILTER_PUSHDOWN_INFILTERTHRESHOLD.key -> 
s"$threshold") {
+        checkFilterPredicate(
+          In(longAttr, Array(2L, 3L, 4L, 5L, 6L, 7L).map(Literal.apply)),
+          if (threshold == 3) classOf[FilterIn[_]] else classOf[Operators.Or],
+          Seq(Row(2L), Row(3L), Row(4L)))
+      }
+    }
+  }
+
   test("filter pushdown - float") {
     val data = (1 to 4).map(i => Tuple1(Option(i.toFloat)))
     withNestedParquetDataFrame(data) { case (inputDF, colName, resultFun) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to