Repository: spark
Updated Branches:
  refs/heads/branch-1.3 419865475 -> a15a0a02c


[SPARK-5839][SQL]HiveMetastoreCatalog does not recognize table names and 
aliases of data source tables.

JIRA: https://issues.apache.org/jira/browse/SPARK-5839

Author: Yin Huai <yh...@databricks.com>

Closes #4626 from yhuai/SPARK-5839 and squashes the following commits:

f779d85 [Yin Huai] Use subquery to wrap replaced ParquetRelation.
2695f13 [Yin Huai] Merge remote-tracking branch 'upstream/master' into SPARK-5839
f1ba6ca [Yin Huai] Address comment.
2c7fa08 [Yin Huai] Use Subqueries to wrap a data source table.
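
For context, a minimal repro sketch of the failure mode this patch addresses. The table name `people` and the existing SparkContext `sc` are illustrative assumptions, not part of this commit:

    import org.apache.spark.sql.hive.HiveContext

    // Illustrative sketch, assuming an existing SparkContext `sc`.
    val hiveContext = new HiveContext(sc)

    // Register a managed JSON data source table in the metastore.
    val rdd = sc.parallelize((1 to 10).map(i => s"""{"a":$i}"""))
    hiveContext.jsonRDD(rdd).saveAsTable("people")

    // Before this patch, the catalog returned the cached relation without a
    // Subquery carrying the table name or alias, so qualified references
    // such as `people.a` and `p.a` failed to resolve:
    hiveContext.sql("SELECT people.a FROM people WHERE people.a < 5")
    hiveContext.sql("SELECT p.a FROM people p WHERE p.a > 5")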

(cherry picked from commit f3ff1eb2985ff3e1567645b898f6b42e4b01f237)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a15a0a02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a15a0a02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a15a0a02

Branch: refs/heads/branch-1.3
Commit: a15a0a02c654cd70bb84f6d610e1c178c9cb98a6
Parents: 4198654
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Feb 16 15:54:01 2015 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Mon Feb 16 15:54:13 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/parquet/ParquetQuerySuite.scala   |  5 +--
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 18 +++++++++--
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 34 ++++++++++++++++++++
 3 files changed, 53 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a15a0a02/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
index 9318c15..8b4d05e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/parquet/ParquetQuerySuite.scala
@@ -30,9 +30,10 @@ import org.apache.spark.sql.test.TestSQLContext._
 class ParquetQuerySuiteBase extends QueryTest with ParquetTest {
   val sqlContext = TestSQLContext
 
-  test("simple projection") {
+  test("simple select queries") {
     withParquetTable((0 until 10).map(i => (i, i.toString)), "t") {
-      checkAnswer(sql("SELECT _1 FROM t"), (0 until 10).map(Row.apply(_)))
+      checkAnswer(sql("SELECT _1 FROM t where t._1 > 5"), (6 until 
10).map(Row.apply(_)))
+      checkAnswer(sql("SELECT _1 FROM t as tmp where tmp._1 < 5"), (0 until 
5).map(Row.apply(_)))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a15a0a02/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index 72211fe..87bc9fe 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -160,7 +160,15 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
     }
 
     if (table.getProperty("spark.sql.sources.provider") != null) {
-      cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      val dataSourceTable =
+        cachedDataSourceTables(QualifiedTableName(databaseName, tblName).toLowerCase)
+      // Then, if alias is specified, wrap the table with a Subquery using the alias.
+      // Otherwise, wrap the table with a Subquery using the table name.
+      val withAlias =
+        alias.map(a => Subquery(a, dataSourceTable)).getOrElse(
+          Subquery(tableIdent.last, dataSourceTable))
+
+      withAlias
     } else if (table.isView) {
       // if the unresolved relation is from hive view
       // parse the text into logic node.
@@ -433,7 +441,13 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
         val attributedRewrites = AttributeMap(relation.output.zip(parquetRelation.output))
 
         lastPlan.transformUp {
-          case r: MetastoreRelation if r == relation => parquetRelation
+          case r: MetastoreRelation if r == relation => {
+            val withAlias =
+              r.alias.map(a => Subquery(a, parquetRelation)).getOrElse(
+                Subquery(r.tableName, parquetRelation))
+
+            withAlias
+          }
           case other => other.transformExpressions {
            case a: Attribute if a.resolved => attributedRewrites.getOrElse(a, a)
           }
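
Why wrapping in Subquery helps (a simplified sketch of the Catalyst idea, not the actual Subquery implementation; the class name `QualifyingNode` is invented for illustration):

    import org.apache.spark.sql.catalyst.expressions.Attribute
    import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

    // A Subquery-like node re-qualifies its child's output attributes with
    // its alias, which is what lets the analyzer match references such as
    // `alias.column` or `tableName.column` against the relation's output.
    case class QualifyingNode(alias: String, child: LogicalPlan) extends UnaryNode {
      override def output: Seq[Attribute] =
        child.output.map(_.withQualifiers(alias :: Nil))
    }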

http://git-wip-us.apache.org/repos/asf/spark/blob/a15a0a02/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 375aae5..0263e3b 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -401,6 +401,40 @@ class MetastoreDataSourcesSuite extends QueryTest with BeforeAndAfterEach {
     sql("DROP TABLE jsonTable").collect().foreach(println)
   }
 
+  test("SPARK-5839 HiveMetastoreCatalog does not recognize table aliases of 
data source tables.") {
+    val originalDefaultSource = conf.defaultDataSourceName
+
+    val rdd = sparkContext.parallelize((1 to 10).map(i => s"""{"a":$i, "b":"str${i}"}"""))
+    val df = jsonRDD(rdd)
+
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, "org.apache.spark.sql.json")
+    // Save the df as a managed table (by not specifying the path).
+    df.saveAsTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+      (1 to 4).map(i => Row(i, s"str${i}")))
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+      (6 to 10).map(i => Row(i, s"str${i}")))
+
+    invalidateTable("savedJsonTable")
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable where savedJsonTable.a < 5"),
+      (1 to 4).map(i => Row(i, s"str${i}")))
+
+    checkAnswer(
+      sql("SELECT * FROM savedJsonTable tmp where tmp.a > 5"),
+      (6 to 10).map(i => Row(i, s"str${i}")))
+
+    // Drop table will also delete the data.
+    sql("DROP TABLE savedJsonTable")
+
+    conf.setConf(SQLConf.DEFAULT_DATA_SOURCE_NAME, originalDefaultSource)
+  }
+
   test("save table") {
     val originalDefaultSource = conf.defaultDataSourceName
 

