Github user marmbrus commented on a diff in the pull request:

    https://github.com/apache/spark/pull/10630#discussion_r49227379
  
    --- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
    @@ -388,57 +441,14 @@ class Analyzer(
                 .map(_.asInstanceOf[NamedExpression])
             a.copy(aggregateExpressions = expanded)
     
    -      // Special handling for cases when self-join introduce duplicate 
expression ids.
    +      // Special handling for cases when self-join introduces duplicate 
expression IDs.
           case j @ Join(left, right, _, _) if !j.selfJoinResolved =>
    -        val conflictingAttributes = 
left.outputSet.intersect(right.outputSet)
    -        logDebug(s"Conflicting attributes 
${conflictingAttributes.mkString(",")} in $j")
    -
    -        right.collect {
    -          // Handle base relations that might appear more than once.
    -          case oldVersion: MultiInstanceRelation
    -              if 
oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty =>
    -            val newVersion = oldVersion.newInstance()
    -            (oldVersion, newVersion)
    -
    -          // Handle projects that create conflicting aliases.
    -          case oldVersion @ Project(projectList, _)
    -              if 
findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
    -            (oldVersion, oldVersion.copy(projectList = 
newAliases(projectList)))
    -
    -          case oldVersion @ Aggregate(_, aggregateExpressions, _)
    -              if 
findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
    -            (oldVersion, oldVersion.copy(aggregateExpressions = 
newAliases(aggregateExpressions)))
    -
    -          case oldVersion: Generate
    -              if 
oldVersion.generatedSet.intersect(conflictingAttributes).nonEmpty =>
    -            val newOutput = oldVersion.generatorOutput.map(_.newInstance())
    -            (oldVersion, oldVersion.copy(generatorOutput = newOutput))
    -
    -          case oldVersion @ Window(_, windowExpressions, _, _, child)
    -              if 
AttributeSet(windowExpressions.map(_.toAttribute)).intersect(conflictingAttributes)
    -                .nonEmpty =>
    -            (oldVersion, oldVersion.copy(windowExpressions = 
newAliases(windowExpressions)))
    -        }
    -        // Only handle first case, others will be fixed on the next pass.
    -        .headOption match {
    -          case None =>
    -            /*
    -             * No result implies that there is a logical plan node that 
produces new references
    -             * that this rule cannot handle. When that is the case, there 
must be another rule
    -             * that resolves these conflicts. Otherwise, the analysis will 
fail.
    -             */
    -            j
    -          case Some((oldRelation, newRelation)) =>
    -            val attributeRewrites = 
AttributeMap(oldRelation.output.zip(newRelation.output))
    -            val newRight = right transformUp {
    -              case r if r == oldRelation => newRelation
    -            } transformUp {
    -              case other => other transformExpressions {
    -                case a: Attribute => attributeRewrites.get(a).getOrElse(a)
    -              }
    -            }
    -            j.copy(right = newRight)
    -        }
    +        j.copy(right = buildRightChild4Deduplication(left, right))
    +
    +      // Special handling for cases when self-intersect introduces 
duplicate expression IDs.
    +      // In Optimizer, Intersect will be converted to semi join.
    +      case i @ Intersect(left, right) if !i.duplicateResolved =>
    +        i.copy(right = buildRightChild4Deduplication(left, right))
    --- End diff --
    
    Do we need to do this for all non-leaf/unary nodes?  I think the only 
reason this wasn't a problem in the past was there were no optimizer rules for 
those nodes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to