This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 4e80f8c  Revert "[SPARK-34178][SQL] Copy tags for the new node created 
by MultiInstanceRelation.newInstance"
4e80f8c is described below

commit 4e80f8cf49b18dcd4132f74253b01521a0cf65f3
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Thu Jan 21 09:55:52 2021 +0900

    Revert "[SPARK-34178][SQL] Copy tags for the new node created by 
MultiInstanceRelation.newInstance"
    
    This reverts commit 89443ab1118b0e07acd639609094961f783b01e1.
---
 .../org/apache/spark/sql/catalyst/analysis/Analyzer.scala    |  6 +-----
 .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala |  2 +-
 .../test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala | 12 +-----------
 3 files changed, 3 insertions(+), 17 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 2dbabfc..fbe6041 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1012,10 +1012,7 @@ class Analyzer(
           }
           val key = catalog.name +: ident.namespace :+ ident.name
           AnalysisContext.get.relationCache.get(key).map(_.transform {
-            case multi: MultiInstanceRelation =>
-              val newRelation = multi.newInstance()
-              newRelation.copyTagsFrom(multi)
-              newRelation
+            case multi: MultiInstanceRelation => multi.newInstance()
           }).orElse {
             loaded.foreach(AnalysisContext.get.relationCache.update(key, _))
             loaded
@@ -1167,7 +1164,6 @@ class Analyzer(
         case oldVersion: MultiInstanceRelation
             if oldVersion.outputSet.intersect(conflictingAttributes).nonEmpty 
=>
           val newVersion = oldVersion.newInstance()
-          newVersion.copyTagsFrom(oldVersion)
           Seq((oldVersion, newVersion))
 
         case oldVersion: SerializeFromObject
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 68bbd1b..4dc3627 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -91,7 +91,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] 
extends Product {
    */
   private val tags: mutable.Map[TreeNodeTag[_], Any] = mutable.Map.empty
 
-  def copyTagsFrom(other: BaseType): Unit = {
+  protected def copyTagsFrom(other: BaseType): Unit = {
     // SPARK-32753: it only makes sense to copy tags to a new node
     // but it's too expensive to detect other cases likes node removal
     // so we make a compromise here to copy tags to node with no tags
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
index 0cf81b4..a49f95f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameJoinSuite.scala
@@ -25,7 +25,6 @@ import 
org.apache.spark.sql.catalyst.plans.logical.{BROADCAST, Filter, HintInfo,
 import org.apache.spark.sql.connector.catalog.CatalogManager
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
-import 
org.apache.spark.sql.execution.analysis.DetectAmbiguousSelfJoin.LogicalPlanWithDatasetId
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
 import org.apache.spark.sql.execution.joins.BroadcastHashJoinExec
@@ -265,16 +264,7 @@ class DataFrameJoinSuite extends QueryTest
     withSQLConf(SQLConf.CROSS_JOINS_ENABLED.key -> "false") {
       val df = spark.range(2)
       // this throws an exception before the fix
-      val plan = df.join(df, df("id") <=> 
df("id")).queryExecution.optimizedPlan
-
-      plan match {
-        // SPARK-34178: we can't match the plan before the fix due to
-        // the right side plan doesn't contains dataset id.
-        case Join(
-          LogicalPlanWithDatasetId(_, leftId),
-          LogicalPlanWithDatasetId(_, rightId), _, _, _) =>
-          assert(leftId === rightId)
-      }
+      df.join(df, df("id") <=> df("id")).queryExecution.optimizedPlan
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to