Repository: spark
Updated Branches:
  refs/heads/branch-1.3 fd600cec0 -> 9ebefb1f1


[SPARK-6555] [SQL] Overrides equals() and hashCode() for MetastoreRelation

Also removes temporary workarounds made in #5183 and #5251.

<!-- Reviewable:start -->
[<img src="https://reviewable.io/review_button.png"; height=40 alt="Review on 
Reviewable"/>](https://reviewable.io/reviews/apache/spark/5289)
<!-- Reviewable:end -->

Author: Cheng Lian <l...@databricks.com>

Closes #5289 from liancheng/spark-6555 and squashes the following commits:

d0095ac [Cheng Lian] Removes unused imports
cfafeeb [Cheng Lian] Removes outdated comment
75a2746 [Cheng Lian] Overrides equals() and hashCode() for MetastoreRelation

(cherry picked from commit a7992ffaf1e8adc9d2c225a986fa3162e8e130eb)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ebefb1f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ebefb1f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ebefb1f

Branch: refs/heads/branch-1.3
Commit: 9ebefb1f1c531205b3e13e4b17927f64fb87aadb
Parents: fd600ce
Author: Cheng Lian <l...@databricks.com>
Authored: Tue Mar 31 11:18:25 2015 -0700
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Mar 31 11:18:53 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 42 +++++++++++---------
 .../spark/sql/hive/execution/HivePlanTest.scala |  6 ++-
 2 files changed, 28 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9ebefb1f/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 6a01a23..f20f0ad 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.hive
 import java.io.IOException
 import java.util.{List => JList}
 
+import com.google.common.base.Objects
 import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import org.apache.hadoop.hive.metastore.api.{FieldSchema, Partition => 
TPartition, Table => TTable}
 import org.apache.hadoop.hive.metastore.{TableType, Warehouse}
@@ -465,7 +466,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Write path
         case InsertIntoHiveTable(relation: MetastoreRelation, _, _, _)
@@ -476,7 +477,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) 
extends Catalog with
             
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
 
         // Read path
         case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
@@ -485,33 +486,28 @@ private[hive] class HiveMetastoreCatalog(hive: 
HiveContext) extends Catalog with
               
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
           val parquetRelation = convertToParquetRelation(relation)
           val attributedRewrites = relation.output.zip(parquetRelation.output)
-          (relation -> relation.output, parquetRelation, attributedRewrites)
+          (relation, parquetRelation, attributedRewrites)
       }
 
-      // Quick fix for SPARK-6450: Notice that we're using both the 
MetastoreRelation instances and
-      // their output attributes as the key of the map. This is because 
MetastoreRelation.equals
-      // doesn't take output attributes into account, thus multiple 
MetastoreRelation instances
-      // pointing to the same table get collapsed into a single entry in the 
map. A proper fix for
-      // this should be overriding equals & hashCode in MetastoreRelation.
       val relationMap = toBeReplaced.map(r => (r._1, r._2)).toMap
       val attributedRewrites = AttributeMap(toBeReplaced.map(_._3).fold(Nil)(_ 
++: _))
 
       // Replaces all `MetastoreRelation`s with corresponding 
`ParquetRelation2`s, and fixes
       // attribute IDs referenced in other nodes.
       plan.transformUp {
-        case r: MetastoreRelation if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+        case r: MetastoreRelation if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           val alias = r.alias.getOrElse(r.tableName)
           Subquery(alias, parquetRelation)
 
         case InsertIntoTable(r: MetastoreRelation, partition, child, overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case InsertIntoHiveTable(r: MetastoreRelation, partition, child, 
overwrite)
-          if relationMap.contains(r -> r.output) =>
-          val parquetRelation = relationMap(r -> r.output)
+          if relationMap.contains(r) =>
+          val parquetRelation = relationMap(r)
           InsertIntoTable(parquetRelation, partition, child, overwrite)
 
         case other => other.transformExpressions {
@@ -707,6 +703,19 @@ private[hive] case class MetastoreRelation
 
   self: Product =>
 
+  override def equals(other: scala.Any): Boolean = other match {
+    case relation: MetastoreRelation =>
+      databaseName == relation.databaseName &&
+        tableName == relation.tableName &&
+        alias == relation.alias &&
+        output == relation.output
+    case _ => false
+  }
+
+  override def hashCode(): Int = {
+    Objects.hashCode(databaseName, tableName, alias, output)
+  }
+
   // TODO: Can we use org.apache.hadoop.hive.ql.metadata.Table as the type of 
table and
   // use org.apache.hadoop.hive.ql.metadata.Partition as the type of elements 
of partitions.
   // Right now, using org.apache.hadoop.hive.ql.metadata.Table and
@@ -786,10 +795,7 @@ private[hive] case class MetastoreRelation
   val columnOrdinals = AttributeMap(attributes.zipWithIndex)
 
   override def newInstance() = {
-    val newCopy = MetastoreRelation(databaseName, tableName, alias)(table, 
partitions)(sqlContext)
-    // The project here is an ugly hack to work around the fact that 
MetastoreRelation's
-    // equals method is broken.  Please remove this when SPARK-6555 is fixed.
-    Project(newCopy.output, newCopy)
+    MetastoreRelation(databaseName, tableName, alias)(table, 
partitions)(sqlContext)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9ebefb1f/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
index c939e6e..bdb53dd 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HivePlanTest.scala
@@ -22,10 +22,12 @@ import org.apache.spark.sql.hive.test.TestHive
 
 class HivePlanTest extends QueryTest {
   import TestHive._
+  import TestHive.implicits._
 
   test("udf constant folding") {
-    val optimized = sql("SELECT cos(null) FROM 
src").queryExecution.optimizedPlan
-    val correctAnswer = sql("SELECT cast(null as double) FROM 
src").queryExecution.optimizedPlan
+    Seq.empty[Tuple1[Int]].toDF("a").registerTempTable("t")
+    val optimized = sql("SELECT cos(null) FROM t").queryExecution.optimizedPlan
+    val correctAnswer = sql("SELECT cast(null as double) FROM 
t").queryExecution.optimizedPlan
 
     comparePlans(optimized, correctAnswer)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to