This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 21c27d55b34 [SPARK-36191][SQL] Handle limit and order by in correlated 
scalar (lateral) subqueries
21c27d55b34 is described below

commit 21c27d55b342eb32bdf2d56a81dd9adb23f5526e
Author: Andrey Gubichev <andrey.gubic...@databricks.com>
AuthorDate: Thu Sep 14 09:09:05 2023 +0800

    [SPARK-36191][SQL] Handle limit and order by in correlated scalar (lateral) 
subqueries
    
    ### What changes were proposed in this pull request?
    
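    Handle LIMIT/ORDER BY in correlated scalar (lateral) subqueries by
    rewriting them using the ROW_NUMBER() window function: LIMIT K becomes a
    filter `row_number <= K` over a window that is partitioned by the
    correlated columns and ordered by the ORDER BY expressions.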
    
    ### Why are the changes needed?
    Extends our coverage of correlated subqueries: a LIMIT placed above a
    correlated predicate was previously rejected during analysis.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes. Users can now run correlated scalar and lateral subqueries that
    contain LIMIT, with or without ORDER BY.
    
    ### How was this patch tested?
    
    Unit tests and query tests. Results of query tests are verified against 
PostgreSQL.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #42705 from agubichev/SPARK-36191-limit.
    
    Authored-by: Andrey Gubichev <andrey.gubic...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/catalyst/analysis/CheckAnalysis.scala      |   5 +
 .../catalyst/optimizer/DecorrelateInnerQuery.scala |  33 +++
 .../sql/catalyst/analysis/AnalysisErrorSuite.scala |   8 -
 .../optimizer/DecorrelateInnerQuerySuite.scala     | 111 +++++++
 .../analyzer-results/join-lateral.sql.out          | 155 ++++++++++
 .../analyzer-results/postgreSQL/join.sql.out       | 318 +++++++++++++++++++++
 .../scalar-subquery-predicate.sql.out              | 123 ++++++++
 .../resources/sql-tests/inputs/join-lateral.sql    |  27 ++
 .../resources/sql-tests/inputs/postgreSQL/join.sql |  82 +++---
 .../scalar-subquery/scalar-subquery-predicate.sql  |  32 +++
 .../sql-tests/results/join-lateral.sql.out         |  81 ++++++
 .../sql-tests/results/postgreSQL/join.sql.out      |  82 ++++++
 .../scalar-subquery-predicate.sql.out              |  66 +++++
 13 files changed, 1074 insertions(+), 49 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 038cd7d944a..4cfeb60eb04 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -1418,6 +1418,11 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog with QueryErrorsB
           failOnInvalidOuterReference(g)
           checkPlan(g.child, aggregated, canContainOuter)
 
+        // Correlated subquery can have a LIMIT clause
+        case l @ Limit(_, input) =>
+          failOnInvalidOuterReference(l)
+          checkPlan(input, aggregated, canContainOuter)
+
         // Category 4: Any other operators not in the above 3 categories
         // cannot be on a correlation path, that is they are allowed only
         // under a correlation point but they and their descendant operators
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
index a07177f6e8a..1cb7c8f3ac6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
@@ -655,6 +655,39 @@ object DecorrelateInnerQuery extends PredicateHelper {
             val newProject = Project(newProjectList ++ referencesToAdd, 
newChild)
             (newProject, joinCond, outerReferenceMap)
 
+          case Limit(limit, input) =>
+            // LIMIT K (with potential ORDER BY) is decorrelated by computing 
K rows per every
+            // domain value via a row_number() window function. For example, 
for a subquery
+            // (SELECT T2.a FROM T2 WHERE T2.b = OuterReference(x) ORDER BY 
T2.c LIMIT 3)
+            // -- we need to get top 3 values of T2.a (ordering by T2.c) for 
every value of x.
+            // Following our general decorrelation procedure, 'x' is then 
replaced by T2.b, so the
+            // subquery is decorrelated as:
+            // SELECT * FROM (
+            //   SELECT T2.a, row_number() OVER (PARTITION BY T2.b ORDER BY 
T2.c) AS rn FROM T2)
+            // WHERE rn <= 3
+            val (child, ordering) = input match {
+              case Sort(order, _, child) => (child, order)
+              case _ => (input, Seq())
+            }
+            val (newChild, joinCond, outerReferenceMap) =
+              decorrelate(child, parentOuterReferences, aggregated = true, 
underSetOp)
+            val collectedChildOuterReferences = 
collectOuterReferencesInPlanTree(child)
+            // Add outer references to the PARTITION BY clause
+            val partitionFields = 
collectedChildOuterReferences.map(outerReferenceMap(_)).toSeq
+            val orderByFields = replaceOuterReferences(ordering, 
outerReferenceMap)
+
+            val rowNumber = WindowExpression(RowNumber(),
+              WindowSpecDefinition(partitionFields, orderByFields,
+                SpecifiedWindowFrame(RowFrame, UnboundedPreceding, 
CurrentRow)))
+            val rowNumberAlias = Alias(rowNumber, "rn")()
+            // Window function computes row_number() when partitioning by 
correlated references,
+            // and projects all the other fields from the input.
+            val window = Window(Seq(rowNumberAlias),
+              partitionFields, orderByFields, newChild)
+            val filter = Filter(LessThanOrEqual(rowNumberAlias.toAttribute, 
limit), window)
+            val project = Project(newChild.output, filter)
+            (project, joinCond, outerReferenceMap)
+
           case w @ Window(projectList, partitionSpec, orderSpec, child) =>
             val outerReferences = collectOuterReferences(w.expressions)
             assert(outerReferences.isEmpty, s"Correlated column is not allowed 
in window " +
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
index 6fd4ce7e084..2a2b40117bb 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnalysisErrorSuite.scala
@@ -1027,14 +1027,6 @@ class AnalysisErrorSuite extends AnalysisTest {
       LocalRelation(a))
     assertAnalysisError(plan3, "Accessing outer query column is not allowed 
in" :: Nil)
 
-    val plan4 = Filter(
-      Exists(
-        Limit(1,
-          Filter(EqualTo(UnresolvedAttribute("a"), b), LocalRelation(b)))
-      ),
-      LocalRelation(a))
-    assertAnalysisError(plan4, "Accessing outer query column is not allowed 
in" :: Nil)
-
     val plan5 = Filter(
       Exists(
         Sample(0.0, 0.5, false, 1L,
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
index 0f7b01c0715..1923c7836b2 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuerySuite.scala
@@ -59,6 +59,25 @@ class DecorrelateInnerQuerySuite extends PlanTest {
     joinCond.zip(conditions).foreach(e => compareExpressions(e._1, e._2))
   }
 
+  private def check(
+      outputPlan: LogicalPlan,
+      joinCond: Seq[Expression],
+      correctAnswer: LogicalPlan,
+      conditions: Seq[Expression]): Unit = {
+    assert(!hasOuterReferences(outputPlan))
+    comparePlans(outputPlan, correctAnswer)
+    assert(joinCond.length == conditions.length)
+    joinCond.zip(conditions).foreach(e => compareExpressions(e._1, e._2))
+  }
+
+  // For tests involving window functions: extract and return the ROW_NUMBER 
function
+  // from the 'input' plan.
+  private def getRowNumberFunc(input: LogicalPlan): Alias = {
+    val windowFunction = input.collect({ case w: Window => w }).head
+    windowFunction.expressions.collect(
+      { case w: Alias if w.child.isInstanceOf[WindowExpression] => w }).head
+  }
+
   test("filter with correlated equality predicates only") {
     val outerPlan = testRelation2
     val innerPlan =
@@ -625,4 +644,96 @@ class DecorrelateInnerQuerySuite extends PlanTest {
     }
     assert(e.getMessage.contains("Correlated column is not allowed in"))
   }
+
+  test("SPARK-36191: limit in the correlated subquery") {
+    val outerPlan = testRelation
+    val innerPlan =
+      Project(Seq(x),
+        Limit(1, Filter(OuterReference(a) === x,
+          testRelation2)))
+    val (outputPlan, joinCond) = DecorrelateInnerQuery(innerPlan, 
outerPlan.select())
+
+    val alias = getRowNumberFunc(outputPlan)
+
+    val correctAnswer = Project(Seq(x), Project(Seq(x, y, z),
+      Filter(GreaterThanOrEqual(1, alias.toAttribute),
+        Window(Seq(alias), Seq(x), Nil, testRelation2))))
+    check(outputPlan, joinCond, correctAnswer, Seq(x === a))
+  }
+
+  test("SPARK-36191: limit and order by in the correlated subquery") {
+    val outerPlan = testRelation
+    val innerPlan =
+      Project(Seq(x),
+        Limit(5, Sort(Seq(SortOrder(x, Ascending)), true,
+          Filter(OuterReference(a) > x,
+            testRelation2))))
+
+    val (outputPlan, joinCond) = DecorrelateInnerQuery(innerPlan, 
outerPlan.select())
+
+    val alias = getRowNumberFunc(outputPlan)
+    val rowNumber = WindowExpression(RowNumber(),
+      WindowSpecDefinition(Seq(a), Seq(SortOrder(x, Ascending)),
+        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))
+    val rowNumberAlias = Alias(rowNumber, alias.name)()
+
+    val correctAnswer = Project(Seq(x, a), Project(Seq(a, x, y, z),
+      Filter(LessThanOrEqual(rowNumberAlias.toAttribute, 5),
+        Window(Seq(rowNumberAlias), Seq(a), Seq(SortOrder(x, Ascending)),
+          Filter(GreaterThan(a, x),
+            DomainJoin(Seq(a), testRelation2))))))
+    check(outputPlan, joinCond, correctAnswer, Seq(a <=> a))
+  }
+
+  test("SPARK-36191: limit and order by in the correlated subquery with 
aggregation") {
+    val outerPlan = testRelation
+    val minY = Alias(min(y), "min_y")()
+
+    val innerPlan =
+      Project(Seq(x),
+        Limit(5, Sort(Seq(SortOrder(minY.toAttribute, Ascending)), true,
+          Aggregate(Seq(x), Seq(minY, x),
+            Filter(OuterReference(a) > x,
+              testRelation2)))))
+
+    val (outputPlan, joinCond) = DecorrelateInnerQuery(innerPlan, 
outerPlan.select())
+
+    val alias = getRowNumberFunc(outputPlan)
+    val rowNumber = WindowExpression(RowNumber(),
+      WindowSpecDefinition(Seq(a), Seq(SortOrder(minY.toAttribute, Ascending)),
+        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))
+    val rowNumberAlias = Alias(rowNumber, alias.name)()
+    val correctAnswer = Project(Seq(x, a), Project(Seq(minY.toAttribute, x, a),
+      Filter(LessThanOrEqual(rowNumberAlias.toAttribute, 5),
+        Window(Seq(rowNumberAlias), Seq(a),
+          Seq(SortOrder(minY.toAttribute, Ascending)),
+          Aggregate(Seq(x, a), Seq(minY, x, a),
+            Filter(GreaterThan(a, x),
+              DomainJoin(Seq(a), testRelation2)))))))
+    check(outputPlan, joinCond, correctAnswer, Seq(a <=> a))
+
+  }
+
+  test("SPARK-36191: order by with correlated attribute") {
+    val outerPlan = testRelation
+    val innerPlan =
+      Project(Seq(x),
+        Limit(5, Sort(Seq(SortOrder(OuterReference(a), Ascending)), true,
+          Filter(OuterReference(a) > x,
+            testRelation2))))
+    val (outputPlan, joinCond) = DecorrelateInnerQuery(innerPlan, 
outerPlan.select())
+
+    val alias = getRowNumberFunc(outputPlan)
+    val rowNumber = WindowExpression(RowNumber(),
+      WindowSpecDefinition(Seq(a), Seq(SortOrder(a, Ascending)),
+        SpecifiedWindowFrame(RowFrame, UnboundedPreceding, CurrentRow)))
+    val rowNumberAlias = Alias(rowNumber, alias.name)()
+
+    val correctAnswer = Project(Seq(x, a), Project(Seq(a, x, y, z),
+      Filter(LessThanOrEqual(rowNumberAlias.toAttribute, 5),
+        Window(Seq(rowNumberAlias), Seq(a), Seq(SortOrder(a, Ascending)),
+          Filter(GreaterThan(a, x),
+            DomainJoin(Seq(a), testRelation2))))))
+    check(outputPlan, joinCond, correctAnswer, Seq(a <=> a))
+  }
 }
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
index 1ad033d8273..6fdb0dbfeed 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/join-lateral.sql.out
@@ -2861,6 +2861,161 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+select * from t1 join lateral (select * from t2 where t1.c1 = t2.c1 and t1.c2 
< t2.c2 limit 1)
+-- !query analysis
+Project [c1#x, c2#x, c1#x, c2#x]
++- LateralJoin lateral-subquery#x [c1#x && c2#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- GlobalLimit 1
+   :        +- LocalLimit 1
+   :           +- Project [c1#x, c2#x]
+   :              +- Filter ((outer(c1#x) = c1#x) AND (outer(c2#x) < c2#x))
+   :                 +- SubqueryAlias spark_catalog.default.t2
+   :                    +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])
+   :                       +- Project [cast(col1#x as int) AS c1#x, 
cast(col2#x as int) AS c2#x]
+   :                          +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+select * from t1 join lateral (select * from t4 where t1.c1 <= t4.c1 order by 
t4.c2 limit 10)
+-- !query analysis
+Project [c1#x, c2#x, c1#x, c2#x]
++- LateralJoin lateral-subquery#x [c1#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- GlobalLimit 10
+   :        +- LocalLimit 10
+   :           +- Sort [c2#x ASC NULLS FIRST], true
+   :              +- Project [c1#x, c2#x]
+   :                 +- Filter (outer(c1#x) <= c1#x)
+   :                    +- SubqueryAlias spark_catalog.default.t4
+   :                       +- View (`spark_catalog`.`default`.`t4`, 
[c1#x,c2#x])
+   :                          +- Project [cast(col1#x as int) AS c1#x, 
cast(col2#x as int) AS c2#x]
+   :                             +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+select * from t1 join lateral (select c1, min(c2) as m
+                               from t2 where t1.c1 = t2.c1 and t1.c2 < t2.c2
+                               group by t2.c1
+                               order by m)
+-- !query analysis
+Project [c1#x, c2#x, c1#x, m#x]
++- LateralJoin lateral-subquery#x [c1#x && c2#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- Sort [m#x ASC NULLS FIRST], true
+   :        +- Aggregate [c1#x], [c1#x, min(c2#x) AS m#x]
+   :           +- Filter ((outer(c1#x) = c1#x) AND (outer(c2#x) < c2#x))
+   :              +- SubqueryAlias spark_catalog.default.t2
+   :                 +- View (`spark_catalog`.`default`.`t2`, [c1#x,c2#x])
+   :                    +- Project [cast(col1#x as int) AS c1#x, cast(col2#x 
as int) AS c2#x]
+   :                       +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+select * from t1 join lateral (select c1, min(c2) as m
+                               from t4 where t1.c1 = t4.c1
+                               group by t4.c1
+                               limit 1)
+-- !query analysis
+Project [c1#x, c2#x, c1#x, m#x]
++- LateralJoin lateral-subquery#x [c1#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- GlobalLimit 1
+   :        +- LocalLimit 1
+   :           +- Aggregate [c1#x], [c1#x, min(c2#x) AS m#x]
+   :              +- Filter (outer(c1#x) = c1#x)
+   :                 +- SubqueryAlias spark_catalog.default.t4
+   :                    +- View (`spark_catalog`.`default`.`t4`, [c1#x,c2#x])
+   :                       +- Project [cast(col1#x as int) AS c1#x, 
cast(col2#x as int) AS c2#x]
+   :                          +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+select * from t1 join lateral
+  ((select t4.c2 from t4 where t1.c1 <= t4.c1 order by t4.c2 limit 1)
+   union all
+   (select t4.c1 from t4 where t1.c1 = t4.c1 order by t4.c1 limit 3))
+-- !query analysis
+Project [c1#x, c2#x, c2#x]
++- LateralJoin lateral-subquery#x [c1#x && c1#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- Union false, false
+   :        :- GlobalLimit 1
+   :        :  +- LocalLimit 1
+   :        :     +- Sort [c2#x ASC NULLS FIRST], true
+   :        :        +- Project [c2#x]
+   :        :           +- Filter (outer(c1#x) <= c1#x)
+   :        :              +- SubqueryAlias spark_catalog.default.t4
+   :        :                 +- View (`spark_catalog`.`default`.`t4`, 
[c1#x,c2#x])
+   :        :                    +- Project [cast(col1#x as int) AS c1#x, 
cast(col2#x as int) AS c2#x]
+   :        :                       +- LocalRelation [col1#x, col2#x]
+   :        +- GlobalLimit 3
+   :           +- LocalLimit 3
+   :              +- Sort [c1#x ASC NULLS FIRST], true
+   :                 +- Project [c1#x]
+   :                    +- Filter (outer(c1#x) = c1#x)
+   :                       +- SubqueryAlias spark_catalog.default.t4
+   :                          +- View (`spark_catalog`.`default`.`t4`, 
[c1#x,c2#x])
+   :                             +- Project [cast(col1#x as int) AS c1#x, 
cast(col2#x as int) AS c2#x]
+   :                                +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
+-- !query
+select * from t1 join lateral
+  (select * from
+   ((select t4.c2 as t from t4 where t1.c1 <= t4.c1)
+   union all
+   (select t4.c1 as t from t4 where t1.c1 = t4.c1)) as foo
+   order by foo.t limit 5)
+-- !query analysis
+Project [c1#x, c2#x, t#x]
++- LateralJoin lateral-subquery#x [c1#x && c1#x], Inner
+   :  +- SubqueryAlias __auto_generated_subquery_name
+   :     +- GlobalLimit 5
+   :        +- LocalLimit 5
+   :           +- Sort [t#x ASC NULLS FIRST], true
+   :              +- Project [t#x]
+   :                 +- SubqueryAlias foo
+   :                    +- Union false, false
+   :                       :- Project [c2#x AS t#x]
+   :                       :  +- Filter (outer(c1#x) <= c1#x)
+   :                       :     +- SubqueryAlias spark_catalog.default.t4
+   :                       :        +- View (`spark_catalog`.`default`.`t4`, 
[c1#x,c2#x])
+   :                       :           +- Project [cast(col1#x as int) AS 
c1#x, cast(col2#x as int) AS c2#x]
+   :                       :              +- LocalRelation [col1#x, col2#x]
+   :                       +- Project [c1#x AS t#x]
+   :                          +- Filter (outer(c1#x) = c1#x)
+   :                             +- SubqueryAlias spark_catalog.default.t4
+   :                                +- View (`spark_catalog`.`default`.`t4`, 
[c1#x,c2#x])
+   :                                   +- Project [cast(col1#x as int) AS 
c1#x, cast(col2#x as int) AS c2#x]
+   :                                      +- LocalRelation [col1#x, col2#x]
+   +- SubqueryAlias spark_catalog.default.t1
+      +- View (`spark_catalog`.`default`.`t1`, [c1#x,c2#x])
+         +- Project [cast(col1#x as int) AS c1#x, cast(col2#x as int) AS c2#x]
+            +- LocalRelation [col1#x, col2#x]
+
+
 -- !query
 DROP VIEW t1
 -- !query analysis
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/join.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/join.sql.out
index cf5c79333e8..c66326e020e 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/join.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/postgreSQL/join.sql.out
@@ -2678,6 +2678,324 @@ Project [f1#x, q1#xL, q2#xL, f1#x, f1#x]
                         +- LocalRelation [col1#x]
 
 
+-- !query
+select * from
+   text_tbl t1
+   left join int8_tbl i8
+   on i8.q2 = 123,
+   lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
+where t1.f1 = ss.f1
+-- !query analysis
+Project [f1#x, q1#xL, q2#xL, q1#xL, f1#x]
++- Filter (f1#x = f1#x)
+   +- LateralJoin lateral-subquery#x [q1#xL], Inner
+      :  +- SubqueryAlias ss
+      :     +- GlobalLimit 1
+      :        +- LocalLimit 1
+      :           +- Project [outer(q1#xL) AS q1#xL, f1#x]
+      :              +- SubqueryAlias t2
+      :                 +- SubqueryAlias text_tbl
+      :                    +- View (`TEXT_TBL`, [f1#x])
+      :                       +- Project [cast(f1#x as string) AS f1#x]
+      :                          +- Project [f1#x]
+      :                             +- SubqueryAlias v
+      :                                +- Project [col1#x AS f1#x]
+      :                                   +- LocalRelation [col1#x]
+      +- Join LeftOuter, (q2#xL = cast(123 as bigint))
+         :- SubqueryAlias t1
+         :  +- SubqueryAlias text_tbl
+         :     +- View (`TEXT_TBL`, [f1#x])
+         :        +- Project [cast(f1#x as string) AS f1#x]
+         :           +- Project [f1#x]
+         :              +- SubqueryAlias v
+         :                 +- Project [col1#x AS f1#x]
+         :                    +- LocalRelation [col1#x]
+         +- SubqueryAlias i8
+            +- SubqueryAlias int8_tbl
+               +- View (`INT8_TBL`, [q1#xL,q2#xL])
+                  +- Project [cast(q1#xL as bigint) AS q1#xL, cast(q2#xL as 
bigint) AS q2#xL]
+                     +- Project [q1#xL, q2#xL]
+                        +- SubqueryAlias v
+                           +- Project [col1#xL AS q1#xL, col2#xL AS q2#xL]
+                              +- LocalRelation [col1#xL, col2#xL]
+
+
+-- !query
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
+where t1.f1 = ss.f1
+-- !query analysis
+Project [f1#x, q1#xL, q2#xL, q1#xL, f1#x]
++- Filter (f1#x = f1#x)
+   +- LateralJoin lateral-subquery#x [q1#xL], Inner
+      :  +- SubqueryAlias ss
+      :     +- GlobalLimit 1
+      :        +- LocalLimit 1
+      :           +- Project [outer(q1#xL) AS q1#xL, f1#x]
+      :              +- SubqueryAlias t2
+      :                 +- SubqueryAlias text_tbl
+      :                    +- View (`TEXT_TBL`, [f1#x])
+      :                       +- Project [cast(f1#x as string) AS f1#x]
+      :                          +- Project [f1#x]
+      :                             +- SubqueryAlias v
+      :                                +- Project [col1#x AS f1#x]
+      :                                   +- LocalRelation [col1#x]
+      +- Join LeftOuter, (q2#xL = cast(123 as bigint))
+         :- SubqueryAlias t1
+         :  +- SubqueryAlias text_tbl
+         :     +- View (`TEXT_TBL`, [f1#x])
+         :        +- Project [cast(f1#x as string) AS f1#x]
+         :           +- Project [f1#x]
+         :              +- SubqueryAlias v
+         :                 +- Project [col1#x AS f1#x]
+         :                    +- LocalRelation [col1#x]
+         +- SubqueryAlias i8
+            +- SubqueryAlias int8_tbl
+               +- View (`INT8_TBL`, [q1#xL,q2#xL])
+                  +- Project [cast(q1#xL as bigint) AS q1#xL, cast(q2#xL as 
bigint) AS q2#xL]
+                     +- Project [q1#xL, q2#xL]
+                        +- SubqueryAlias v
+                           +- Project [col1#xL AS q1#xL, col2#xL AS q2#xL]
+                              +- LocalRelation [col1#xL, col2#xL]
+
+
+-- !query
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
+  lateral (select ss1.* from text_tbl t3 limit 1) as ss2
+where t1.f1 = ss2.f1
+-- !query analysis
+Project [f1#x, q1#xL, q2#xL, q1#xL, f1#x, q1#xL, f1#x]
++- Filter (f1#x = f1#x)
+   +- LateralJoin lateral-subquery#x [q1#xL && f1#x], Inner
+      :  +- SubqueryAlias ss2
+      :     +- GlobalLimit 1
+      :        +- LocalLimit 1
+      :           +- Project [outer(q1#xL) AS q1#xL, outer(f1#x) AS f1#x]
+      :              +- SubqueryAlias t3
+      :                 +- SubqueryAlias text_tbl
+      :                    +- View (`TEXT_TBL`, [f1#x])
+      :                       +- Project [cast(f1#x as string) AS f1#x]
+      :                          +- Project [f1#x]
+      :                             +- SubqueryAlias v
+      :                                +- Project [col1#x AS f1#x]
+      :                                   +- LocalRelation [col1#x]
+      +- LateralJoin lateral-subquery#x [q1#xL], Inner
+         :  +- SubqueryAlias ss1
+         :     +- GlobalLimit 1
+         :        +- LocalLimit 1
+         :           +- Project [outer(q1#xL) AS q1#xL, f1#x]
+         :              +- SubqueryAlias t2
+         :                 +- SubqueryAlias text_tbl
+         :                    +- View (`TEXT_TBL`, [f1#x])
+         :                       +- Project [cast(f1#x as string) AS f1#x]
+         :                          +- Project [f1#x]
+         :                             +- SubqueryAlias v
+         :                                +- Project [col1#x AS f1#x]
+         :                                   +- LocalRelation [col1#x]
+         +- Join LeftOuter, (q2#xL = cast(123 as bigint))
+            :- SubqueryAlias t1
+            :  +- SubqueryAlias text_tbl
+            :     +- View (`TEXT_TBL`, [f1#x])
+            :        +- Project [cast(f1#x as string) AS f1#x]
+            :           +- Project [f1#x]
+            :              +- SubqueryAlias v
+            :                 +- Project [col1#x AS f1#x]
+            :                    +- LocalRelation [col1#x]
+            +- SubqueryAlias i8
+               +- SubqueryAlias int8_tbl
+                  +- View (`INT8_TBL`, [q1#xL,q2#xL])
+                     +- Project [cast(q1#xL as bigint) AS q1#xL, cast(q2#xL as 
bigint) AS q2#xL]
+                        +- Project [q1#xL, q2#xL]
+                           +- SubqueryAlias v
+                              +- Project [col1#xL AS q1#xL, col2#xL AS q2#xL]
+                                 +- LocalRelation [col1#xL, col2#xL]
+
+
+-- !query
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
+  lateral (select ss1.* from text_tbl t3 limit 1) as ss2
+where t1.f1 = ss2.f1
+-- !query analysis
+Project [f1#x, q1#xL, q2#xL, q1#xL, f1#x, q1#xL, f1#x]
++- Filter (f1#x = f1#x)
+   +- LateralJoin lateral-subquery#x [q1#xL && f1#x], Inner
+      :  +- SubqueryAlias ss2
+      :     +- GlobalLimit 1
+      :        +- LocalLimit 1
+      :           +- Project [outer(q1#xL) AS q1#xL, outer(f1#x) AS f1#x]
+      :              +- SubqueryAlias t3
+      :                 +- SubqueryAlias text_tbl
+      :                    +- View (`TEXT_TBL`, [f1#x])
+      :                       +- Project [cast(f1#x as string) AS f1#x]
+      :                          +- Project [f1#x]
+      :                             +- SubqueryAlias v
+      :                                +- Project [col1#x AS f1#x]
+      :                                   +- LocalRelation [col1#x]
+      +- LateralJoin lateral-subquery#x [q1#xL], Inner
+         :  +- SubqueryAlias ss1
+         :     +- GlobalLimit 1
+         :        +- LocalLimit 1
+         :           +- Project [outer(q1#xL) AS q1#xL, f1#x]
+         :              +- SubqueryAlias t2
+         :                 +- SubqueryAlias text_tbl
+         :                    +- View (`TEXT_TBL`, [f1#x])
+         :                       +- Project [cast(f1#x as string) AS f1#x]
+         :                          +- Project [f1#x]
+         :                             +- SubqueryAlias v
+         :                                +- Project [col1#x AS f1#x]
+         :                                   +- LocalRelation [col1#x]
+         +- Join LeftOuter, (q2#xL = cast(123 as bigint))
+            :- SubqueryAlias t1
+            :  +- SubqueryAlias text_tbl
+            :     +- View (`TEXT_TBL`, [f1#x])
+            :        +- Project [cast(f1#x as string) AS f1#x]
+            :           +- Project [f1#x]
+            :              +- SubqueryAlias v
+            :                 +- Project [col1#x AS f1#x]
+            :                    +- LocalRelation [col1#x]
+            +- SubqueryAlias i8
+               +- SubqueryAlias int8_tbl
+                  +- View (`INT8_TBL`, [q1#xL,q2#xL])
+                     +- Project [cast(q1#xL as bigint) AS q1#xL, cast(q2#xL as 
bigint) AS q2#xL]
+                        +- Project [q1#xL, q2#xL]
+                           +- SubqueryAlias v
+                              +- Project [col1#xL AS q1#xL, col2#xL AS q2#xL]
+                                 +- LocalRelation [col1#xL, col2#xL]
+
+
+-- !query
+select 1 from
+  text_tbl as tt1
+  inner join text_tbl as tt2 on (tt1.f1 = 'foo')
+  left join text_tbl as tt3 on (tt3.f1 = 'foo')
+  left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
+  lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
+where tt1.f1 = ss1.c0
+-- !query analysis
+Project [1 AS 1#x]
++- Filter (f1#x = c0#x)
+   +- LateralJoin lateral-subquery#x [f1#x], Inner
+      :  +- SubqueryAlias ss1
+      :     +- GlobalLimit 1
+      :        +- LocalLimit 1
+      :           +- Project [outer(f1#x) AS c0#x]
+      :              +- SubqueryAlias tt5
+      :                 +- SubqueryAlias text_tbl
+      :                    +- View (`TEXT_TBL`, [f1#x])
+      :                       +- Project [cast(f1#x as string) AS f1#x]
+      :                          +- Project [f1#x]
+      :                             +- SubqueryAlias v
+      :                                +- Project [col1#x AS f1#x]
+      :                                   +- LocalRelation [col1#x]
+      +- Join LeftOuter, (f1#x = f1#x)
+         :- Join LeftOuter, (f1#x = foo)
+         :  :- Join Inner, (f1#x = foo)
+         :  :  :- SubqueryAlias tt1
+         :  :  :  +- SubqueryAlias text_tbl
+         :  :  :     +- View (`TEXT_TBL`, [f1#x])
+         :  :  :        +- Project [cast(f1#x as string) AS f1#x]
+         :  :  :           +- Project [f1#x]
+         :  :  :              +- SubqueryAlias v
+         :  :  :                 +- Project [col1#x AS f1#x]
+         :  :  :                    +- LocalRelation [col1#x]
+         :  :  +- SubqueryAlias tt2
+         :  :     +- SubqueryAlias text_tbl
+         :  :        +- View (`TEXT_TBL`, [f1#x])
+         :  :           +- Project [cast(f1#x as string) AS f1#x]
+         :  :              +- Project [f1#x]
+         :  :                 +- SubqueryAlias v
+         :  :                    +- Project [col1#x AS f1#x]
+         :  :                       +- LocalRelation [col1#x]
+         :  +- SubqueryAlias tt3
+         :     +- SubqueryAlias text_tbl
+         :        +- View (`TEXT_TBL`, [f1#x])
+         :           +- Project [cast(f1#x as string) AS f1#x]
+         :              +- Project [f1#x]
+         :                 +- SubqueryAlias v
+         :                    +- Project [col1#x AS f1#x]
+         :                       +- LocalRelation [col1#x]
+         +- SubqueryAlias tt4
+            +- SubqueryAlias text_tbl
+               +- View (`TEXT_TBL`, [f1#x])
+                  +- Project [cast(f1#x as string) AS f1#x]
+                     +- Project [f1#x]
+                        +- SubqueryAlias v
+                           +- Project [col1#x AS f1#x]
+                              +- LocalRelation [col1#x]
+
+
+-- !query
+select 1 from
+  text_tbl as tt1
+  inner join text_tbl as tt2 on (tt1.f1 = 'foo')
+  left join text_tbl as tt3 on (tt3.f1 = 'foo')
+  left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
+  lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
+where tt1.f1 = ss1.c0
+-- !query analysis
+Project [1 AS 1#x]
++- Filter (f1#x = c0#x)
+   +- LateralJoin lateral-subquery#x [f1#x], Inner
+      :  +- SubqueryAlias ss1
+      :     +- GlobalLimit 1
+      :        +- LocalLimit 1
+      :           +- Project [outer(f1#x) AS c0#x]
+      :              +- SubqueryAlias tt5
+      :                 +- SubqueryAlias text_tbl
+      :                    +- View (`TEXT_TBL`, [f1#x])
+      :                       +- Project [cast(f1#x as string) AS f1#x]
+      :                          +- Project [f1#x]
+      :                             +- SubqueryAlias v
+      :                                +- Project [col1#x AS f1#x]
+      :                                   +- LocalRelation [col1#x]
+      +- Join LeftOuter, (f1#x = f1#x)
+         :- Join LeftOuter, (f1#x = foo)
+         :  :- Join Inner, (f1#x = foo)
+         :  :  :- SubqueryAlias tt1
+         :  :  :  +- SubqueryAlias text_tbl
+         :  :  :     +- View (`TEXT_TBL`, [f1#x])
+         :  :  :        +- Project [cast(f1#x as string) AS f1#x]
+         :  :  :           +- Project [f1#x]
+         :  :  :              +- SubqueryAlias v
+         :  :  :                 +- Project [col1#x AS f1#x]
+         :  :  :                    +- LocalRelation [col1#x]
+         :  :  +- SubqueryAlias tt2
+         :  :     +- SubqueryAlias text_tbl
+         :  :        +- View (`TEXT_TBL`, [f1#x])
+         :  :           +- Project [cast(f1#x as string) AS f1#x]
+         :  :              +- Project [f1#x]
+         :  :                 +- SubqueryAlias v
+         :  :                    +- Project [col1#x AS f1#x]
+         :  :                       +- LocalRelation [col1#x]
+         :  +- SubqueryAlias tt3
+         :     +- SubqueryAlias text_tbl
+         :        +- View (`TEXT_TBL`, [f1#x])
+         :           +- Project [cast(f1#x as string) AS f1#x]
+         :              +- Project [f1#x]
+         :                 +- SubqueryAlias v
+         :                    +- Project [col1#x AS f1#x]
+         :                       +- LocalRelation [col1#x]
+         +- SubqueryAlias tt4
+            +- SubqueryAlias text_tbl
+               +- View (`TEXT_TBL`, [f1#x])
+                  +- Project [cast(f1#x as string) AS f1#x]
+                     +- Project [f1#x]
+                        +- SubqueryAlias v
+                           +- Project [col1#x AS f1#x]
+                              +- LocalRelation [col1#x]
+
+
 -- !query
 select * from
   (select 1 as id) as xx
diff --git 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index 5b66c8461d0..d7ba2a8615c 100644
--- 
a/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/analyzer-results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -910,6 +910,38 @@ Project [1 AS 1#x]
                   +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
 
 
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d = t1.t1d
+             ORDER BY s DESC
+             LIMIT 1)
+-- !query analysis
+Project [1 AS 1#x]
++- Filter (cast(t1b#x as bigint) < scalar-subquery#x [t1d#xL])
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Sort [s#xL DESC NULLS LAST], true
+   :           +- Project [s#xL]
+   :              +- Project [t2b#x, t2c#x, t2d#xL, s#xL, s#xL]
+   :                 +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL 
ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST]
+   :                    +- Project [t2b#x, t2c#x, t2d#xL]
+   :                       +- Filter (t2d#xL = outer(t1d#xL))
+   :                          +- SubqueryAlias t2
+   :                             +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                                +- Project [cast(t2a#x as string) AS 
t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, 
cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as 
double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as 
timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
+   :                                   +- Project [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                      +- SubqueryAlias t2
+   :                                         +- LocalRelation [t2a#x, t2b#x, 
t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
 -- !query
 SELECT 1
 FROM t1
@@ -940,6 +972,38 @@ Project [1 AS 1#x]
                   +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
 
 
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d <= t1.t1d
+             ORDER BY s DESC
+             LIMIT 1)
+-- !query analysis
+Project [1 AS 1#x]
++- Filter (cast(t1b#x as bigint) < scalar-subquery#x [t1d#xL])
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Sort [s#xL DESC NULLS LAST], true
+   :           +- Project [s#xL]
+   :              +- Project [t2b#x, t2c#x, t2d#xL, s#xL, s#xL]
+   :                 +- Window [sum(t2b#x) windowspecdefinition(t2c#x, t2d#xL 
ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), 
currentrow$())) AS s#xL], [t2c#x], [t2d#xL ASC NULLS FIRST]
+   :                    +- Project [t2b#x, t2c#x, t2d#xL]
+   :                       +- Filter (t2d#xL <= outer(t1d#xL))
+   :                          +- SubqueryAlias t2
+   :                             +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                                +- Project [cast(t2a#x as string) AS 
t2a#x, cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, 
cast(t2d#xL as bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as 
double) AS t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as 
timestamp) AS t2h#x, cast(t2i#x as date) AS t2i#x]
+   :                                   +- Project [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                      +- SubqueryAlias t2
+   :                                         +- LocalRelation [t2a#x, t2b#x, 
t2c#x, t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
 -- !query
 SELECT t1b
 FROM t1
@@ -1039,6 +1103,65 @@ Project [1 AS 1#x]
                   +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
 
 
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t2b < t1b
+              ORDER BY t2d LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1b#x])
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Project [t2c#x]
+   :           +- Sort [t2d#xL ASC NULLS FIRST], true
+   :              +- Project [t2c#x, t2d#xL]
+   :                 +- Filter (t2b#x < outer(t1b#x))
+   :                    +- SubqueryAlias t2
+   :                       +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                          +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                             +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, 
t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   :                                +- SubqueryAlias t2
+   :                                   +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t2c = t1c
+              ORDER BY t2c LIMIT 1)
+-- !query analysis
+Project [t1a#x, t1b#x]
++- Filter (t1c#x = scalar-subquery#x [t1c#x])
+   :  +- GlobalLimit 1
+   :     +- LocalLimit 1
+   :        +- Sort [t2c#x ASC NULLS FIRST], true
+   :           +- Project [t2c#x]
+   :              +- Filter (t2c#x = outer(t1c#x))
+   :                 +- SubqueryAlias t2
+   :                    +- View (`t2`, 
[t2a#x,t2b#x,t2c#x,t2d#xL,t2e#x,t2f#x,t2g#x,t2h#x,t2i#x])
+   :                       +- Project [cast(t2a#x as string) AS t2a#x, 
cast(t2b#x as smallint) AS t2b#x, cast(t2c#x as int) AS t2c#x, cast(t2d#xL as 
bigint) AS t2d#xL, cast(t2e#x as float) AS t2e#x, cast(t2f#x as double) AS 
t2f#x, cast(t2g#x as decimal(4,0)) AS t2g#x, cast(t2h#x as timestamp) AS t2h#x, 
cast(t2i#x as date) AS t2i#x]
+   :                          +- Project [t2a#x, t2b#x, t2c#x, t2d#xL, t2e#x, 
t2f#x, t2g#x, t2h#x, t2i#x]
+   :                             +- SubqueryAlias t2
+   :                                +- LocalRelation [t2a#x, t2b#x, t2c#x, 
t2d#xL, t2e#x, t2f#x, t2g#x, t2h#x, t2i#x]
+   +- SubqueryAlias t1
+      +- View (`t1`, [t1a#x,t1b#x,t1c#x,t1d#xL,t1e#x,t1f#x,t1g#x,t1h#x,t1i#x])
+         +- Project [cast(t1a#x as string) AS t1a#x, cast(t1b#x as smallint) 
AS t1b#x, cast(t1c#x as int) AS t1c#x, cast(t1d#xL as bigint) AS t1d#xL, 
cast(t1e#x as float) AS t1e#x, cast(t1f#x as double) AS t1f#x, cast(t1g#x as 
decimal(4,0)) AS t1g#x, cast(t1h#x as timestamp) AS t1h#x, cast(t1i#x as date) 
AS t1i#x]
+            +- Project [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, t1g#x, 
t1h#x, t1i#x]
+               +- SubqueryAlias t1
+                  +- LocalRelation [t1a#x, t1b#x, t1c#x, t1d#xL, t1e#x, t1f#x, 
t1g#x, t1h#x, t1i#x]
+
+
 -- !query
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
 -- !query analysis
diff --git a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql 
b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
index 841ca94dad6..8bff1f109aa 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
@@ -504,6 +504,33 @@ SELECT t.* FROM t1 JOIN t3 ON t1.c1 = t3.c1 JOIN LATERAL 
stack(1, t1.c2, t3.c2)
 -- expect error
 SELECT t.* FROM t1, LATERAL stack(c1, c2);
 
+-- SPARK-36191: ORDER BY/LIMIT in the correlated subquery
+select * from t1 join lateral (select * from t2 where t1.c1 = t2.c1 and t1.c2 
< t2.c2 limit 1);
+
+select * from t1 join lateral (select * from t4 where t1.c1 <= t4.c1 order by 
t4.c2 limit 10);
+
+select * from t1 join lateral (select c1, min(c2) as m
+                               from t2 where t1.c1 = t2.c1 and t1.c2 < t2.c2
+                               group by t2.c1
+                               order by m);
+
+select * from t1 join lateral (select c1, min(c2) as m
+                               from t4 where t1.c1 = t4.c1
+                               group by t4.c1
+                               limit 1);
+
+select * from t1 join lateral
+  ((select t4.c2 from t4 where t1.c1 <= t4.c1 order by t4.c2 limit 1)
+   union all
+   (select t4.c1 from t4 where t1.c1 = t4.c1 order by t4.c1 limit 3));
+
+select * from t1 join lateral
+  (select * from
+   ((select t4.c2 as t from t4 where t1.c1 <= t4.c1)
+   union all
+   (select t4.c1 as t from t4 where t1.c1 = t4.c1)) as foo
+   order by foo.t limit 5);
+
 -- clean up
 DROP VIEW t1;
 DROP VIEW t2;
diff --git a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql 
b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql
index dc0b56112d8..adc2af6aca7 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/postgreSQL/join.sql
@@ -1287,53 +1287,53 @@ select * from
 --
 
 -- explain (verbose, costs off)
--- select * from
---   text_tbl t1
---   left join int8_tbl i8
---   on i8.q2 = 123,
---   lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
--- where t1.f1 = ss.f1;
+select * from
+   text_tbl t1
+   left join int8_tbl i8
+   on i8.q2 = 123,
+   lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
+where t1.f1 = ss.f1;
 
--- select * from
---   text_tbl t1
---   left join int8_tbl i8
---   on i8.q2 = 123,
---   lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
--- where t1.f1 = ss.f1;
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
+where t1.f1 = ss.f1;
 
 -- explain (verbose, costs off)
--- select * from
---   text_tbl t1
---   left join int8_tbl i8
---   on i8.q2 = 123,
---   lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
---   lateral (select ss1.* from text_tbl t3 limit 1) as ss2
--- where t1.f1 = ss2.f1;
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
+  lateral (select ss1.* from text_tbl t3 limit 1) as ss2
+where t1.f1 = ss2.f1;
 
--- select * from
---   text_tbl t1
---   left join int8_tbl i8
---   on i8.q2 = 123,
---   lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
---   lateral (select ss1.* from text_tbl t3 limit 1) as ss2
--- where t1.f1 = ss2.f1;
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
+  lateral (select ss1.* from text_tbl t3 limit 1) as ss2
+where t1.f1 = ss2.f1;
 
 -- explain (verbose, costs off)
--- select 1 from
---   text_tbl as tt1
---   inner join text_tbl as tt2 on (tt1.f1 = 'foo')
---   left join text_tbl as tt3 on (tt3.f1 = 'foo')
---   left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
---   lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
--- where tt1.f1 = ss1.c0;
-
--- select 1 from
---   text_tbl as tt1
---   inner join text_tbl as tt2 on (tt1.f1 = 'foo')
---   left join text_tbl as tt3 on (tt3.f1 = 'foo')
---   left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
---   lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
--- where tt1.f1 = ss1.c0;
+select 1 from
+  text_tbl as tt1
+  inner join text_tbl as tt2 on (tt1.f1 = 'foo')
+  left join text_tbl as tt3 on (tt3.f1 = 'foo')
+  left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
+  lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
+where tt1.f1 = ss1.c0;
+
+select 1 from
+  text_tbl as tt1
+  inner join text_tbl as tt2 on (tt1.f1 = 'foo')
+  left join text_tbl as tt3 on (tt3.f1 = 'foo')
+  left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
+  lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
+where tt1.f1 = ss1.c0;
 
 --
 -- check a case in which a PlaceHolderVar forces join order
diff --git 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
index 65adfdcb43c..d2d0ef78171 100644
--- 
a/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
+++ 
b/sql/core/src/test/resources/sql-tests/inputs/subquery/scalar-subquery/scalar-subquery-predicate.sql
@@ -287,6 +287,13 @@ WHERE t1b < (SELECT MAX(tmp.s) FROM (
              SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
                FROM t2 WHERE t2.t2d = t1.t1d) as tmp);
 
+-- Same as above but with LIMIT/ORDER BY instead of MAX
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d = t1.t1d
+             ORDER BY s DESC
+             LIMIT 1);
 
 -- SPARK-44549: window function in the correlated subquery with non-equi 
predicate.
 SELECT 1
@@ -295,6 +302,14 @@ WHERE t1b < (SELECT MAX(tmp.s) FROM (
              SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
                FROM t2 WHERE t2.t2d <= t1.t1d) as tmp);
 
+-- Same as above but with LIMIT/ORDER BY
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d <= t1.t1d
+             ORDER BY s DESC
+             LIMIT 1);
+
 -- SPARK-44549: window function in the correlated subquery over joins.
 SELECT t1b
 FROM t1
@@ -318,6 +333,23 @@ WHERE t1b = (SELECT MAX(tmp.s) FROM (
              SELECT SUM(t2c) OVER (partition by t2c order by t1.t1d + t2d) as s
                FROM t2) as tmp);
 
+-- SPARK-36191: ORDER BY/LIMIT in the correlated subquery, non-equi-predicate
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t2b < t1b
+              ORDER BY t2d LIMIT 1);
+
+
+-- SPARK-36191: ORDER BY/LIMIT in the correlated subquery, equi-predicate
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t2c = t1c
+              ORDER BY t2c LIMIT 1);
+
 -- Set operations in correlation path
 
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0);
diff --git a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out 
b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
index 2053c79e1ab..e24b1ec8dc1 100644
--- a/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/join-lateral.sql.out
@@ -1796,6 +1796,87 @@ org.apache.spark.sql.catalyst.ExtendedAnalysisException
 }
 
 
+-- !query
+select * from t1 join lateral (select * from t2 where t1.c1 = t2.c1 and t1.c2 
< t2.c2 limit 1)
+-- !query schema
+struct<c1:int,c2:int,c1:int,c2:int>
+-- !query output
+0      1       0       2
+
+
+-- !query
+select * from t1 join lateral (select * from t4 where t1.c1 <= t4.c1 order by 
t4.c2 limit 10)
+-- !query schema
+struct<c1:int,c2:int,c1:int,c2:int>
+-- !query output
+0      1       0       1
+0      1       0       2
+0      1       1       1
+0      1       1       3
+1      2       1       1
+1      2       1       3
+
+
+-- !query
+select * from t1 join lateral (select c1, min(c2) as m
+                               from t2 where t1.c1 = t2.c1 and t1.c2 < t2.c2
+                               group by t2.c1
+                               order by m)
+-- !query schema
+struct<c1:int,c2:int,c1:int,m:int>
+-- !query output
+0      1       0       2
+
+
+-- !query
+select * from t1 join lateral (select c1, min(c2) as m
+                               from t4 where t1.c1 = t4.c1
+                               group by t4.c1
+                               limit 1)
+-- !query schema
+struct<c1:int,c2:int,c1:int,m:int>
+-- !query output
+0      1       0       1
+1      2       1       1
+
+
+-- !query
+select * from t1 join lateral
+  ((select t4.c2 from t4 where t1.c1 <= t4.c1 order by t4.c2 limit 1)
+   union all
+   (select t4.c1 from t4 where t1.c1 = t4.c1 order by t4.c1 limit 3))
+-- !query schema
+struct<c1:int,c2:int,c2:int>
+-- !query output
+0      1       0
+0      1       0
+0      1       1
+1      2       1
+1      2       1
+1      2       1
+
+
+-- !query
+select * from t1 join lateral
+  (select * from
+   ((select t4.c2 as t from t4 where t1.c1 <= t4.c1)
+   union all
+   (select t4.c1 as t from t4 where t1.c1 = t4.c1)) as foo
+   order by foo.t limit 5)
+-- !query schema
+struct<c1:int,c2:int,t:int>
+-- !query output
+0      1       0
+0      1       0
+0      1       1
+0      1       1
+0      1       2
+1      2       1
+1      2       1
+1      2       1
+1      2       3
+
+
 -- !query
 DROP VIEW t1
 -- !query schema
diff --git 
a/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out 
b/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
index 7dd59751b3a..55aaff818ea 100644
--- a/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/postgreSQL/join.sql.out
@@ -3086,6 +3086,88 @@ doh!     123     456     doh!    NULL
 doh!   123     456     hi de ho neighbor       NULL
 
 
+-- !query
+select * from
+   text_tbl t1
+   left join int8_tbl i8
+   on i8.q2 = 123,
+   lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
+where t1.f1 = ss.f1
+-- !query schema
+struct<f1:string,q1:bigint,q2:bigint,q1:bigint,f1:string>
+-- !query output
+doh!   4567890123456789        123     4567890123456789        doh!
+
+
+-- !query
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss
+where t1.f1 = ss.f1
+-- !query schema
+struct<f1:string,q1:bigint,q2:bigint,q1:bigint,f1:string>
+-- !query output
+doh!   4567890123456789        123     4567890123456789        doh!
+
+
+-- !query
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
+  lateral (select ss1.* from text_tbl t3 limit 1) as ss2
+where t1.f1 = ss2.f1
+-- !query schema
+struct<f1:string,q1:bigint,q2:bigint,q1:bigint,f1:string,q1:bigint,f1:string>
+-- !query output
+doh!   4567890123456789        123     4567890123456789        doh!    
4567890123456789        doh!
+
+
+-- !query
+select * from
+  text_tbl t1
+  left join int8_tbl i8
+  on i8.q2 = 123,
+  lateral (select i8.q1, t2.f1 from text_tbl t2 limit 1) as ss1,
+  lateral (select ss1.* from text_tbl t3 limit 1) as ss2
+where t1.f1 = ss2.f1
+-- !query schema
+struct<f1:string,q1:bigint,q2:bigint,q1:bigint,f1:string,q1:bigint,f1:string>
+-- !query output
+doh!   4567890123456789        123     4567890123456789        doh!    
4567890123456789        doh!
+
+
+-- !query
+select 1 from
+  text_tbl as tt1
+  inner join text_tbl as tt2 on (tt1.f1 = 'foo')
+  left join text_tbl as tt3 on (tt3.f1 = 'foo')
+  left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
+  lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
+where tt1.f1 = ss1.c0
+-- !query schema
+struct<1:int>
+-- !query output
+
+
+
+-- !query
+select 1 from
+  text_tbl as tt1
+  inner join text_tbl as tt2 on (tt1.f1 = 'foo')
+  left join text_tbl as tt3 on (tt3.f1 = 'foo')
+  left join text_tbl as tt4 on (tt3.f1 = tt4.f1),
+  lateral (select tt4.f1 as c0 from text_tbl as tt5 limit 1) as ss1
+where tt1.f1 = ss1.c0
+-- !query schema
+struct<1:int>
+-- !query output
+
+
+
 -- !query
 select * from
   (select 1 as id) as xx
diff --git 
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
index e8c85ccb89b..e714ec0bad0 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/subquery/scalar-subquery/scalar-subquery-predicate.sql.out
@@ -458,6 +458,22 @@ struct<1:int>
 1
 
 
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d = t1.t1d
+             ORDER BY s DESC
+             LIMIT 1)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+1
+1
+1
+
+
 -- !query
 SELECT 1
 FROM t1
@@ -475,6 +491,24 @@ struct<1:int>
 1
 
 
+-- !query
+SELECT 1
+FROM t1
+WHERE t1b < (SELECT SUM(t2b) OVER (partition by t2c order by t2d) as s
+               FROM t2 WHERE t2.t2d <= t1.t1d
+             ORDER BY s DESC
+             LIMIT 1)
+-- !query schema
+struct<1:int>
+-- !query output
+1
+1
+1
+1
+1
+1
+
+
 -- !query
 SELECT t1b
 FROM t1
@@ -522,6 +556,38 @@ struct<1:int>
 
 
 
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t2b < t1b
+              ORDER BY t2d LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1a  16
+val1a  16
+
+
+-- !query
+SELECT t1a, t1b
+FROM   t1
+WHERE  t1c = (SELECT t2c
+              FROM   t2
+              WHERE  t2c = t1c
+              ORDER BY t2c LIMIT 1)
+-- !query schema
+struct<t1a:string,t1b:smallint>
+-- !query output
+val1a  16
+val1a  16
+val1b  8
+val1c  8
+val1d  NULL
+val1d  NULL
+
+
 -- !query
 CREATE OR REPLACE TEMP VIEW t0(t0a, t0b) AS VALUES (1, 1), (2, 0)
 -- !query schema


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to