allisonwang-db commented on a change in pull request #32303:
URL: https://github.com/apache/spark/pull/32303#discussion_r620556049



##########
File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
##########
@@ -2234,6 +2270,74 @@ class Analyzer(override val catalogManager: 
CatalogManager)
     }
   }
 
+  /**
+   * This rule resolves lateral joins.
+   */
+  object ResolveLateralJoin extends Rule[LogicalPlan] {
+    import ResolveReferences._
+
+    /**
+     * Build a project list for Project/Aggregate and expand the star if 
possible by first using
+     * the inner query plan. If failed, use the outer query plan to expand the 
star and wrap all
+     * expanded attributes in [[OuterReference]]s.
+     */
+    private def expandOuterReference(
+        expressions: Seq[NamedExpression],
+        inner: LogicalPlan,
+        outer: LogicalPlan): Seq[NamedExpression] = {
+
+      def expandInner(star: Star): Seq[NamedExpression] =
+        star.expand(inner, resolver)
+
+      // Leave the star unchanged if the outer plan is unable to resolve the 
star.
+      // Otherwise wrap the resolved attributes in outer references.
+      def expandOuter(star: Star): Seq[NamedExpression] = {
+        star.expand(outer, resolver).map {
+          case s: Star => s
+          case other => other
+            .transform { case a: Attribute => OuterReference(a) }
+            .asInstanceOf[NamedExpression]
+        }
+      }
+
+      buildExpandedProjectList(
+        buildExpandedProjectList(expressions, inner, expandInner),
+        outer,
+        expandOuter)
+    }
+
+    /**
+     * Resolve the right sub-tree by first using the right query plan itself, 
and then try
+     * resolving the unresolved attributes and star expressions using the left 
query plan.
+     */
+    private def resolveRightChild(right: LogicalPlan, left: LogicalPlan): 
LogicalPlan = {
+      right.resolveOperators {
+        case p: LogicalPlan if !p.childrenResolved => p
+        case p: Project if containsStar(p.projectList) =>
+          p.copy(projectList = expandOuterReference(p.projectList, p.child, 
left))
+        case a: Aggregate if containsStar(a.aggregateExpressions) =>
+          a.copy(aggregateExpressions =
+            expandOuterReference(a.aggregateExpressions, a.child, left))
+        case p: LogicalPlan if !p.resolved =>
+          p transformExpressions {
+            case u @ UnresolvedAttribute(nameParts) =>
+              withPosition(u) {
+                p.resolveChildren(nameParts, resolver)
+                  .orElse(resolveLiteralFunction(nameParts))
+                  .orElse(resolveOuterReference(nameParts, left))

Review comment:
       Hmm, I am getting the following optimized plan from this query:
   ```
   scala> sql("SELECT * FROM t1, LATERAL (SELECT c2 FROM t2 WHERE t1.c1 = 
t2.c1) tx(a);").explain(true)
   ...
   == Optimized Logical Plan ==
   Project [c1#0, c2#1, a#8]
   +- Join Inner, (c1#0 = c1#4)
      :- LocalRelation [c1#0, c2#1]
      +- LocalRelation [a#8, c1#4]
   ``` 
   The outer references inside the Filter should have been pulled up as join 
conditions.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to