allisonwang-db commented on code in PR #39479:
URL: https://github.com/apache/spark/pull/39479#discussion_r1069000543


##########
sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql:
##########
@@ -177,6 +177,25 @@ SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE_OUTER(c2));
 SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3;
 SELECT * FROM t3 LEFT JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3;
 
+-- SPARK-41961: lateral join with table-valued functions
+SELECT * FROM LATERAL EXPLODE(ARRAY(1, 2));

Review Comment:
   Yes. For example, Postgres can run this query:
   ```
   select * from lateral unnest(array[1, 2, 3]);
   ```
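
   As a rough illustration of the semantics being discussed, here is a minimal sketch using plain Scala collections rather than Spark's API. `LateralSketch`, `Row3`, `lateralExplode`, and the sample rows are hypothetical names and data made up for this example. It treats a lateral table-valued function as something evaluated once per outer row, and models the standalone `FROM LATERAL EXPLODE(...)` form as a lateral join against an implicit single-row relation, which is one way to think about it and not necessarily how the analyzer represents it.

   ```scala
   // Conceptual model only: plain Scala collections, not Spark's API.
   // All names and sample rows here are hypothetical.
   object LateralSketch {
     final case class Row3(c1: Int, c2: Seq[Int])

     // t3 JOIN LATERAL EXPLODE(c2): the function is evaluated once per outer row.
     def lateralExplode(outer: Seq[Row3]): Seq[(Int, Seq[Int], Int)] =
       outer.flatMap(r => r.c2.map(v => (r.c1, r.c2, v)))

     val t3: Seq[Row3] = Seq(Row3(0, Seq(0, 1)), Row3(1, Seq(2)), Row3(2, Seq.empty))

     // Outer rows with an empty array contribute no output rows (inner join).
     val joined: Seq[(Int, Seq[Int], Int)] = lateralExplode(t3)

     // FROM LATERAL EXPLODE(ARRAY(1, 2)): nothing precedes the function, so it is
     // modelled as a lateral join against a single row with no columns.
     val standalone: Seq[Int] = Seq(()).flatMap(_ => Seq(1, 2))
   }
   ```

   In this model the standalone form simply yields the rows of the function, since there are no outer columns to correlate with.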



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowRelationSubquerySuite.scala:
##########
@@ -177,4 +177,27 @@ class OptimizeOneRowRelationSubquerySuite extends PlanTest {
     val optimized = Optimize.execute(query2.analyze)
     assertHasDomainJoin(optimized)
   }
+
+  test("SPARK-41961: optimize lateral subquery with table-valued functions") {
+    // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr)
+    val query1 = t3.lateralJoin(UnresolvedTableValuedFunction("explode", $"arr" :: Nil))
+    comparePlans(
+      Optimize.execute(query1.analyze),
+      t3.generate(Explode($"arr")).analyze)
+
+    // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) t(v)
+    val query2 = t3.lateralJoin(
+      UnresolvedTVFAliases("explode" :: Nil,

Review Comment:
   The `name` field of `UnresolvedTVFAliases` is actually the name of the function, not the name of the relation. It is used to throw proper error messages in the Analyzer:
   
https://github.com/apache/spark/blob/604d1a55221259118693dfe9b6b0979a67712473/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2479-L2487
   
   I will update the plan to include a `SubqueryAlias`.



##########
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowRelationSubquerySuite.scala:
##########
@@ -177,4 +177,27 @@ class OptimizeOneRowRelationSubquerySuite extends PlanTest {
     val optimized = Optimize.execute(query2.analyze)
     assertHasDomainJoin(optimized)
   }
+
+  test("SPARK-41961: optimize lateral subquery with table-valued functions") {
+    // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr)
+    val query1 = t3.lateralJoin(UnresolvedTableValuedFunction("explode", $"arr" :: Nil))
+    comparePlans(
+      Optimize.execute(query1.analyze),
+      t3.generate(Explode($"arr")).analyze)
+
+    // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) t(v)
+    val query2 = t3.lateralJoin(
+      UnresolvedTVFAliases("explode" :: Nil,
+        UnresolvedTableValuedFunction("explode", $"arr" :: Nil), "v" :: Nil))
+    comparePlans(
+      Optimize.execute(query2.analyze),
+      t3.generate(Explode($"arr")).select($"a", $"b", $"arr", $"col".as("v")).analyze)
+
+    // SELECT col FROM t3 JOIN LATERAL (SELECT * FROM EXPLODE(arr) WHERE col > 0)
+    val query3 = t3.lateralJoin(
+      UnresolvedTableValuedFunction("explode", $"arr" :: Nil).where($"col" > 0))
+    val optimized = Optimize.execute(query3.analyze)
+    optimized.exists(_.isInstanceOf[Generate])
+    assertHasDomainJoin(optimized)

Review Comment:
   We can actually decorrelate through `Generate`, as long as it does not have `requiredChildOutput`:
   
https://github.com/apache/spark/blob/604d1a55221259118693dfe9b6b0979a67712473/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala#L670
   
   The resulting plan will still contain a domain join, though.
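
   To make the "domain join" idea concrete, here is a conceptual sketch with plain Scala collections, not the actual `DecorrelateInnerQuery` implementation. `DomainJoinSketch`, `Outer`, and the sample rows are hypothetical. It assumes the usual reading of a domain join: the subquery is evaluated over the distinct outer values it references, and the result is joined back to the outer rows on those values.

   ```scala
   // Conceptual model only: plain Scala collections, not Spark's optimizer.
   // All names and sample rows here are hypothetical.
   object DomainJoinSketch {
     final case class Outer(a: Int, arr: Seq[Int])

     val t3: Seq[Outer] =
       Seq(Outer(1, Seq(-1, 2)), Outer(2, Seq(3)), Outer(3, Seq(-1, 2)))

     // Correlated form: run "SELECT * FROM EXPLODE(arr) WHERE col > 0" per outer row.
     val correlated: Seq[(Int, Int)] =
       t3.flatMap(r => r.arr.filter(_ > 0).map(col => (r.a, col)))

     // Decorrelated form, step 1: the domain is the distinct set of outer values
     // that the subquery refers to (here, the `arr` column).
     val domain: Seq[Seq[Int]] = t3.map(_.arr).distinct

     // Step 2: evaluate the subquery over the domain, keeping the domain value
     // alongside each result so it can serve as a join key.
     val inner: Seq[(Seq[Int], Int)] =
       domain.flatMap(arr => arr.filter(_ > 0).map(col => (arr, col)))

     // Step 3: join the outer rows back to the subquery result on that key.
     val decorrelated: Seq[(Int, Int)] =
       for { r <- t3; (key, col) <- inner if key == r.arr } yield (r.a, col)
   }
   ```

   Both forms produce the same rows in this model; the decorrelated one evaluates the subquery once per distinct `arr` value instead of once per outer row.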



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

