This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 75e6b731cce [SPARK-44549][SQL] Support window functions in correlated 
scalar subqueries
75e6b731cce is described below

commit 75e6b731cced459cd4a7eaed75b0bb84334950a3
Author: Andrey Gubichev <andrey.gubic...@databricks.com>
AuthorDate: Wed Aug 23 22:49:41 2023 +0800

    [SPARK-44549][SQL] Support window functions in correlated scalar subqueries
    
    ### What changes were proposed in this pull request?
    
    Support window functions in correlated scalar subqueries.
    Support for IN/EXISTS subqueries will come after they are migrated to the 
DecorrelateInnerQuery framework. In addition, correlated references are not yet 
supported inside the window function itself.
    
    ### Why are the changes needed?
    
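    Correlated scalar subqueries that contain window functions were previously 
rejected during analysis; this change allows them to be decorrelated and run.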
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, users can now run correlated scalar subqueries (and lateral joins) that 
contain window functions, which previously failed analysis.
    
    ### How was this patch tested?
    
    Unit and query tests. The results of the query tests were checked against 
PostgreSQL.
    
    Closes #42383 from agubichev/SPARK-44549-corr-window.
    
    Authored-by: Andrey Gubichev <andrey.gubic...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   5 +
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  22 ++-
 .../optimizer/DecorrelateInnerQuerySuite.scala     |  44 ++++++
 .../analyzer-results/join-lateral.sql.out          |  69 +++++----
 .../exists-subquery/exists-aggregate.sql.out       |  24 ++++
 .../subquery/in-subquery/in-group-by.sql.out       |  23 +++
 .../scalar-subquery-predicate.sql.out              | 159 +++++++++++++++++++++
 .../resources/sql-tests/inputs/join-lateral.sql    |   2 +-
 .../subquery/exists-subquery/exists-aggregate.sql  |   7 +
 .../inputs/subquery/in-subquery/in-group-by.sql    |   7 +-
 .../scalar-subquery/scalar-subquery-predicate.sql  |  38 +++++
 .../sql-tests/results/join-lateral.sql.out         |  46 ++----
 .../exists-subquery/exists-aggregate.sql.out       |  26 ++++
 .../subquery/in-subquery/in-group-by.sql.out       |  25 ++++
 .../scalar-subquery-predicate.sql.out              |  79 ++++++++++
 .../scala/org/apache/spark/sql/SubquerySuite.scala |   2 +-
 16 files changed, 511 insertions(+), 67 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index a86a6052708..0d663dfb5f9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1351,6 +1351,11 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
           failOnInvalidOuterReference(a)
           checkPlan(a.child, aggregated = true, canContainOuter)
 
+        // Same as Aggregate above.
+        case w: Window =>
+          failOnInvalidOuterReference(w)
+          checkPlan(w.child, aggregated = true, canContainOuter)
+
         // Distinct does not host any correlated expressions, but during the 
optimization phase
         // it will be rewritten as Aggregate, which can only be on a 
correlation path if the
         // correlation contains only the supported correlated equality 
predicates.
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index a3e264579f4..a07177f6e8a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -478,7 +478,8 @@ object DecorrelateInnerQuery extends PredicateHelper {
     // parentOuterReferences: a set of parent outer references. As we recurse 
down we collect the
     // set of outer references that are part of the Domain, and use it to 
construct the DomainJoins
     // and join conditions.
-    // aggregated: a boolean flag indicating whether the result of the plan 
will be aggregated.
+    // aggregated: a boolean flag indicating whether the result of the plan 
will be aggregated
+    // (or used as an input for a window function)
     // underSetOp: a boolean flag indicating whether a set operator (e.g. 
UNION) is a parent of the
     // inner plan.
     //
@@ -654,6 +655,25 @@ object DecorrelateInnerQuery extends PredicateHelper {
             val newProject = Project(newProjectList ++ referencesToAdd, 
newChild)
             (newProject, joinCond, outerReferenceMap)
 
+          case w @ Window(projectList, partitionSpec, orderSpec, child) =>
+            val outerReferences = collectOuterReferences(w.expressions)
+            assert(outerReferences.isEmpty, s"Correlated column is not allowed 
in window " +
+              s"function: $w")
+            val newOuterReferences = parentOuterReferences ++ outerReferences
+            val (newChild, joinCond, outerReferenceMap) =
+              decorrelate(child, newOuterReferences, aggregated = true, 
underSetOp)
+            // For now these are no-op, as we don't allow correlated 
references in the window
+            // function itself.
+            val newProjectList = replaceOuterReferences(projectList, 
outerReferenceMap)
+            val newPartitionSpec = replaceOuterReferences(partitionSpec, 
outerReferenceMap)
+            val newOrderSpec = replaceOuterReferences(orderSpec, 
outerReferenceMap)
+            val referencesToAdd = missingReferences(newProjectList, joinCond)
+
+            val newWindow = Window(newProjectList ++ referencesToAdd,
+              partitionSpec = newPartitionSpec ++ referencesToAdd,
+              orderSpec = newOrderSpec, newChild)
+            (newWindow, joinCond, outerReferenceMap)
+
           case a @ Aggregate(groupingExpressions, aggregateExpressions, child) 
=>
             val outerReferences = collectOuterReferences(a.expressions)
             val newOuterReferences = parentOuterReferences ++ outerReferences
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
index 21ac8849fe2..0f7b01c0715 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
@@ -581,4 +581,48 @@ class DecorrelateInnerQuerySuite extends PlanTest {
               Project(Seq(a4, b4), testRelation4)))))
     check(innerPlan, outerPlan, correctAnswer, Seq(a <=> a))
   }
+
+  test("window function with correlated equality predicate") {
+    val outerPlan = testRelation2
+    val innerPlan =
+      Window(Seq(b, c),
+        partitionSpec = Seq(c), orderSpec = b.asc :: Nil,
+        Filter(And(OuterReference(x) === a, b === 3),
+          testRelation))
+    // Both the project list and the partition spec have added the correlated 
variable.
+    val correctAnswer =
+      Window(Seq(b, c, a), partitionSpec = Seq(c, a), orderSpec = b.asc :: Nil,
+        Filter(b === 3,
+          testRelation))
+    check(innerPlan, outerPlan, correctAnswer, Seq(x === a))
+  }
+
+  test("window function with correlated non-equality predicate") {
+    val outerPlan = testRelation2
+    val innerPlan =
+      Window(Seq(b, c),
+        partitionSpec = Seq(c), orderSpec = b.asc :: Nil,
+        Filter(And(OuterReference(x) > a, b === 3),
+          testRelation))
+    // Both the project list and the partition spec have added the correlated 
variable.
+    // The input to the filter is a domain join that produces 'x' values.
+    val correctAnswer =
+    Window(Seq(b, c, x), partitionSpec = Seq(c, x), orderSpec = b.asc :: Nil,
+      Filter(And(b === 3, x > a),
+        DomainJoin(Seq(x), testRelation)))
+    check(innerPlan, outerPlan, correctAnswer, Seq(x <=> x))
+  }
+
+  test("window function with correlated columns inside") {
+    val outerPlan = testRelation2
+    val innerPlan =
+      Window(Seq(b, c),
+        partitionSpec = Seq(c, OuterReference(x)), orderSpec = b.asc :: Nil,
+        Filter(b === 3,
+          testRelation))
+    val e = intercept[java.lang.AssertionError] {
+      DecorrelateInnerQuery(innerPlan, outerPlan.select())
+    }
+    assert(e.getMessage.contains("Correlated column is not allowed in"))
+  }
 }
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
index 2d1eebc65c6..1ad033d8273 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
@@ -1450,21 +1450,22 @@ SELECT * FROM t1 JOIN LATERAL
   FROM   t2
   WHERE  t2.c1 >= t1.c1)
 -- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias 
spark_catalog.default.t2\n   +- View (`spark_catalog`.`default`.`t2`, 
[c1#x,c2#x])\n      +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as 
int) AS c2#x]\n         +- LocalRelation [col1#x, col2#x]\n"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 34,
-    "stopIndex" : 108,
-    "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n  FROM   t2\n  
WHERE  t2.c1 >= t1.c1"
-  } ]
-}
+Project [c1#x, c2#x, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW)#xL]
++- LateralJoin lateral-subquery#x [c1#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- Project [sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW)#xL]
+   :        +- Project [c2#x, c1#x, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST 
RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL, sum(c2) OVER (ORDER BY 
c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL]
+   :           +- Window [sum(c2#x) windowspecdefinition(c1#x ASC NULLS FIRST, 
specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS 
sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND 
CURRENT ROW)#xL], [c1#x ASC NULLS FIRST]
+   :              +- Project [c2#x, c1#x]
+   :                 +- Filter (c1#x >= outer(c1#x))
+   :                    +- SubqueryAlias spark_catalog.default.t2
+   :                       +- View (`spark_catalog`.`default`.`t2`, 
[c1#x,c2#x])
+   :                          +- Project [cast(col1#x as int) AS c1#x, 
cast(col2#x as int) AS c2#x]
+   :                             +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
 
 
 -- !query
@@ -2007,21 +2008,29 @@ SELECT * FROM t1 JOIN LATERAL
   SELECT t4.c2
   FROM   t4)
 -- !query analysis
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias 
spark_catalog.default.t2\n   +- View (`spark_catalog`.`default`.`t2`, 
[c1#x,c2#x])\n      +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as 
int) AS c2#x]\n         +- LocalRelation [col1#x, col2#x]\n"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 34,
-    "stopIndex" : 108,
-    "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n  FROM   t2\n  
WHERE  t2.c1 >= t1.c1"
-  } ]
-}
+Project [c1#x, c2#x, sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN 
UNBOUNDED PRECEDING AND CURRENT ROW)#xL]
++- LateralJoin lateral-subquery#x [c1#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- Union false, false
+   :        :- Project [sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE 
BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL]
+   :        :  +- Project [c2#x, c1#x, sum(c2) OVER (ORDER BY c1 ASC NULLS 
FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)#xL, sum(c2) OVER 
(ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT 
ROW)#xL]
+   :        :     +- Window [sum(c2#x) windowspecdefinition(c1#x ASC NULLS 
FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) 
AS sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING 
AND CURRENT ROW)#xL], [c1#x ASC NULLS FIRST]
+   :        :        +- Project [c2#x, c1#x]
+   :        :           +- Filter (c1#x >= outer(c1#x))
+   :        :              +- SubqueryAlias spark_catalog.default.t2
+   :        :                 +- View (`spark_catalog`.`default`.`t2`, 
[c1#x,c2#x])
+   :        :                    +- Project [cast(col1#x as int) AS c1#x, 
cast(col2#x as int) AS c2#x]
+   :        :                       +- LocalRelation [col1#x, col2#x]
+   :        +- Project [cast(c2#x as bigint) AS c2#xL]
+   :           +- Project [c2#x]
+   :              +- SubqueryAlias spark_catalog.default.t4
+   :                 +- View (`spark_catalog`.`default`.`t4`, [c1#x,c2#x])
+   :                    +- Project [cast(col1#x as int) AS c1#x, cast(col2#x 
as int) AS c2#x]
+   :                       +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
 
 
 -- !query
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out
index baeb88169d5..549a87f37d3 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/exists-subquery/exists-aggregate.sql.out
@@ -310,3 +310,27 @@ Project [emp_name#x, bonus_amt#x]
             +- Project [emp_name#x, bonus_amt#x]
                +- SubqueryAlias BONUS
                   +- LocalRelation [emp_name#x, bonus_amt#x]
+
+
+-- !query
+SELECT *
+FROM BONUS
+WHERE EXISTS(SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s
+                    FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id
+                        AND DEPT.dept_name < BONUS.emp_name)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "(dept_name#x < outer(emp_name#x))\nFilter ((dept_id#x = 
dept_id#x) AND (dept_name#x < outer(emp_name#x)))\n+- Join Inner\n   :- 
SubqueryAlias emp\n   :  +- View (`EMP`, 
[id#x,emp_name#x,hiredate#x,salary#x,dept_id#x])\n   :     +- Project 
[cast(id#x as int) AS id#x, cast(emp_name#x as string) AS emp_name#x, 
cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS salary#x, 
cast(dept_id#x as int) AS dept_id#x]\n   :        +- Project [id#x, emp_name#x, 
hiredat [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 34,
+    "stopIndex" : 224,
+    "fragment" : "SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) 
AS s\n                    FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id\n     
                   AND DEPT.dept_name < BONUS.emp_name"
+  } ]
+}
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out
index 11c596eee1c..b249a75ab6d 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/in-subquery/in-group-by.sql.out
@@ -639,3 +639,26 @@ Filter isnotnull(min(t1b)#x)
                +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
                   +- SubqueryAlias t1
                      +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, 
t1f#x, t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+select t1a
+from t1
+where t1f IN (SELECT RANK() OVER (partition by t3c  order by t2b) as s
+                             FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < 
t1.t1a)
+-- !query analysis
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : 
"UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "(t2a#x < outer(t1a#x))\nFilter ((t2c#x = t3c#x) AND (t2a#x < 
outer(t1a#x)))\n+- Join Inner\n   :- SubqueryAlias t2\n   :  +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n   :     +- Project 
[cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x 
as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS 
t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, 
cast(t2h#x as timestamp) AS [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 34,
+    "stopIndex" : 172,
+    "fragment" : "SELECT RANK() OVER (partition by t3c  order by t2b) as s\n   
                          FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a"
+  } ]
+}
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index 76c9bec5fb8..5b66c8461d0 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -880,6 +880,165 @@ Project [t1a#x]
                      +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, 
t1f#x, t1g#x, t1h#x, t1i#x]
 
 
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d = t1.t1d) as tmp)
+-- !query analysis
+Project [1 AS 1#x]
++- Filter (cast(t1b#x as bigint) < scalar-subquery#x [t1d#xL])
+   :  +- Aggregate [max(s#xL) AS max(s)#xL]
+   :     +- SubqueryAlias tmp
+   :        +- Project [s#xL]
+   :           +- Project [t2b#x, t2c#x, t2d#xL, s#xL, s#xL]
+   :              +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL ASC 
NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST]
+   :                 +- Project [t2b#x, t2c#x, t2d#xL]
+   :                    +- Filter (t2d#xL = outer(t1d#xL))
+   :                       +- SubqueryAlias t2
+   :                          +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                             +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                                +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, 
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                   +- SubqueryAlias t2
+   :                                      +- LocalRelation [t2a#x, t2b#x, 
t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d <= t1.t1d) as tmp)
+-- !query analysis
+Project [1 AS 1#x]
++- Filter (cast(t1b#x as bigint) < scalar-subquery#x [t1d#xL])
+   :  +- Aggregate [max(s#xL) AS max(s)#xL]
+   :     +- SubqueryAlias tmp
+   :        +- Project [s#xL]
+   :           +- Project [t2b#x, t2c#x, t2d#xL, s#xL, s#xL]
+   :              +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL ASC 
NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST]
+   :                 +- Project [t2b#x, t2c#x, t2d#xL]
+   :                    +- Filter (t2d#xL <= outer(t1d#xL))
+   :                       +- SubqueryAlias t2
+   :                          +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                             +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                                +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, 
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                   +- SubqueryAlias t2
+   :                                      +- LocalRelation [t2a#x, t2b#x, 
t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT t1b
+FROM t1
+WHERE t1b > (SELECT MAX(tmp.s) FROM (
+             SELECT RANK() OVER (partition by t3c, t2b order by t3c) as s
+               FROM t2, t3 where t2.t2c = t3.t3c AND t2.t2a = t1.t1a) as tmp)
+-- !query analysis
+Project [t1b#x]
++- Filter (cast(t1b#x as int) > scalar-subquery#x [t1a#x])
+   :  +- Aggregate [max(s#x) AS max(s)#x]
+   :     +- SubqueryAlias tmp
+   :        +- Project [s#x]
+   :           +- Project [t3c#x, t2b#x, s#x, s#x]
+   :              +- Window [rank(t3c#x) windowspecdefinition(t3c#x, t2b#x, 
t3c#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), 
currentrow$())) AS s#x], [t3c#x, t2b#x], [t3c#x ASC NULLS FIRST]
+   :                 +- Project [t3c#x, t2b#x]
+   :                    +- Filter ((t2c#x = t3c#x) AND (t2a#x = outer(t1a#x)))
+   :                       +- Join Inner
+   :                          :- SubqueryAlias t2
+   :                          :  +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                          :     +- Project [cast(t2a#x as string) AS 
t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, 
cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as 
double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as 
timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
+   :                          :        +- Project [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                          :           +- SubqueryAlias t2
+   :                          :              +- LocalRelation [t2a#x, t2b#x, 
t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                          +- SubqueryAlias t3
+   :                             +- View (`t3`, 
[t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x])
+   :                                +- Project [cast(t3a#x as string) AS 
t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, 
cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as 
double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as 
timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x]
+   :                                   +- Project [t3a#x, t3b#x, t3c#x, 
t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]
+   :                                      +- SubqueryAlias t3
+   :                                         +- LocalRelation [t3a#x, t3b#x, 
t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT t1b
+FROM t1
+WHERE t1b > (SELECT MAX(tmp.s) FROM (
+             SELECT RANK() OVER (partition by t3c, t3d order by t3c) as s
+               FROM (SELECT t3b, t3c, max(t3d) as t3d FROM t3 GROUP BY t3b, 
t3c) as g) as tmp)
+ORDER BY t1b
+-- !query analysis
+Sort [t1b#x ASC NULLS FIRST], true
++- Project [t1b#x]
+   +- Filter (cast(t1b#x as int) > scalar-subquery#x [])
+      :  +- Aggregate [max(s#x) AS max(s)#x]
+      :     +- SubqueryAlias tmp
+      :        +- Project [s#x]
+      :           +- Project [t3c#x, t3d#xL, s#x, s#x]
+      :              +- Window [rank(t3c#x) windowspecdefinition(t3c#x, 
t3d#xL, t3c#x ASC NULLS FIRST, specifiedwindowframe(RowFrame, 
unboundedpreceding$(), currentrow$())) AS s#x], [t3c#x, t3d#xL], [t3c#x ASC 
NULLS FIRST]
+      :                 +- Project [t3c#x, t3d#xL]
+      :                    +- SubqueryAlias g
+      :                       +- Aggregate [t3b#x, t3c#x], [t3b#x, t3c#x, 
max(t3d#xL) AS t3d#xL]
+      :                          +- SubqueryAlias t3
+      :                             +- View (`t3`, 
[t3a#x,t3b#x,t3c#x,t3d#xL,t3e#x,t3f#x,t3g#x,t3h#x,t3i#x])
+      :                                +- Project [cast(t3a#x as string) AS 
t3a#x, cast(t3b#x as smallint) AS t3b#x, cast(t3c#x as int) AS t3c#x, 
cast(t3d#xL as bigint) AS t3d#xL, cast(t3e#x as float) AS t3e#x, cast(t3f#x as 
double) AS t3f#x, cast(t3g#x as decimal(4,0)) AS t3g#x, cast(t3h#x as 
timestamp) AS t3h#x, cast(t3i#x as date) AS t3i#x]
+      :                                   +- Project [t3a#x, t3b#x, t3c#x, 
t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]
+      :                                      +- SubqueryAlias t3
+      :                                         +- LocalRelation [t3a#x, 
t3b#x, t3c#x, t3d#xL, t3e#x, t3f#x, t3g#x, t3h#x, t3i#x]
+      +- SubqueryAlias t1
+         +- View (`t1`, 
[t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+            +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as 
smallint) AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS 
t1d#xL, cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, 
cast(t1g#x as decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, 
cast(t1i#x as date) AS t1i#x]
+               +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+                  +- SubqueryAlias t1
+                     +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, 
t1f#x, t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b = (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s
+               FROM t2) as tmp)
+-- !query analysis
+Project [1 AS 1#x]
++- Filter (cast(t1b#x as bigint) = scalar-subquery#x [t1d#xL])
+   :  +- Aggregate [max(s#xL) AS max(s)#xL]
+   :     +- SubqueryAlias tmp
+   :        +- Project [s#xL]
+   :           +- Project [t2c#x, _w1#xL, s#xL, s#xL]
+   :              +- Window [sum(t2c#x) windowspecdefinition(t2c#x, _w1#xL ASC 
NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS s#xL], [t2c#x], [_w1#xL ASC NULLS FIRST]
+   :                 +- Project [t2c#x, (outer(t1d#xL) + t2d#xL) AS _w1#xL]
+   :                    +- SubqueryAlias t2
+   :                       +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                          +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                             +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, 
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                +- SubqueryAlias t2
+   :                                   +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
 -- !query
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql 
b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
index 2787a865975..841ca94dad6 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
@@ -183,7 +183,7 @@ SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE_OUTER(c2));
 SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3;
 SELECT * FROM t3 LEFT JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3;
 
--- Window func - unsupported
+-- Window func
 SELECT * FROM t1 JOIN LATERAL
   (SELECT sum(t2.c2) over (order by t2.c1)
   FROM   t2
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql
index ae6a9641aae..1c4ef982c66 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/exists-subquery/exists-aggregate.sql
@@ -118,3 +118,10 @@ WHERE  NOT EXISTS (SELECT 1
                                       FROM   dept 
                                       WHERE  emp.dept_id = dept.dept_id 
                                       GROUP  BY dept.dept_id));
+
+-- Window functions are not supported in EXISTS subqueries yet
+SELECT *
+FROM BONUS
+WHERE EXISTS(SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s
+                    FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id
+                        AND DEPT.dept_name < BONUS.emp_name);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql
index 496285e3514..168faa0aee7 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/in-subquery/in-group-by.sql
@@ -240,5 +240,8 @@ WHERE  t1c IN (SELECT Min(t2c)
 GROUP  BY t1a
 HAVING Min(t1b) IS NOT NULL;
 
-
-
+-- Window functions are not supported in IN subqueries yet
+select t1a
+from t1
+where t1f IN (SELECT RANK() OVER (partition by t3c  order by t2b) as s
+                             FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a);
diff --git a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
index a49f30773ca..65adfdcb43c 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
@@ -280,6 +280,44 @@ HAVING   max(t1b) <= (SELECT   max(t2b)
                       WHERE    t2c = t1c
                       GROUP BY t2c);
 
+-- SPARK-44549: window function in the correlated subquery.
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d = t1.t1d) as tmp);
+
+
+-- SPARK-44549: window function in the correlated subquery with non-equi predicate.
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d <= t1.t1d) as tmp);
+
+-- SPARK-44549: window function in the correlated subquery over joins.
+SELECT t1b
+FROM t1
+WHERE t1b > (SELECT MAX(tmp.s) FROM (
+             SELECT RANK() OVER (partition by t3c, t2b order by t3c) as s
+               FROM t2, t3 where t2.t2c = t3.t3c AND t2.t2a = t1.t1a) as tmp);
+
+-- SPARK-44549: window function in the correlated subquery over aggregation.
+SELECT t1b
+FROM t1
+WHERE t1b > (SELECT MAX(tmp.s) FROM (
+             SELECT RANK() OVER (partition by t3c, t3d order by t3c) as s
+               FROM (SELECT t3b, t3c, max(t3d) as t3d FROM t3 GROUP BY t3b, t3c) as g) as tmp)
+ORDER BY t1b;
+
+
+-- SPARK-44549: correlation in window function itself is not supported yet.
+SELECT 1
+FROM t1
+WHERE t1b = (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s
+               FROM t2) as tmp);
+
 -- Set operations in correlation path
 
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0);
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index 33f084f3d86..2053c79e1ab 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -897,23 +897,10 @@ SELECT * FROM t1 JOIN LATERAL
   FROM   t2
   WHERE  t2.c1 >= t1.c1)
 -- !query schema
-struct<>
+struct<c1:int,c2:int,sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint>
 -- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias spark_catalog.default.t2\n   +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n      +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n         +- LocalRelation [col1#x, col2#x]\n"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 34,
-    "stopIndex" : 108,
-    "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n  FROM   t2\n  WHERE  t2.c1 >= t1.c1"
-  } ]
-}
+0      1       5
+0      1       5
 
 
 -- !query
@@ -1230,23 +1217,18 @@ SELECT * FROM t1 JOIN LATERAL
   SELECT t4.c2
   FROM   t4)
 -- !query schema
-struct<>
+struct<c1:int,c2:int,sum(c2) OVER (ORDER BY c1 ASC NULLS FIRST RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):bigint>
 -- !query output
-org.apache.spark.sql.catalyst.ExtendedAnalysisException
-{
-  "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
-  "sqlState" : "0A000",
-  "messageParameters" : {
-    "treeNode" : "Filter (c1#x >= outer(c1#x))\n+- SubqueryAlias spark_catalog.default.t2\n   +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])\n      +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]\n         +- LocalRelation [col1#x, col2#x]\n"
-  },
-  "queryContext" : [ {
-    "objectType" : "",
-    "objectName" : "",
-    "startIndex" : 34,
-    "stopIndex" : 108,
-    "fragment" : "SELECT sum(t2.c2) over (order by t2.c1)\n  FROM   t2\n  WHERE  t2.c1 >= t1.c1"
-  } ]
-}
+0      1       1
+0      1       1
+0      1       2
+0      1       3
+0      1       5
+0      1       5
+1      2       1
+1      2       1
+1      2       2
+1      2       3
 
 
 -- !query
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out
index 5a2f3aad715..f62efdcbad4 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/exists-subquery/exists-aggregate.sql.out
@@ -178,3 +178,29 @@ struct<emp_name:string,bonus_amt:double>
 -- !query output
 emp 5  1000.0
 emp 6 - no dept        500.0
+
+
+-- !query
+SELECT *
+FROM BONUS
+WHERE EXISTS(SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) AS s
+                    FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id
+                        AND DEPT.dept_name < BONUS.emp_name)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "(dept_name#x < outer(emp_name#x))\nFilter ((dept_id#x = 
dept_id#x) AND (dept_name#x < outer(emp_name#x)))\n+- Join Inner\n   :- 
SubqueryAlias emp\n   :  +- View (`EMP`, 
[id#x,emp_name#x,hiredate#x,salary#x,dept_id#x])\n   :     +- Project 
[cast(id#x as int) AS id#x, cast(emp_name#x as string) AS emp_name#x, 
cast(hiredate#x as date) AS hiredate#x, cast(salary#x as double) AS salary#x, 
cast(dept_id#x as int) AS dept_id#x]\n   :        +- Project [id#x, emp_name#x, 
hiredat [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 34,
+    "stopIndex" : 224,
+    "fragment" : "SELECT RANK() OVER (PARTITION BY hiredate ORDER BY salary) 
AS s\n                    FROM EMP, DEPT where EMP.dept_id = DEPT.dept_id\n     
                   AND DEPT.dept_name < BONUS.emp_name"
+  } ]
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out
index 4c715342bee..b2456891f92 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/in-subquery/in-group-by.sql.out
@@ -352,3 +352,28 @@ struct<t1a:string,min(t1b):smallint>
 t1a    16
 t1b    8
 t1c    8
+
+
+-- !query
+select t1a
+from t1
+where t1f IN (SELECT RANK() OVER (partition by t3c  order by t2b) as s
+                             FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY.CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
+  "sqlState" : "0A000",
+  "messageParameters" : {
+    "treeNode" : "(t2a#x < outer(t1a#x))\nFilter ((t2c#x = t3c#x) AND (t2a#x < 
outer(t1a#x)))\n+- Join Inner\n   :- SubqueryAlias t2\n   :  +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])\n   :     +- Project 
[cast(t2a#x as string) AS t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x 
as int) AS t2c#x, cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS 
t2e#x, cast(t2f#x as double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, 
cast(t2h#x as timestamp) AS [...]
+  },
+  "queryContext" : [ {
+    "objectType" : "",
+    "objectName" : "",
+    "startIndex" : 34,
+    "stopIndex" : 172,
+    "fragment" : "SELECT RANK() OVER (partition by t3c  order by t2b) as s\n   
                          FROM t2, t3 where t2.t2c = t3.t3c and t2.t2a < t1.t1a"
+  } ]
+}
diff --git a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index 302c5e6dd7e..e8c85ccb89b 100644
--- a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -443,6 +443,85 @@ val1b
 val1c
 
 
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d = t1.t1d) as tmp)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+1
+1
+1
+
+
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d <= t1.t1d) as tmp)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+1
+1
+1
+1
+1
+
+
+-- !query
+SELECT t1b
+FROM t1
+WHERE t1b > (SELECT MAX(tmp.s) FROM (
+             SELECT RANK() OVER (partition by t3c, t2b order by t3c) as s
+               FROM t2, t3 where t2.t2c = t3.t3c AND t2.t2a = t1.t1a) as tmp)
+-- !query schema
+struct<t1b:smallint>
+-- !query output
+8
+8
+
+
+-- !query
+SELECT t1b
+FROM t1
+WHERE t1b > (SELECT MAX(tmp.s) FROM (
+             SELECT RANK() OVER (partition by t3c, t3d order by t3c) as s
+               FROM (SELECT t3b, t3c, max(t3d) as t3d FROM t3 GROUP BY t3b, t3c) as g) as tmp)
+ORDER BY t1b
+-- !query schema
+struct<t1b:smallint>
+-- !query output
+6
+6
+8
+8
+10
+10
+10
+10
+16
+16
+
+
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b = (SELECT MAX(tmp.s) FROM (
+             SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s
+               FROM t2) as tmp)
+-- !query schema
+struct<1:int>
+-- !query output
+
+
+
 -- !query
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
 -- !query schema
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
index 63b17c05b63..17f20dbcdd3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SubquerySuite.scala
@@ -860,7 +860,7 @@ class SubquerySuite extends QueryTest
       checkErrorMatchPVals(
         exception4,
         errorClass = "UNSUPPORTED_SUBQUERY_EXPRESSION_CATEGORY." +
-          "ACCESSING_OUTER_QUERY_COLUMN_IS_NOT_ALLOWED",
+          "CORRELATED_COLUMN_IS_NOT_ALLOWED_IN_PREDICATE",
         parameters = Map("treeNode" -> "(?s).*"),
         sqlState = None,
         context = ExpectedContext(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to