maropu commented on a change in pull request #32303:
URL: https://github.com/apache/spark/pull/32303#discussion_r637627094



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1663,6 +1675,27 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
   }
 
+  /**
+   * Optionally resolve the name parts using the outer query plan and wrap 
resolved attributes
+   * with [[OuterReference]]s.
+   */
+  private def resolveOuterReference(

Review comment:
       nit: `tryResolveOuterReference`?

##########
File path: 
sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
##########
@@ -643,12 +643,12 @@ setQuantifier
     ;
 
 relation
-    : relationPrimary joinRelation*
+    : LATERAL? relationPrimary joinRelation*
     ;
 
 joinRelation
-    : (joinType) JOIN right=relationPrimary joinCriteria?
-    | NATURAL joinType JOIN right=relationPrimary
+    : (joinType) JOIN LATERAL? right=relationPrimary joinCriteria?
+    | NATURAL joinType JOIN LATERAL? right=relationPrimary

Review comment:
       Oh, super cleaner than the previous one. Nice.

##########
File path: sql/core/src/test/resources/sql-tests/inputs/join-lateral.sql
##########
@@ -0,0 +1,97 @@
+-- Test cases for lateral join
+
+CREATE VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2);
+CREATE VIEW t2(c1, c2) AS VALUES (0, 2), (0, 3);
+
+-- lateral join with single column select
+SELECT * FROM t1, LATERAL (SELECT c1);
+SELECT * FROM t1, LATERAL (SELECT c1 FROM t2);
+SELECT * FROM t1, LATERAL (SELECT t1.c1 FROM t2);
+SELECT * FROM t1, LATERAL (SELECT t1.c1 + t2.c1 FROM t2);
+
+-- lateral join with star expansion

Review comment:
       Could you add tests for `quotedRegexColumnNames`, too?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala
##########
@@ -168,6 +168,21 @@ object EliminateOuterJoin extends Rule[LogicalPlan] with 
PredicateHelper {
   }
 }
 
+/**
+ * Rewrite lateral joins by rewriting all dependent joins (if any) inside the 
right
+ * sub-tree of the lateral join and converting the lateral join into a base 
join type.
+ */
+object RewriteLateralJoin extends Rule[LogicalPlan] with PredicateHelper {
+
+  def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
+    case j @ Join(left, right, LateralJoin(joinType), condition, _) =>
+      val conditions = condition.map(splitConjunctivePredicates).getOrElse(Nil)
+      val newRight = DecorrelateInnerQuery.rewriteDomainJoins(left, right, 
conditions)
+      // TODO: handle the COUNT bug

Review comment:
       Can you add the jira number (SPARK-15370) here?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -1607,33 +1622,30 @@ class Analyzer(override val catalogManager: 
CatalogManager)
             }
           }
           f1.copy(arguments = f1.arguments.flatMap {
-            case s: Star => s.expand(child, resolver)
+            case s: Star => expand(s)
             case o => o :: Nil
           })
         case c: CreateNamedStruct if containsStar(c.valExprs) =>
           val newChildren = c.children.grouped(2).flatMap {
-            case Seq(k, s : Star) => CreateStruct(s.expand(child, 
resolver)).children
+            case Seq(k, s : Star) => CreateStruct(expand(s)).children
             case kv => kv
           }
           c.copy(children = newChildren.toList )
         case c: CreateArray if containsStar(c.children) =>
           c.copy(children = c.children.flatMap {
-            case s: Star => s.expand(child, resolver)
+            case s: Star => expand(s)
             case o => o :: Nil
           })
         case p: Murmur3Hash if containsStar(p.children) =>
           p.copy(children = p.children.flatMap {
-            case s: Star => s.expand(child, resolver)
+            case s: Star => expand(s)
             case o => o :: Nil
           })
         case p: XxHash64 if containsStar(p.children) =>
           p.copy(children = p.children.flatMap {
-            case s: Star => s.expand(child, resolver)
+            case s: Star => expand(s)
             case o => o :: Nil
           })
-        // count(*) has been replaced by count(1)
-        case o if containsStar(o.children) =>
-          throw QueryCompilationErrors.invalidStarUsageError(s"expression 
'${o.prettyName}'")

Review comment:
       Did this error handling is moved into `CheckAnalysis` 
https://github.com/apache/spark/pull/32303/files#diff-583171e935b2dc349378063a5841c5b98b30a2d57ac3743a9eccfe7bffcb8f2aR177-R179
 ?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -276,6 +276,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveAliases ::
       ResolveSubquery ::
       ResolveSubqueryColumnAliases ::
+      ResolveLateralJoin ::

Review comment:
       Since `ResolveLateralJoin` has a similar functionality with 
`ResolveReferences`, how about putting this rule just after `ResolveReferences`?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2395,6 +2421,70 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
   }
 
+  /**
+   * This rule resolves lateral joins.
+   */
+  object ResolveLateralJoin extends Rule[LogicalPlan] {

Review comment:
       `ResolveReferencesInLateralJoin`?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -174,13 +174,21 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
                 hof.failAnalysis(
                   s"cannot resolve '${hof.sql}' due to argument data type 
mismatch: $message")
             }
+          // Check if there are expressions with children containing star 
expressions.
+          case e if e.children.exists(_.isInstanceOf[Star]) =>
+            e.failAnalysis(s"Invalid usage of '*' in expression 
'${e.prettyName}'")
         }
 
         operator transformExpressionsUp {
           case a: Attribute if !a.resolved =>
             val from = 
operator.inputSet.toSeq.map(_.qualifiedName).mkString(", ")
             a.failAnalysis(s"cannot resolve '${a.sql}' given input columns: 
[$from]")
 
+          case s @ UnresolvedStar(Some(target)) =>

Review comment:
       We still need this error check even if we check it in L178?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -174,13 +174,21 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
                 hof.failAnalysis(
                   s"cannot resolve '${hof.sql}' due to argument data type 
mismatch: $message")
             }
+          // Check if there are expressions with children containing star 
expressions.
+          case e if e.children.exists(_.isInstanceOf[Star]) =>
+            e.failAnalysis(s"Invalid usage of '*' in expression 
'${e.prettyName}'")

Review comment:
       We cannot use `QueryCompilationErrors.invalidStarUsageError` here?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/DecorrelateInnerQuery.scala
##########
@@ -244,7 +244,8 @@ object DecorrelateInnerQuery extends PredicateHelper {
             case _ => Join(child, domain, Inner, None, JoinHint.NONE)
           }
         } else {
-          throw 
QueryExecutionErrors.cannotRewriteDomainJoinWithConditionsError(conditions, d)

Review comment:
       ditto: It seems no one uses this method now?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -254,6 +262,27 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
               s"join condition '${condition.sql}' " +
                 s"of type ${condition.dataType.catalogString} is not a 
boolean.")
 
+          case Join(left, right, LateralJoin(_), _, _) =>
+            // Check all outer references in the right operand are a subset of 
output columns
+            // of the left operand.
+            def checkOuterReferences(plan: LogicalPlan, outer: LogicalPlan): 
Unit = {
+              plan match {
+                case Join(left, _, LateralJoin(_), _, _) =>
+                  checkOuterReferences(left, outer)
+                case p: LogicalPlan => p transformExpressions {
+                  case o @ OuterReference(e) if !outer.outputSet.contains(e) =>
+                    val column = 
outer.outputSet.toSeq.map(_.qualifiedName).mkString(", ")
+                    o.failAnalysis(
+                      s"Found an outer column reference '${e.sql}' in a 
lateral subquery " +
+                      s"that is not present in the preceding FROM items of its 
own query " +

Review comment:
       nit: remove `s`

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
##########
@@ -402,10 +402,8 @@ case class UnresolvedStar(target: Option[Seq[String]]) 
extends Star with Unevalu
           throw 
QueryCompilationErrors.starExpandDataTypeNotSupportedError(target.get)
       }
     } else {
-      val from = input.inputSet.map(_.name).mkString(", ")
-      val targetString = target.get.mkString(".")
-      throw 
QueryCompilationErrors.cannotResolveStarExpandGivenInputColumnsError(

Review comment:
       It seems no one uses this method now?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
##########
@@ -174,13 +174,21 @@ trait CheckAnalysis extends PredicateHelper with 
LookupCatalog {
                 hof.failAnalysis(
                   s"cannot resolve '${hof.sql}' due to argument data type 
mismatch: $message")
             }
+          // Check if there are expressions with children containing star 
expressions.
+          case e if e.children.exists(_.isInstanceOf[Star]) =>
+            e.failAnalysis(s"Invalid usage of '*' in expression 
'${e.prettyName}'")

Review comment:
       If we add this rule here, I think we need to update the comment above.
   
https://github.com/apache/spark/pull/32303/files#diff-583171e935b2dc349378063a5841c5b98b30a2d57ac3743a9eccfe7bffcb8f2aL165-L168

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/joinTypes.scala
##########
@@ -107,6 +107,11 @@ case class UsingJoin(tpe: JoinType, usingColumns: 
Seq[String]) extends JoinType
   override def sql: String = "USING " + tpe.sql
 }
 
+case class LateralJoin(tpe: JoinType) extends JoinType {
+  require(Seq(Inner, LeftOuter, Cross).contains(tpe), "Unsupported lateral 
join type " + tpe)

Review comment:
       > Currently INNER, CROSS and LEFT OUTER join types are supported with 
lateral join
   
   If we can support the other join types in future, how about checking this 
requirement in the `AstBuilder` side?

##########
File path: 
sql/core/src/test/resources/sql-tests/inputs/postgreSQL/groupingsets.sql
##########
@@ -499,14 +499,14 @@ SELECT a, b, count(*), max(a), max(b) FROM gstest3 GROUP 
BY GROUPING SETS(a, b,(
 -- COMMIT;
 
 -- More rescan tests
--- [SPARK-27877] ANSI SQL: LATERAL derived table(T491)
+-- Expressions referencing the outer query are not supported outside of 
WHERE/HAVING clauses:
 -- select * from (values (1),(2)) v(a) left join lateral (select v.a, four, 
ten, count(*) from onek group by cube(four,ten)) s on true order by 
v.a,four,ten;
 -- [SPARK-27878] Support ARRAY(sub-SELECT) expressions
 -- select array(select row(v.a,s1.*) from (select two,four, count(*) from onek 
group by cube(two,four) order by two,four) s1) from (values (1),(2)) v(a);
 
 -- Rescan logic changes when there are no empty grouping sets, so test
 -- that too:
--- [SPARK-27877] ANSI SQL: LATERAL derived table(T491)
+-- Expressions referencing the outer query are not supported outside of 
WHERE/HAVING clauses:

Review comment:
       We can support this case in future? If we can, can you file jira for it 
and update the jira number above (`[SPARK-27877] ANSI SQL: LATERAL derived 
table(T491)`)?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2395,6 +2421,70 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
   }
 
+  /**
+   * This rule resolves lateral joins.
+   */
+  object ResolveLateralJoin extends Rule[LogicalPlan] {

Review comment:
       Q: Btw, is it difficult to merge this resolution logic with the 
`ResolveReferences` one?

##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/DeduplicateRelations.scala
##########
@@ -182,6 +182,16 @@ object DeduplicateRelations extends Rule[LogicalPlan] {
           if 
findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
         Seq((oldVersion, oldVersion.copy(projectList = 
newAliases(projectList))))
 
+      // Handle projects that create conflicting outer references.
+      case oldVersion @ Project(projectList, _)

Review comment:
       cc: @Ngone51 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to