[ 
https://issues.apache.org/jira/browse/SPARK-48429?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhen Wang updated SPARK-48429:
------------------------------
    Description: 
*org.apache.spark.sql.execution.datasources.v2.V2ScanRelationPushDown* removes pushed filters from the remaining Filter condition, which can prevent *org.apache.spark.sql.execution.dynamicpruning.CleanupDynamicPruningFilters* from taking effect for *V2ScanRelation*.
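
Below is a self-contained toy model of the interaction (plain Scala case classes, not Spark's actual Catalyst expressions or the real rule): a cleanup step in the spirit of SPARK-38148 drops a DPP predicate only when it can still see a static equality predicate on the same partition column, so once pushdown has removed that static predicate from the Filter condition, nothing gets cleaned up.

{code:java}
// Toy model only: hypothetical stand-ins, not Spark's internals.
sealed trait Expr
case class StaticEq(col: String, value: Int) extends Expr
case class DynamicPruning(col: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr

// Split a condition into its top-level conjuncts.
def conjuncts(e: Expr): Seq[Expr] = e match {
  case And(l, r) => conjuncts(l) ++ conjuncts(r)
  case other => Seq(other)
}

// Mirrors the SPARK-38148 idea: a DPP predicate on a column is redundant
// (and removed) if the same condition statically prunes that column.
def cleanupDynamicPruning(condition: Expr): Seq[Expr] = {
  val cs = conjuncts(condition)
  val staticallyPruned = cs.collect { case StaticEq(c, _) => c }.toSet
  cs.filterNot {
    case DynamicPruning(c) => staticallyPruned.contains(c)
    case _ => false
  }
}

// V1-style plan: the static predicate is still in the Filter condition,
// so the redundant DPP predicate is cleaned up.
val beforePushdown = And(StaticEq("store_id", 1), DynamicPruning("store_id"))
assert(!cleanupDynamicPruning(beforePushdown).exists(_.isInstanceOf[DynamicPruning]))

// V2-style plan after V2ScanRelationPushDown: the pushed static filter is
// gone from the Filter condition, so the cleanup no longer fires and the
// now-redundant DPP predicate survives.
val afterPushdown: Expr = DynamicPruning("store_id")
assert(cleanupDynamicPruning(afterPushdown).exists(_.isInstanceOf[DynamicPruning]))
{code}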

 

Reproduction:

Apply the following patch, then run the `SPARK-38148: Do not add dynamic partition pruning if there exists static partition pruning` test in DynamicPartitionPruningV2FilterSuiteAEOn.

 
{code:java}
Subject: [PATCH] test
---
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala	(revision 9ad2372bc6b4ab8955839b87534da887ca9642c0)
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala	(date 1716799141402)
@@ -298,6 +298,9 @@
     if (partitioning.length == 1 && partitioning.head.references.length == 1) {
       filter match {
         case In(attrName, _) if attrName == partitioning.head.references.head.toString => true
+        case EqualTo(attrName, _) if attrName == partitioning.head.references.head.toString => true
+        case IsNotNull(attrName) if attrName == partitioning.head.references.head.toString =>
+          true
         case _ => false
       }
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala
--- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala	(revision 9ad2372bc6b4ab8955839b87534da887ca9642c0)
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala	(date 1716798802053)
@@ -1489,12 +1489,7 @@
     "pruning") {
     withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true") {
       Seq(
-        "f.store_id = 1" -> false,
-        "1 = f.store_id" -> false,
-        "f.store_id <=> 1" -> false,
-        "1 <=> f.store_id" -> false,
-        "f.store_id > 1" -> true,
-        "5 > f.store_id" -> true).foreach { case (condition, hasDPP) =>
+        "f.store_id = 1" -> false).foreach { case (condition, hasDPP) =>
         // partitioned table at left side
         val df1 = sql(
           s"""
@@ -1843,6 +1838,7 @@
   override protected def initState(): Unit = {
     super.initState()
     spark.conf.set("spark.sql.catalog.testcat",
       classOf[InMemoryTableWithV2FilterCatalog].getName)
+    spark.conf.set("spark.sql.defaultCatalog", "testcat")
   }
 }
{code}
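
For context on the patch: the first hunk makes the in-memory V2 test table treat EqualTo and IsNotNull on its partition column as supported (pushable) partition filters, the second hunk narrows the test to the plain `f.store_id = 1` case, and the third hunk sets spark.sql.defaultCatalog to testcat so that unqualified table names resolve to the V2 catalog. One way to run just this test from the repo root (assuming the usual Spark sbt setup; the exact invocation may differ) is {{build/sbt "sql/testOnly *DynamicPartitionPruningV2FilterSuiteAEOn -- -z SPARK-38148"}}.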
 

 

> CleanupDynamicPruningFilters does not take effect for V2ScanRelation
> --------------------------------------------------------------------
>
>                 Key: SPARK-48429
>                 URL: https://issues.apache.org/jira/browse/SPARK-48429
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 4.0.0
>            Reporter: Zhen Wang
>            Priority: Major
>


