allisonwang-db commented on code in PR #39479: URL: https://github.com/apache/spark/pull/39479#discussion_r1069000543
########## sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql: ########## @@ -177,6 +177,25 @@ SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE_OUTER(c2)); SELECT * FROM t3 JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3; SELECT * FROM t3 LEFT JOIN LATERAL (SELECT EXPLODE(c2)) t(c3) ON c1 = c3; +-- SPARK-41961: lateral join with table-valued functions +SELECT * FROM LATERAL EXPLODE(ARRAY(1, 2)); Review Comment: Yes, for example, Postgres can run this query: ``` select * from lateral unnest(array[1, 2, 3]); ``` ########## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowRelationSubquerySuite.scala: ########## @@ -177,4 +177,27 @@ class OptimizeOneRowRelationSubquerySuite extends PlanTest { val optimized = Optimize.execute(query2.analyze) assertHasDomainJoin(optimized) } + + test("SPARK-41961: optimize lateral subquery with table-valued functions") { + // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) + val query1 = t3.lateralJoin(UnresolvedTableValuedFunction("explode", $"arr" :: Nil)) + comparePlans( + Optimize.execute(query1.analyze), + t3.generate(Explode($"arr")).analyze) + + // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) t(v) + val query2 = t3.lateralJoin( + UnresolvedTVFAliases("explode" :: Nil, Review Comment: The `name` field of `UnresolvedTVFAliases` is actually the name of the function, not the relation. It is used to produce proper error messages in the Analyzer: https://github.com/apache/spark/blob/604d1a55221259118693dfe9b6b0979a67712473/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L2479-L2487 I will update the plan to include a `SubqueryAlias`.
########## sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/OptimizeOneRowRelationSubquerySuite.scala: ########## @@ -177,4 +177,27 @@ class OptimizeOneRowRelationSubquerySuite extends PlanTest { val optimized = Optimize.execute(query2.analyze) assertHasDomainJoin(optimized) } + + test("SPARK-41961: optimize lateral subquery with table-valued functions") { + // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) + val query1 = t3.lateralJoin(UnresolvedTableValuedFunction("explode", $"arr" :: Nil)) + comparePlans( + Optimize.execute(query1.analyze), + t3.generate(Explode($"arr")).analyze) + + // SELECT * FROM t3 JOIN LATERAL EXPLODE(arr) t(v) + val query2 = t3.lateralJoin( + UnresolvedTVFAliases("explode" :: Nil, + UnresolvedTableValuedFunction("explode", $"arr" :: Nil), "v" :: Nil)) + comparePlans( + Optimize.execute(query2.analyze), + t3.generate(Explode($"arr")).select($"a", $"b", $"arr", $"col".as("v")).analyze) + + // SELECT col FROM t3 JOIN LATERAL (SELECT * FROM EXPLODE(arr) WHERE col > 0) + val query3 = t3.lateralJoin( + UnresolvedTableValuedFunction("explode", $"arr" :: Nil).where($"col" > 0)) + val optimized = Optimize.execute(query3.analyze) + optimized.exists(_.isInstanceOf[Generate]) + assertHasDomainJoin(optimized) Review Comment: We actually can decorrelate through `Generate`, as long as it does not have `requiredChildOutput`: https://github.com/apache/spark/blob/604d1a55221259118693dfe9b6b0979a67712473/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala#L670 However, the resulting plan will still contain a domain join. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org