Repository: spark
Updated Branches:
  refs/heads/master 0c9231858 -> 2e6256002


[SPARK-19265][SQL] Make the table relation cache general and independent of Hive

## What changes were proposed in this pull request?

We have a table relation plan cache in `HiveMetastoreCatalog`, which caches a 
lot of things: file status, resolved data source, inferred schema, etc.

However, it doesn't make sense to restrict this cache to Hive support; we should
move it to the SQL core module so that users can use this cache without Hive
support.

It can also reduce the size of `HiveMetastoreCatalog`, so that it's easier to 
remove it eventually.

Main changes:
1. Move the table relation cache to `SessionCatalog`.
2. `SessionCatalog.lookupRelation` will return `SimpleCatalogRelation`, and the
analyzer will convert it to `LogicalRelation` or `MetastoreRelation` later, so
`HiveSessionCatalog` doesn't need to override `lookupRelation` anymore.
3. `FindDataSourceTable` will read/write the table relation cache.

## How was this patch tested?

existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #16621 from cloud-fan/plan-cache.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2e625600
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2e625600
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2e625600

Branch: refs/heads/master
Commit: 2e62560024999c215cf2373fc9a8070bb2ad5c58
Parents: 0c92318
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Jan 19 00:07:48 2017 -0800
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Jan 19 00:07:48 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 33 +++++--
 .../apache/spark/sql/catalyst/identifiers.scala |  4 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  6 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../command/AnalyzeColumnCommand.scala          |  3 +-
 .../execution/command/AnalyzeTableCommand.scala |  3 +-
 .../spark/sql/execution/command/tables.scala    |  2 +-
 .../datasources/DataSourceStrategy.scala        | 60 ++++++------
 .../apache/spark/sql/internal/CatalogImpl.scala |  2 +-
 .../org/apache/spark/sql/DataFrameSuite.scala   | 11 ---
 .../spark/sql/execution/command/DDLSuite.scala  |  3 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   | 98 ++------------------
 .../spark/sql/hive/HiveSessionCatalog.scala     | 37 +-------
 .../spark/sql/hive/HiveSessionState.scala       |  2 +
 .../apache/spark/sql/hive/HiveStrategies.scala  | 17 +++-
 .../CreateHiveTableAsSelectCommand.scala        |  8 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  2 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    | 22 +++++
 .../apache/spark/sql/hive/StatisticsSuite.scala |  8 +-
 .../sql/hive/execution/SQLQuerySuite.scala      | 17 ++--
 20 files changed, 144 insertions(+), 198 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 8008fcd..e9543f7 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -21,13 +21,13 @@ import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.mutable
 
+import com.google.common.cache.{Cache, CacheBuilder}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.{CatalystConf, SimpleCatalystConf}
-import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst._
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.expressions.{Expression, ExpressionInfo}
@@ -118,6 +118,14 @@ class SessionCatalog(
   }
 
   /**
+   * A cache of qualified table name to table relation plan.
+   */
+  val tableRelationCache: Cache[QualifiedTableName, LogicalPlan] = {
+    // TODO: create a config instead of hardcode 1000 here.
+    CacheBuilder.newBuilder().maximumSize(1000).build[QualifiedTableName, 
LogicalPlan]()
+  }
+
+  /**
    * This method is used to make the given path qualified before we
    * store this path in the underlying external catalog. So, when a path
    * does not contain a scheme, this path will not be changed after the default
@@ -573,7 +581,7 @@ class SessionCatalog(
       val relationAlias = alias.getOrElse(table)
       if (db == globalTempViewManager.database) {
         globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(relationAlias, viewDef, Some(name))
+          SubqueryAlias(relationAlias, viewDef, None)
         }.getOrElse(throw new NoSuchTableException(db, table))
       } else if (name.database.isDefined || !tempTables.contains(table)) {
         val metadata = externalCatalog.getTable(db, table)
@@ -586,12 +594,12 @@ class SessionCatalog(
             desc = metadata,
             output = metadata.schema.toAttributes,
             child = parser.parsePlan(viewText))
-          SubqueryAlias(relationAlias, child, Option(name))
+          SubqueryAlias(relationAlias, child, Some(name.copy(table = table, 
database = Some(db))))
         } else {
           SubqueryAlias(relationAlias, SimpleCatalogRelation(metadata), None)
         }
       } else {
-        SubqueryAlias(relationAlias, tempTables(table), Option(name))
+        SubqueryAlias(relationAlias, tempTables(table), None)
       }
     }
   }
@@ -651,14 +659,21 @@ class SessionCatalog(
    * Refresh the cache entry for a metastore table, if any.
    */
   def refreshTable(name: TableIdentifier): Unit = synchronized {
+    val dbName = formatDatabaseName(name.database.getOrElse(currentDb))
+    val tableName = formatTableName(name.table)
+
     // Go through temporary tables and invalidate them.
-    // If the database is defined, this is definitely not a temp table.
+    // If the database is defined, this may be a global temporary view.
     // If the database is not defined, there is a good chance this is a temp 
table.
     if (name.database.isEmpty) {
-      tempTables.get(formatTableName(name.table)).foreach(_.refresh())
-    } else if (formatDatabaseName(name.database.get) == 
globalTempViewManager.database) {
-      
globalTempViewManager.get(formatTableName(name.table)).foreach(_.refresh())
+      tempTables.get(tableName).foreach(_.refresh())
+    } else if (dbName == globalTempViewManager.database) {
+      globalTempViewManager.get(tableName).foreach(_.refresh())
     }
+
+    // Also invalidate the table relation cache.
+    val qualifiedTableName = QualifiedTableName(dbName, tableName)
+    tableRelationCache.invalidate(qualifiedTableName)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
index 834897b..26697e9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -60,9 +60,11 @@ case class TableIdentifier(table: String, database: 
Option[String])
   override val identifier: String = table
 
   def this(table: String) = this(table, None)
-
 }
 
+/** A fully qualified identifier for a table (i.e., database.tableName) */
+case class QualifiedTableName(database: String, name: String)
+
 object TableIdentifier {
   def apply(tableName: String): TableIdentifier = new 
TableIdentifier(tableName)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 7a7de25..f935de6 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -436,7 +436,7 @@ class SessionCatalogSuite extends PlanTest {
       == SubqueryAlias("tbl1", SimpleCatalogRelation(metastoreTable1), None))
     // Otherwise, we'll first look up a temporary table with the same name
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
-      == SubqueryAlias("tbl1", tempTable1, Some(TableIdentifier("tbl1"))))
+      == SubqueryAlias("tbl1", tempTable1, None))
     // Then, if that does not exist, look up the relation in the current 
database
     sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = 
false, purge = false)
     assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1"))
@@ -462,7 +462,7 @@ class SessionCatalogSuite extends PlanTest {
     val tmpView = Range(1, 10, 2, 10)
     catalog.createTempView("vw1", tmpView, overrideIfExists = false)
     val plan = catalog.lookupRelation(TableIdentifier("vw1"), Option("range"))
-    assert(plan == SubqueryAlias("range", tmpView, 
Option(TableIdentifier("vw1"))))
+    assert(plan == SubqueryAlias("range", tmpView, None))
   }
 
   test("look up view relation") {
@@ -479,7 +479,7 @@ class SessionCatalogSuite extends PlanTest {
     // Look up a view using current database of the session catalog.
     sessionCatalog.setCurrentDatabase("db3")
     comparePlans(sessionCatalog.lookupRelation(TableIdentifier("view1")),
-      SubqueryAlias("view1", view, Some(TableIdentifier("view1"))))
+      SubqueryAlias("view1", view, Some(TableIdentifier("view1", 
Some("db3")))))
   }
 
   test("table exists") {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 7fc03bd..ff1f017 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -386,7 +386,9 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
           case relation: CatalogRelation if 
DDLUtils.isHiveTable(relation.catalogTable) =>
             relation.catalogTable.identifier
         }
-        EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) 
match {
+
+        val tableRelation = 
df.sparkSession.table(tableIdentWithDB).queryExecution.analyzed
+        EliminateSubqueryAliases(tableRelation) match {
           // check if the table is a data source table (the relation is a 
BaseRelation).
           case LogicalRelation(dest: BaseRelation, _, _) if 
srcRelations.contains(dest) =>
             throw new AnalysisException(

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 1340c9b..d024a36 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -40,7 +40,8 @@ case class AnalyzeColumnCommand(
     val sessionState = sparkSession.sessionState
     val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     // Compute total size
     val (catalogTable: CatalogTable, sizeInBytes: Long) = relation match {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 4a994e3..30b6cc7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -41,7 +41,8 @@ case class AnalyzeTableCommand(
     val sessionState = sparkSession.sessionState
     val db = 
tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val relation = 
EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB))
+    val relation =
+      
EliminateSubqueryAliases(sparkSession.table(tableIdentWithDB).queryExecution.analyzed)
 
     relation match {
       case relation: CatalogRelation =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2468948..1b596c9 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -450,7 +450,7 @@ case class DescribeTableCommand(
       if (metadata.schema.isEmpty) {
         // In older version(prior to 2.1) of Spark, the table schema can be 
empty and should be
         // inferred at runtime. We should still support it.
-        describeSchema(catalog.lookupRelation(metadata.identifier).schema, 
result)
+        describeSchema(sparkSession.table(metadata.identifier).schema, result)
       } else {
         describeSchema(metadata.schema, result)
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
index 3d3db06..21b07ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceStrategy.scala
@@ -17,18 +17,17 @@
 
 package org.apache.spark.sql.execution.datasources
 
-import scala.collection.mutable.ArrayBuffer
+import java.util.concurrent.Callable
 
-import org.apache.hadoop.fs.Path
+import scala.collection.mutable.ArrayBuffer
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, 
InternalRow}
+import org.apache.spark.sql.catalyst.{CatalystConf, CatalystTypeConverters, 
InternalRow, QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.CatalystTypeConverters.convertToScala
 import org.apache.spark.sql.catalyst.analysis._
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
CatalogTablePartition, SimpleCatalogRelation}
-import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, 
SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning.PhysicalOperation
@@ -37,6 +36,7 @@ import 
org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, UnknownPa
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.execution.{RowDataSourceScanExec, SparkPlan}
 import org.apache.spark.sql.execution.command._
+import org.apache.spark.sql.internal.StaticSQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
@@ -215,37 +215,43 @@ case class DataSourceAnalysis(conf: CatalystConf) extends 
Rule[LogicalPlan] {
 
 
 /**
- * Replaces [[SimpleCatalogRelation]] with data source table if its table 
property contains data
- * source information.
+ * Replaces [[SimpleCatalogRelation]] with data source table if its table 
provider is not hive.
  */
 class FindDataSourceTable(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
-  private def readDataSourceTable(
-      sparkSession: SparkSession,
-      simpleCatalogRelation: SimpleCatalogRelation): LogicalPlan = {
-    val table = simpleCatalogRelation.catalogTable
-    val pathOption = table.storage.locationUri.map("path" -> _)
-    val dataSource =
-      DataSource(
-        sparkSession,
-        userSpecifiedSchema = Some(table.schema),
-        partitionColumns = table.partitionColumnNames,
-        bucketSpec = table.bucketSpec,
-        className = table.provider.get,
-        options = table.storage.properties ++ pathOption)
-
-    LogicalRelation(
-      dataSource.resolveRelation(),
-      expectedOutputAttributes = Some(simpleCatalogRelation.output),
-      catalogTable = Some(table))
+  private def readDataSourceTable(table: CatalogTable): LogicalPlan = {
+    val qualifiedTableName = QualifiedTableName(table.database, 
table.identifier.table)
+    val cache = sparkSession.sessionState.catalog.tableRelationCache
+    val withHiveSupport =
+      sparkSession.sparkContext.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) 
== "hive"
+
+    cache.get(qualifiedTableName, new Callable[LogicalPlan]() {
+      override def call(): LogicalPlan = {
+        val pathOption = table.storage.locationUri.map("path" -> _)
+        val dataSource =
+          DataSource(
+            sparkSession,
+            // In older version(prior to 2.1) of Spark, the table schema can 
be empty and should be
+            // inferred at runtime. We should still support it.
+            userSpecifiedSchema = if (table.schema.isEmpty) None else 
Some(table.schema),
+            partitionColumns = table.partitionColumnNames,
+            bucketSpec = table.bucketSpec,
+            className = table.provider.get,
+            options = table.storage.properties ++ pathOption,
+            // TODO: improve `InMemoryCatalog` and remove this limitation.
+            catalogTable = if (withHiveSupport) Some(table) else None)
+
+        LogicalRelation(dataSource.resolveRelation(), catalogTable = 
Some(table))
+      }
+    })
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
     case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
         if DDLUtils.isDatasourceTable(s.metadata) =>
-      i.copy(table = readDataSourceTable(sparkSession, s))
+      i.copy(table = readDataSourceTable(s.metadata))
 
     case s: SimpleCatalogRelation if DDLUtils.isDatasourceTable(s.metadata) =>
-      readDataSourceTable(sparkSession, s)
+      readDataSourceTable(s.metadata)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 9136a83..3d9f418 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -440,7 +440,7 @@ class CatalogImpl(sparkSession: SparkSession) extends 
Catalog {
 
     // If this table is cached as an InMemoryRelation, drop the original
     // cached version and make the new version cached lazily.
-    val logicalPlan = 
sparkSession.sessionState.catalog.lookupRelation(tableIdent)
+    val logicalPlan = sparkSession.table(tableIdent).queryExecution.analyzed
     // Use lookupCachedData directly since RefreshTable also takes 
databaseName.
     val isCached = 
sparkSession.sharedState.cacheManager.lookupCachedData(logicalPlan).nonEmpty
     if (isCached) {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index f4df80f..621a46a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -1626,17 +1626,6 @@ class DataFrameSuite extends QueryTest with 
SharedSQLContext {
     assert(d.size == d.distinct.size)
   }
 
-  test("SPARK-17625: data source table in InMemoryCatalog should guarantee 
output consistency") {
-    val tableName = "tbl"
-    withTable(tableName) {
-      spark.range(10).select('id as 'i, 'id as 'j).write.saveAsTable(tableName)
-      val relation = 
spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName))
-      val expr = relation.resolve("i")
-      val qe = spark.sessionState.executePlan(Project(Seq(expr), relation))
-      qe.assertAnalyzed()
-    }
-  }
-
   private def verifyNullabilityInFilterExec(
       df: DataFrame,
       expr: String,

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index 97990a6..b4c9e27 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -1790,7 +1790,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
   }
 
   test("SET LOCATION for managed table") {
-    withTable("src") {
+    withTable("tbl") {
       withTempDir { dir =>
         sql("CREATE TABLE tbl(i INT) USING parquet")
         sql("INSERT INTO tbl SELECT 1")
@@ -1799,6 +1799,7 @@ class DDLSuite extends QueryTest with SharedSQLContext 
with BeforeAndAfterEach {
           .getTableMetadata(TableIdentifier("tbl")).storage.locationUri.get
 
         sql(s"ALTER TABLE tbl SET LOCATION '${dir.getCanonicalPath}'")
+        spark.catalog.refreshTable("tbl")
         // SET LOCATION won't move data from previous table path to new table 
path.
         assert(spark.table("tbl").count() == 0)
         // the previous table path should be still there.

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e4b1f6a..faa76b7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -17,17 +17,15 @@
 
 package org.apache.spark.sql.hive
 
-import com.google.common.cache.{CacheBuilder, CacheLoader, LoadingCache}
 import com.google.common.util.concurrent.Striped
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources._
 import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, 
ParquetOptions}
 import org.apache.spark.sql.hive.orc.OrcFileFormat
@@ -41,9 +39,7 @@ import org.apache.spark.sql.types._
  */
 private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends 
Logging {
   private val sessionState = 
sparkSession.sessionState.asInstanceOf[HiveSessionState]
-
-  /** A fully qualified identifier for a table (i.e., database.tableName) */
-  case class QualifiedTableName(database: String, name: String)
+  private lazy val tableRelationCache = 
sparkSession.sessionState.catalog.tableRelationCache
 
   private def getCurrentDatabase: String = 
sessionState.catalog.getCurrentDatabase
 
@@ -65,45 +61,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     }
   }
 
-  /** A cache of Spark SQL data source tables that have been accessed. */
-  protected[hive] val cachedDataSourceTables: LoadingCache[QualifiedTableName, 
LogicalPlan] = {
-    val cacheLoader = new CacheLoader[QualifiedTableName, LogicalPlan]() {
-      override def load(in: QualifiedTableName): LogicalPlan = {
-        logDebug(s"Creating new cached data source for $in")
-        val table = 
sparkSession.sharedState.externalCatalog.getTable(in.database, in.name)
-
-        val pathOption = table.storage.locationUri.map("path" -> _)
-        val dataSource =
-          DataSource(
-            sparkSession,
-            // In older version(prior to 2.1) of Spark, the table schema can 
be empty and should be
-            // inferred at runtime. We should still support it.
-            userSpecifiedSchema = if (table.schema.isEmpty) None else 
Some(table.schema),
-            partitionColumns = table.partitionColumnNames,
-            bucketSpec = table.bucketSpec,
-            className = table.provider.get,
-            options = table.storage.properties ++ pathOption,
-            catalogTable = Some(table))
-
-        LogicalRelation(dataSource.resolveRelation(), catalogTable = 
Some(table))
-      }
-    }
-
-    CacheBuilder.newBuilder().maximumSize(1000).build(cacheLoader)
-  }
-
-  def refreshTable(tableIdent: TableIdentifier): Unit = {
-    // refreshTable does not eagerly reload the cache. It just invalidate the 
cache.
-    // Next time when we use the table, it will be populated in the cache.
-    // Since we also cache ParquetRelations converted from Hive Parquet tables 
and
-    // adding converted ParquetRelations into the cache is not defined in the 
load function
-    // of the cache (instead, we add the cache entry in 
convertToParquetRelation),
-    // it is better at here to invalidate the cache to avoid confusing waring 
logs from the
-    // cache loader (e.g. cannot find data source provider, which is only 
defined for
-    // data source table.).
-    cachedDataSourceTables.invalidate(getQualifiedTableName(tableIdent))
-  }
-
   def hiveDefaultTableFilePath(tableIdent: TableIdentifier): String = {
     // Code based on: hiveWarehouse.getTablePath(currentDatabase, tableName)
     val QualifiedTableName(dbName, tblName) = getQualifiedTableName(tableIdent)
@@ -111,45 +68,6 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     new Path(new Path(dbLocation), tblName).toString
   }
 
-  /**
-   * Returns a [[LogicalPlan]] that represents the given table or view from 
Hive metastore.
-   *
-   * @param tableIdent The name of the table/view that we look up.
-   * @param alias The alias name of the table/view that we look up.
-   * @return a [[LogicalPlan]] that represents the given table or view from 
Hive metastore.
-   */
-  def lookupRelation(
-      tableIdent: TableIdentifier,
-      alias: Option[String]): LogicalPlan = {
-    val qualifiedTableName = getQualifiedTableName(tableIdent)
-    val table = sparkSession.sharedState.externalCatalog.getTable(
-      qualifiedTableName.database, qualifiedTableName.name)
-
-    if (DDLUtils.isDatasourceTable(table)) {
-      val dataSourceTable = cachedDataSourceTables(qualifiedTableName)
-      val qualifiedTable = SubqueryAlias(qualifiedTableName.name, 
dataSourceTable, None)
-      // Then, if alias is specified, wrap the table with a Subquery using the 
alias.
-      // Otherwise, wrap the table with a Subquery using the table name.
-      alias.map(a => SubqueryAlias(a, qualifiedTable, 
None)).getOrElse(qualifiedTable)
-    } else if (table.tableType == CatalogTableType.VIEW) {
-      val tableIdentifier = table.identifier
-      val viewText = table.viewText.getOrElse(sys.error("Invalid view without 
text."))
-      // The relation is a view, so we wrap the relation by:
-      // 1. Add a [[View]] operator over the relation to keep track of the 
view desc;
-      // 2. Wrap the logical plan in a [[SubqueryAlias]] which tracks the name 
of the view.
-      val child = View(
-        desc = table,
-        output = table.schema.toAttributes,
-        child = sparkSession.sessionState.sqlParser.parsePlan(viewText))
-      SubqueryAlias(alias.getOrElse(tableIdentifier.table), child, 
Option(tableIdentifier))
-    } else {
-      val qualifiedTable =
-        MetastoreRelation(
-          qualifiedTableName.database, qualifiedTableName.name)(table, 
sparkSession)
-      alias.map(a => SubqueryAlias(a, qualifiedTable, 
None)).getOrElse(qualifiedTable)
-    }
-  }
-
   private def getCached(
       tableIdentifier: QualifiedTableName,
       pathsInMetastore: Seq[Path],
@@ -159,7 +77,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
       expectedBucketSpec: Option[BucketSpec],
       partitionSchema: Option[StructType]): Option[LogicalRelation] = {
 
-    cachedDataSourceTables.getIfPresent(tableIdentifier) match {
+    tableRelationCache.getIfPresent(tableIdentifier) match {
       case null => None // Cache miss
       case logical @ LogicalRelation(relation: HadoopFsRelation, _, _) =>
         val cachedRelationFileFormatClass = relation.fileFormat.getClass
@@ -178,7 +96,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
               Some(logical)
             } else {
               // If the cached relation is not updated, we invalidate it right 
away.
-              cachedDataSourceTables.invalidate(tableIdentifier)
+              tableRelationCache.invalidate(tableIdentifier)
               None
             }
           case _ =>
@@ -187,7 +105,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
                 s"should be stored as $expectedFileFormat. However, we are 
getting " +
                 s"a ${relation.fileFormat} from the metastore cache. This 
cached " +
                 s"entry will be invalidated.")
-            cachedDataSourceTables.invalidate(tableIdentifier)
+            tableRelationCache.invalidate(tableIdentifier)
             None
         }
       case other =>
@@ -195,7 +113,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
           s"${metastoreRelation.databaseName}.${metastoreRelation.tableName} 
should be stored " +
             s"as $expectedFileFormat. However, we are getting a $other from 
the metastore cache. " +
             s"This cached entry will be invalidated.")
-        cachedDataSourceTables.invalidate(tableIdentifier)
+        tableRelationCache.invalidate(tableIdentifier)
         None
     }
   }
@@ -270,7 +188,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
 
           val created = LogicalRelation(relation,
             catalogTable = Some(metastoreRelation.catalogTable))
-          cachedDataSourceTables.put(tableIdentifier, created)
+          tableRelationCache.put(tableIdentifier, created)
           created
         }
 
@@ -298,7 +216,7 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
                 className = fileType).resolveRelation(),
               catalogTable = Some(metastoreRelation.catalogTable))
 
-          cachedDataSourceTables.put(tableIdentifier, created)
+          tableRelationCache.put(tableIdentifier, created)
           created
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
index b3cbbed..44ef5cc 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionCatalog.scala
@@ -27,12 +27,12 @@ import 
org.apache.hadoop.hive.ql.udf.generic.{AbstractGenericUDAFResolver, Gener
 
 import org.apache.spark.sql.{AnalysisException, SparkSession}
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
-import org.apache.spark.sql.catalyst.analysis.{FunctionRegistry, 
NoSuchTableException}
+import org.apache.spark.sql.catalyst.analysis.FunctionRegistry
 import org.apache.spark.sql.catalyst.analysis.FunctionRegistry.FunctionBuilder
 import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, 
GlobalTempViewManager, SessionCatalog}
 import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, 
ExpressionInfo}
 import org.apache.spark.sql.catalyst.parser.ParserInterface
-import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.hive.HiveShim.HiveFunctionWrapper
 import org.apache.spark.sql.internal.SQLConf
@@ -58,28 +58,6 @@ private[sql] class HiveSessionCatalog(
     hadoopConf,
     parser) {
 
-  override def lookupRelation(name: TableIdentifier, alias: Option[String] = 
None): LogicalPlan = {
-    synchronized {
-      val table = formatTableName(name.table)
-      val db = formatDatabaseName(name.database.getOrElse(currentDb))
-      if (db == globalTempViewManager.database) {
-        val relationAlias = alias.getOrElse(table)
-        globalTempViewManager.get(table).map { viewDef =>
-          SubqueryAlias(relationAlias, viewDef, Some(name))
-        }.getOrElse(throw new NoSuchTableException(db, table))
-      } else if (name.database.isDefined || !tempTables.contains(table)) {
-        val newName = name.copy(database = Some(db), table = table)
-        metastoreCatalog.lookupRelation(newName, alias)
-      } else {
-        val relation = tempTables(table)
-        val tableWithQualifiers = SubqueryAlias(table, relation, None)
-        // If an alias was specified by the lookup, wrap the plan in a 
subquery so that
-        // attributes are properly qualified with this alias.
-        alias.map(a => SubqueryAlias(a, tableWithQualifiers, 
None)).getOrElse(tableWithQualifiers)
-      }
-    }
-  }
-
   // ----------------------------------------------------------------
   // | Methods and fields for interacting with HiveMetastoreCatalog |
   // ----------------------------------------------------------------
@@ -93,15 +71,6 @@ private[sql] class HiveSessionCatalog(
   val ParquetConversions: Rule[LogicalPlan] = 
metastoreCatalog.ParquetConversions
   val OrcConversions: Rule[LogicalPlan] = metastoreCatalog.OrcConversions
 
-  override def refreshTable(name: TableIdentifier): Unit = {
-    super.refreshTable(name)
-    metastoreCatalog.refreshTable(name)
-  }
-
-  def invalidateCache(): Unit = {
-    metastoreCatalog.cachedDataSourceTables.invalidateAll()
-  }
-
   def hiveDefaultTableFilePath(name: TableIdentifier): String = {
     metastoreCatalog.hiveDefaultTableFilePath(name)
   }
@@ -109,7 +78,7 @@ private[sql] class HiveSessionCatalog(
   // For testing only
   private[hive] def getCachedDataSourceTable(table: TableIdentifier): 
LogicalPlan = {
     val key = metastoreCatalog.getQualifiedTableName(table)
-    metastoreCatalog.cachedDataSourceTables.getIfPresent(key)
+    sparkSession.sessionState.catalog.tableRelationCache.getIfPresent(key)
   }
 
   override def makeFunctionBuilder(funcName: String, className: String): 
FunctionBuilder = {

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
index 4e30d03..d3cef6e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveSessionState.scala
@@ -67,6 +67,8 @@ private[hive] class HiveSessionState(sparkSession: 
SparkSession)
         DataSourceAnalysis(conf) ::
         new DetermineHiveSerde(conf) ::
         new HiveAnalysis(sparkSession) ::
+        new FindDataSourceTable(sparkSession) ::
+        new FindHiveSerdeTable(sparkSession) ::
         new ResolveDataSource(sparkSession) :: Nil
 
       override val extendedCheckRules = Seq(PreWriteCheck(conf, catalog))

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
index 7987a0a..b649612 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveStrategies.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.hive
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.catalog.CatalogStorageFormat
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, SimpleCatalogRelation}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.planning._
 import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan, ScriptTransformation}
@@ -127,6 +127,21 @@ class HiveAnalysis(session: SparkSession) extends 
Rule[LogicalPlan] {
   }
 }
 
+/**
+ * Replaces [[SimpleCatalogRelation]] with [[MetastoreRelation]] if its table provider is hive.
+ */
+class FindHiveSerdeTable(session: SparkSession) extends Rule[LogicalPlan] {
+  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
+    case i @ InsertIntoTable(s: SimpleCatalogRelation, _, _, _, _)
+        if DDLUtils.isHiveTable(s.metadata) =>
+      i.copy(table =
+        MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session))
+
+    case s: SimpleCatalogRelation if DDLUtils.isHiveTable(s.metadata) =>
+      MetastoreRelation(s.metadata.database, s.metadata.identifier.table)(s.metadata, session)
+  }
+}
+
 private[hive] trait HiveStrategies {
   // Possibly being too clever with types here... or not clever enough.
   self: SparkPlanner =>

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
index ef5a5a0..ccc2d64 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateHiveTableAsSelectCommand.scala
@@ -20,8 +20,8 @@ package org.apache.spark.sql.hive.execution
 import scala.util.control.NonFatal
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
-import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, 
LogicalPlan}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, SimpleCatalogRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoTable, LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.command.RunnableCommand
 import org.apache.spark.sql.hive.MetastoreRelation
 
@@ -73,7 +73,9 @@ case class CreateHiveTableAsSelectCommand(
 
       // Get the Metastore Relation
       sparkSession.sessionState.catalog.lookupRelation(tableIdentifier) match {
-        case r: MetastoreRelation => r
+        case SubqueryAlias(_, r: SimpleCatalogRelation, _) =>
+          val tableMeta = r.metadata
+          MetastoreRelation(tableMeta.database, tableMeta.identifier.table)(tableMeta, sparkSession)
       }
     }
     // TODO ideally, we should get the output data ready first and then

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index dcb8e49..3267c23 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -431,7 +431,7 @@ private[hive] class TestHiveSparkSession(
       sharedState.cacheManager.clearCache()
       loadedTables.clear()
       sessionState.catalog.clearTempTables()
-      sessionState.catalog.invalidateCache()
+      sessionState.catalog.tableRelationCache.invalidateAll()
 
       sessionState.metadataHive.reset()
 

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 081f6f6..f0e2c93 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -1322,4 +1322,26 @@ class MetastoreDataSourcesSuite extends QueryTest with 
SQLTestUtils with TestHiv
       sparkSession.sparkContext.conf.set(DEBUG_MODE, previousValue)
     }
   }
+
+  test("SPARK-18464: support old table which doesn't store schema in table properties") {
+    withTable("old") {
+      withTempPath { path =>
+        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+        val tableDesc = CatalogTable(
+          identifier = TableIdentifier("old", Some("default")),
+          tableType = CatalogTableType.EXTERNAL,
+          storage = CatalogStorageFormat.empty.copy(
+            properties = Map("path" -> path.getAbsolutePath)
+          ),
+          schema = new StructType(),
+          properties = Map(
+            HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
+        hiveClient.createTable(tableDesc, ignoreIfExists = false)
+
+        checkAnswer(spark.table("old"), Row(1, "a"))
+
+        checkAnswer(sql("DESC old"), Row("i", "int", null) :: Row("j", "string", null) :: Nil)
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 0053aa1..e2fcd2f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -62,7 +62,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
 
         spark.conf.set(SQLConf.ENABLE_FALL_BACK_TO_HDFS_FOR_STATS.key, true)
 
-        val relation = 
spark.sessionState.catalog.lookupRelation(TableIdentifier("csv_table"))
+        val relation = spark.table("csv_table").queryExecution.analyzed.children.head
           .asInstanceOf[MetastoreRelation]
 
         val properties = relation.hiveQlTable.getParameters
@@ -80,7 +80,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
 
   test("analyze MetastoreRelations") {
     def queryTotalSize(tableName: String): BigInt =
-      
spark.sessionState.catalog.lookupRelation(TableIdentifier(tableName)).stats(conf).sizeInBytes
+      spark.table(tableName).queryExecution.analyzed.stats(conf).sizeInBytes
 
     // Non-partitioned table
     sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
@@ -451,7 +451,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
       // Table lookup will make the table cached.
-      catalog.lookupRelation(tableIndent)
+      spark.table(tableIndent)
       statsBeforeUpdate = catalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
 
@@ -461,7 +461,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
       } else {
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS")
       }
-      catalog.lookupRelation(tableIndent)
+      spark.table(tableIndent)
       statsAfterUpdate = catalog.getCachedDataSourceTable(tableIndent)
         .asInstanceOf[LogicalRelation].catalogTable.get.stats.get
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/2e625600/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 953e291..104b525 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{EliminateSubqueryAliases, 
FunctionRegistry, NoSuchPartitionException}
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 import org.apache.spark.sql.catalyst.parser.ParseException
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, 
LogicalRelation}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.hive.{HiveUtils, MetastoreRelation}
@@ -513,8 +514,12 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
       isDataSourceTable: Boolean,
       format: String,
       userSpecifiedLocation: Option[String] = None): Unit = {
-    val relation = EliminateSubqueryAliases(
-      sessionState.catalog.lookupRelation(TableIdentifier(tableName)))
+    var relation: LogicalPlan = null
+    withSQLConf(
+      HiveUtils.CONVERT_METASTORE_PARQUET.key -> "false",
+      HiveUtils.CONVERT_METASTORE_ORC.key -> "false") {
+      relation = EliminateSubqueryAliases(spark.table(tableName).queryExecution.analyzed)
+    }
     val catalogTable =
       sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
     relation match {
@@ -1021,13 +1026,11 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
     // generates an invalid query plan.
     val rdd = sparkContext.makeRDD((1 to 5).map(i => s"""{"a":[$i, ${i + 
1}]}"""))
     read.json(rdd).createOrReplaceTempView("data")
-    val originalConf = sessionState.conf.convertCTAS
-    setConf(SQLConf.CONVERT_CTAS, false)
 
-    try {
+    withSQLConf(SQLConf.CONVERT_CTAS.key -> "false") {
       sql("CREATE TABLE explodeTest (key bigInt)")
       table("explodeTest").queryExecution.analyzed match {
-        case metastoreRelation: MetastoreRelation => // OK
+        case SubqueryAlias(_, r: MetastoreRelation, _) => // OK
         case _ =>
           fail("To correctly test the fix of SPARK-5875, explodeTest should be a MetastoreRelation")
       }
@@ -1040,8 +1043,6 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
 
       sql("DROP TABLE explodeTest")
       dropTempTable("data")
-    } finally {
-      setConf(SQLConf.CONVERT_CTAS, originalConf)
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to