GitHub user dilipbiswal commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11297#discussion_r56459052
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -1329,48 +1329,72 @@ class Analyzer(
       }
     
       /**
    -   * Removes natural joins by calculating output columns based on output 
from two sides,
    -   * Then apply a Project on a normal Join to eliminate natural join.
    +   * Removes natural or using joins by calculating output columns based on 
output from two sides,
    +   * Then apply a Project on a normal Join to eliminate natural or using 
join.
        */
    -  object ResolveNaturalJoin extends Rule[LogicalPlan] {
    +  object ResolveNaturalAndUsingJoin extends Rule[LogicalPlan] {
         override def apply(plan: LogicalPlan): LogicalPlan = plan 
resolveOperators {
    +      case j @ Join(left, right, UsingJoin(joinType, usingCols), condition)
    +          if left.resolved && right.resolved && j.duplicateResolved =>
    +        // Resolve the column names referenced in using clause from both 
the legs of join.
    +        val lCols = usingCols.flatMap(col => left.resolveQuoted(col.name, 
resolver))
    +        val rCols = usingCols.flatMap(col => right.resolveQuoted(col.name, 
resolver))
    +        if ((lCols.length == usingCols.length) && (rCols.length == 
usingCols.length)) {
    +          val joinNames = lCols.map(exp => exp.name)
    +          commonNaturalJoinProcessing(left, right, joinType, joinNames, 
None)
    +        } else {
    +          j
    +        }
           case j @ Join(left, right, NaturalJoin(joinType), condition) if 
j.resolvedExceptNatural =>
             // find common column names from both sides
             val joinNames = 
left.output.map(_.name).intersect(right.output.map(_.name))
    -        val leftKeys = joinNames.map(keyName => left.output.find(_.name == 
keyName).get)
    -        val rightKeys = joinNames.map(keyName => right.output.find(_.name 
== keyName).get)
    -        val joinPairs = leftKeys.zip(rightKeys)
    -
    -        // Add joinPairs to joinConditions
    -        val newCondition = (condition ++ joinPairs.map {
    -          case (l, r) => EqualTo(l, r)
    -        }).reduceOption(And)
    -
    -        // columns not in joinPairs
    -        val lUniqueOutput = left.output.filterNot(att => 
leftKeys.contains(att))
    -        val rUniqueOutput = right.output.filterNot(att => 
rightKeys.contains(att))
    -
    -        // the output list looks like: join keys, columns from left, 
columns from right
    -        val projectList = joinType match {
    -          case LeftOuter =>
    -            leftKeys ++ lUniqueOutput ++ 
rUniqueOutput.map(_.withNullability(true))
    -          case RightOuter =>
    -            rightKeys ++ lUniqueOutput.map(_.withNullability(true)) ++ 
rUniqueOutput
    -          case FullOuter =>
    -            // in full outer join, joinCols should be non-null if there is.
    -            val joinedCols = joinPairs.map { case (l, r) => 
Alias(Coalesce(Seq(l, r)), l.name)() }
    -            joinedCols ++
    -              lUniqueOutput.map(_.withNullability(true)) ++
    -              rUniqueOutput.map(_.withNullability(true))
    -          case Inner =>
    -            rightKeys ++ lUniqueOutput ++ rUniqueOutput
    -          case _ =>
    -            sys.error("Unsupported natural join type " + joinType)
    -        }
    -        // use Project to trim unnecessary fields
    -        Project(projectList, Join(left, right, joinType, newCondition))
    +        commonNaturalJoinProcessing(left, right, joinType, joinNames, 
condition)
    +    }
    +  }
    +
    +  private def commonNaturalJoinProcessing(
    +     left: LogicalPlan,
    +     right: LogicalPlan,
    +     joinType: JoinType,
    +     joinNames: Seq[String],
    +     condition: Option[Expression]) = {
    +    val leftKeys = joinNames.map(keyName => left.output.find(_.name == 
keyName).get)
    +    val rightKeys = joinNames.map(keyName => right.output.find(_.name == 
keyName).get)
    +    val joinPairs = leftKeys.zip(rightKeys)
    +
    +    // Add joinPairs to joinConditions
    +    val newCondition = (condition ++ joinPairs.map {
    --- End diff --
    
    @hvanhovell Thank you for your review! I have made the change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and you would like it enabled, or if the feature is enabled but not
working, please contact infrastructure at infrastruct...@apache.org or file a
JIRA ticket with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to