cloud-fan commented on a change in pull request #27263: 
[SPARK-30433][SQL][FOLLOW-UP] Optimize collect conflict plans
URL: https://github.com/apache/spark/pull/27263#discussion_r368031213
 
 

 ##########
 File path: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ##########
 @@ -1067,34 +1067,37 @@ class Analyzer(
       logDebug(s"Conflicting attributes ${conflictingAttributes.mkString(",")} 
" +
         s"between $left and $right")
 
-      val conflictPlans = right.collect {
+      def collectConflictPlans(plan: LogicalPlan): Seq[(LogicalPlan, 
LogicalPlan)] = plan match {
         // Handle base relations that might appear more than once.
         case oldVersion: MultiInstanceRelation
             if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty 
=>
           val newVersion = oldVersion.newInstance()
-          (oldVersion, newVersion)
+          Seq((oldVersion, newVersion))
 
         case oldVersion: SerializeFromObject
             if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty 
=>
-          (oldVersion, oldVersion.copy(serializer = 
oldVersion.serializer.map(_.newInstance())))
+          Seq((oldVersion, oldVersion.copy(
+            serializer = oldVersion.serializer.map(_.newInstance()))))
 
         // Handle projects that create conflicting aliases.
         case oldVersion @ Project(projectList, _)
             if 
findAliases(projectList).intersect(conflictingAttributes).nonEmpty =>
-          (oldVersion, oldVersion.copy(projectList = newAliases(projectList)))
+          Seq((oldVersion, oldVersion.copy(projectList = 
newAliases(projectList))))
 
         case oldVersion @ Aggregate(_, aggregateExpressions, _)
             if 
findAliases(aggregateExpressions).intersect(conflictingAttributes).nonEmpty =>
-          (oldVersion, oldVersion.copy(aggregateExpressions = 
newAliases(aggregateExpressions)))
+          Seq((oldVersion, oldVersion.copy(
+            aggregateExpressions = newAliases(aggregateExpressions))))
 
         case oldVersion @ FlatMapGroupsInPandas(_, _, output, _)
             if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty 
=>
-          (oldVersion, oldVersion.copy(output = output.map(_.newInstance())))
+          Seq((oldVersion, oldVersion.copy(output = 
output.map(_.newInstance()))))
 
         case oldVersion: Generate
             if 
oldVersion.producedAttributes.intersect(conflictingAttributes).nonEmpty =>
           val newOutput = oldVersion.generatorOutput.map(_.newInstance())
-          (oldVersion, oldVersion.copy(generatorOutput = newOutput))
+          Seq((oldVersion, oldVersion.copy(generatorOutput = newOutput))) ++
+            oldVersion.children.flatMap(collectConflictPlans)
 
 Review comment:
   I think it's OK if we don't recursive into the children. This is a corner 
case and we can handle it in the next batch.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to