This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 38d818913bb [SPARK-38977][SQL] Fix schema pruning with correlated 
subqueries
38d818913bb is described below

commit 38d818913bbb84e720cae149a236c61bf8fc4f18
Author: Anton Okolnychyi <aokolnyc...@apple.com>
AuthorDate: Fri Apr 22 14:11:47 2022 -0700

    [SPARK-38977][SQL] Fix schema pruning with correlated subqueries
    
    ### What changes were proposed in this pull request?
    
    This PR fixes schema pruning for queries with multiple correlated 
subqueries. Previously, Spark would throw an exception while determining root 
fields in `SchemaPruning$identifyRootFields`. This happened because predicate 
expressions that referenced attributes from subqueries were not ignored. As a 
result, attributes from different subqueries could conflict with each other 
(e.g. same name but incompatible types) even though they should be ignored.
    
    For instance, the following query would throw a runtime exception.
    
    ```
    SELECT name FROM contacts c
    WHERE
     EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
     AND
     EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
    ```
    ```
    [info]   org.apache.spark.SparkException: Failed to merge fields 'value' 
and 'value'. Failed to merge incompatible data types int and string
    [info]   at 
org.apache.spark.sql.errors.QueryExecutionErrors$.failedMergingFieldsError(QueryExecutionErrors.scala:936)
    ```
    
    ### Why are the changes needed?
    
    These changes are needed to avoid exceptions for some queries with multiple 
correlated subqueries.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    This PR comes with tests.
    
    Closes #36303 from aokolnychyi/spark-38977.
    
    Authored-by: Anton Okolnychyi <aokolnyc...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
    (cherry picked from commit 0c9947dabcb71de414c97c0e60a1067e468f2642)
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 .../sql/catalyst/expressions/SchemaPruning.scala   |   4 +
 .../execution/datasources/SchemaPruningSuite.scala | 102 +++++++++++++++++++++
 2 files changed, 106 insertions(+)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
index 9aa2766dd3e..1ebe680263f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SchemaPruning.scala
@@ -140,6 +140,10 @@ object SchemaPruning extends SQLConfHelper {
         RootField(field, derivedFromAtt = false, prunedIfAnyChildAccessed = 
true) :: Nil
       case IsNotNull(_: Attribute) | IsNull(_: Attribute) =>
         
expr.children.flatMap(getRootFields).map(_.copy(prunedIfAnyChildAccessed = 
true))
+      case s: SubqueryExpression =>
+        // use subquery references that only include outer attrs and
+        // ignore join conditions as those may include attributes from other 
tables
+        s.references.toSeq.flatMap(getRootFields)
       case _ =>
         expr.children.flatMap(getRootFields)
     }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
index 1175063bfa9..0b745f18768 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/SchemaPruningSuite.scala
@@ -901,4 +901,106 @@ abstract class SchemaPruningSuite
       .count()
     assert(count == 0)
   }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated EXISTS 
subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+          |  AND
+          |  EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = n.value)
+          |""".stripMargin)
+
+      checkScan(query, 
"struct<id:int,name:struct<first:string,middle:string,last:string>>")
+
+      checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated NOT EXISTS 
subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  NOT EXISTS (SELECT 1 FROM ids i WHERE i.value = c.id)
+          |  AND
+          |  NOT EXISTS (SELECT 1 FROM first_names n WHERE c.name.first = 
n.value)
+          |""".stripMargin)
+
+      checkScan(query, 
"struct<id:int,name:struct<first:string,middle:string,last:string>>")
+
+      checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated IN subquery") 
{
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  id IN (SELECT * FROM ids i WHERE c.pets > i.value)
+          |  AND
+          |  name.first IN (SELECT * FROM first_names n WHERE c.name.last < 
n.value)
+          |""".stripMargin)
+
+      checkScan(query,
+        
"struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
+
+      checkAnswer(query, Row(Row("John", "Y.", "Doe")) :: Nil)
+    }
+  }
+
+  testSchemaPruning("SPARK-38977: schema pruning with correlated NOT IN 
subquery") {
+
+    import testImplicits._
+
+    withTempView("ids", "first_names") {
+      val df1 = Seq(1, 2, 3).toDF("value")
+      df1.createOrReplaceTempView("ids")
+
+      val df2 = Seq("John", "Janet", "Jim", "Bob").toDF("value")
+      df2.createOrReplaceTempView("first_names")
+
+      val query = sql(
+        """SELECT name FROM contacts c
+          |WHERE
+          |  id NOT IN (SELECT * FROM ids i WHERE c.pets > i.value)
+          |  AND
+          |  name.first NOT IN (SELECT * FROM first_names n WHERE c.name.last 
> n.value)
+          |""".stripMargin)
+
+      checkScan(query,
+        
"struct<id:int,name:struct<first:string,middle:string,last:string>,pets:int>")
+
+      checkAnswer(query, Row(Row("Jane", "X.", "Doe")) :: Nil)
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to