Repository: spark
Updated Branches:
  refs/heads/master 8bde03bf9 -> b50b34f56


[SPARK-17609][SQL] SessionCatalog.tableExists should not check temp view

## What changes were proposed in this pull request?

After #15054, there is no place in Spark SQL that needs
`SessionCatalog.tableExists` to check temp views, so this PR makes
`SessionCatalog.tableExists` check only permanent tables/views and removes
some hacks. A quick sketch of the new behavior follows.
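
A rough sketch of the new `tableExists` behavior, mirroring the updated
`SessionCatalogSuite` (the `newBasicCatalog()` and `tempTable` fixtures are
assumed from that suite):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.SessionCatalog

val catalog = new SessionCatalog(newBasicCatalog())
catalog.createTempView("tbl3", tempTable, overrideIfExists = false)

// tableExists now consults only the external catalog, so a temp view
// named "tbl3" no longer makes it return true.
assert(!catalog.tableExists(TableIdentifier("tbl3")))

// Callers that care about temp views should ask explicitly:
assert(catalog.isTemporaryTable(TableIdentifier("tbl3")))
```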

This PR also improves `getTempViewOrPermanentTableMetadata`, which was
introduced in #15054, to make the code simpler; see the sketch below.
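
A sketch of the resulting lookup order, following the updated tests (same
assumed fixtures as above; `intercept` is the ScalaTest helper used by the
suite):

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
import org.apache.spark.sql.catalyst.catalog.CatalogTableType

// Without a database in the identifier, the temp view registered above
// wins and is surfaced as a VIEW with an empty storage format.
val meta = catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("tbl3"))
assert(meta.tableType == CatalogTableType.VIEW)

// With an explicit database, temp views are bypassed and the external
// catalog is consulted directly, so this throws if db2.tbl3 is absent.
intercept[NoSuchTableException] {
  catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("tbl3", Some("db2")))
}
```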

## How was this patch tested?

Existing tests.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #15160 from cloud-fan/exists.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b50b34f5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b50b34f5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b50b34f5

Branch: refs/heads/master
Commit: b50b34f5611a1f182ba9b6eaf86c666bbd9f9eb0
Parents: 8bde03b
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Thu Sep 22 12:52:09 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Sep 22 12:52:09 2016 +0800

----------------------------------------------------------------------
 .../sql/catalyst/catalog/SessionCatalog.scala   | 70 ++++++++++----------
 .../catalyst/catalog/SessionCatalogSuite.scala  | 30 +++++----
 .../org/apache/spark/sql/DataFrameWriter.scala  |  9 +--
 .../command/createDataSourceTables.scala        | 15 ++---
 .../spark/sql/execution/command/ddl.scala       | 43 +++++-------
 .../spark/sql/execution/command/tables.scala    | 17 +----
 .../apache/spark/sql/internal/CatalogImpl.scala |  6 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |  2 +-
 .../spark/sql/hive/execution/HiveDDLSuite.scala |  4 +-
 9 files changed, 81 insertions(+), 115 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index ef29c75..8c01c7a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -246,6 +246,16 @@ class SessionCatalog(
   }
 
   /**
+   * Return whether a table/view with the specified name exists. If no database is specified, check
+   * with current database.
+   */
+  def tableExists(name: TableIdentifier): Boolean = synchronized {
+    val db = formatDatabaseName(name.database.getOrElse(currentDb))
+    val table = formatTableName(name.table)
+    externalCatalog.tableExists(db, table)
+  }
+
+  /**
   * Retrieve the metadata of an existing permanent table/view. If no database is specified,
   * assume the table/view is in the current database. If the specified table/view is not found
    * in the database then a [[NoSuchTableException]] is thrown.
@@ -271,24 +281,6 @@ class SessionCatalog(
   }
 
   /**
-   * Retrieve the metadata of an existing temporary view or permanent table/view.
-   * If the temporary view does not exist, tries to get the metadata an existing permanent
-   * table/view. If no database is specified, assume the table/view is in the current database.
-   * If the specified table/view is not found in the database then a [[NoSuchTableException]] is
-   * thrown.
-   */
-  def getTempViewOrPermanentTableMetadata(name: String): CatalogTable = synchronized {
-    val table = formatTableName(name)
-    getTempView(table).map { plan =>
-      CatalogTable(
-        identifier = TableIdentifier(table),
-        tableType = CatalogTableType.VIEW,
-        storage = CatalogStorageFormat.empty,
-        schema = plan.output.toStructType)
-    }.getOrElse(getTableMetadata(TableIdentifier(name)))
-  }
-
-  /**
    * Load files stored in given path into an existing metastore table.
    * If no database is specified, assume the table is in the current database.
   * If the specified table is not found in the database then a [[NoSuchTableException]] is thrown.
@@ -369,6 +361,30 @@ class SessionCatalog(
   // -------------------------------------------------------------
 
   /**
+   * Retrieve the metadata of an existing temporary view or permanent table/view.
+   *
+   * If a database is specified in `name`, this will return the metadata of table/view in that
+   * database.
+   * If no database is specified, this will first attempt to get the metadata of a temporary view
+   * with the same name, then, if that does not exist, return the metadata of table/view in the
+   * current database.
+   */
+  def getTempViewOrPermanentTableMetadata(name: TableIdentifier): CatalogTable = synchronized {
+    val table = formatTableName(name.table)
+    if (name.database.isDefined) {
+      getTableMetadata(name)
+    } else {
+      getTempView(table).map { plan =>
+        CatalogTable(
+          identifier = TableIdentifier(table),
+          tableType = CatalogTableType.VIEW,
+          storage = CatalogStorageFormat.empty,
+          schema = plan.output.toStructType)
+      }.getOrElse(getTableMetadata(name))
+    }
+  }
+
+  /**
    * Rename a table.
    *
   * If a database is specified in `oldName`, this will rename the table in that database.
@@ -450,24 +466,6 @@ class SessionCatalog(
   }
 
   /**
-   * Return whether a table/view with the specified name exists.
-   *
-   * Note: If a database is explicitly specified, then this will return whether the table/view
-   * exists in that particular database instead. In that case, even if there is a temporary
-   * table with the same name, we will return false if the specified database does not
-   * contain the table/view.
-   */
-  def tableExists(name: TableIdentifier): Boolean = synchronized {
-    val db = formatDatabaseName(name.database.getOrElse(currentDb))
-    val table = formatTableName(name.table)
-    if (isTemporaryTable(name)) {
-      true
-    } else {
-      externalCatalog.tableExists(db, table)
-    }
-  }
-
-  /**
    * Return whether a table with the specified name is a temporary table.
    *
    * Note: The temporary table cache is checked only when database is not

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
index 384a730..915ed8f 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala
@@ -425,35 +425,37 @@ class SessionCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(TableIdentifier("tbl2", Some("db1"))))
     // If database is explicitly specified, do not check temporary tables
     val tempTable = Range(1, 10, 1, 10)
-    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
     assert(!catalog.tableExists(TableIdentifier("tbl3", Some("db2"))))
     // If database is not explicitly specified, check the current database
     catalog.setCurrentDatabase("db2")
     assert(catalog.tableExists(TableIdentifier("tbl1")))
     assert(catalog.tableExists(TableIdentifier("tbl2")))
-    assert(catalog.tableExists(TableIdentifier("tbl3")))
-  }
 
-  test("tableExists on temporary views") {
-    val catalog = new SessionCatalog(newBasicCatalog())
-    val tempTable = Range(1, 10, 2, 10)
-    assert(!catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
-    catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.tableExists(TableIdentifier("view1")))
-    assert(!catalog.tableExists(TableIdentifier("view1", Some("default"))))
+    catalog.createTempView("tbl3", tempTable, overrideIfExists = false)
+    // tableExists should not check temp view.
+    assert(!catalog.tableExists(TableIdentifier("tbl3")))
   }
 
   test("getTempViewOrPermanentTableMetadata on temporary views") {
     val catalog = new SessionCatalog(newBasicCatalog())
     val tempTable = Range(1, 10, 2, 10)
     intercept[NoSuchTableException] {
-      catalog.getTempViewOrPermanentTableMetadata("view1")
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1"))
+    }.getMessage
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
     }.getMessage
 
     catalog.createTempView("view1", tempTable, overrideIfExists = false)
-    assert(catalog.getTempViewOrPermanentTableMetadata("view1").identifier ==
-      TableIdentifier("view1"), "the temporary view `view1` should exist")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).identifier.table == "view1")
+    assert(catalog.getTempViewOrPermanentTableMetadata(
+      TableIdentifier("view1")).schema(0).name == "id")
+
+    intercept[NoSuchTableException] {
+      catalog.getTempViewOrPermanentTableMetadata(TableIdentifier("view1", Some("default")))
+    }.getMessage
   }
 
   test("list tables without pattern") {

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 9e343b5..64d3422 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -361,12 +361,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
       throw new AnalysisException("Cannot create hive serde table with saveAsTable API")
     }
 
-    val sessionState = df.sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = tableIdent.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    val tableExists = sessionState.catalog.tableExists(tableIdentWithDB)
+    val tableExists = df.sparkSession.sessionState.catalog.tableExists(tableIdent)
 
     (tableExists, mode) match {
       case (true, SaveMode.Ignore) =>
@@ -392,7 +387,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
           bucketSpec = getBucketSpec
         )
         val cmd = CreateTable(tableDesc, mode, Some(df.logicalPlan))
-        sessionState.executePlan(cmd).toRdd
+        df.sparkSession.sessionState.executePlan(cmd).toRdd
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
index d8e20b0..a04a13e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
@@ -47,15 +47,11 @@ case class CreateDataSourceTableCommand(table: CatalogTable, ignoreIfExists: Boo
     assert(table.provider.isDefined)
 
     val sessionState = sparkSession.sessionState
-    val db = table.identifier.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = table.identifier.copy(database = Some(db))
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
-    if (sessionState.catalog.tableExists(tableIdentWithDB)) {
+    if (sessionState.catalog.tableExists(table.identifier)) {
       if (ignoreIfExists) {
         return Seq.empty[Row]
       } else {
-        throw new AnalysisException(s"Table ${tableIdentWithDB.unquotedString} already exists.")
+        throw new AnalysisException(s"Table ${table.identifier.unquotedString} already exists.")
       }
     }
 
@@ -146,8 +142,6 @@ case class CreateDataSourceTableAsSelectCommand(
 
     var createMetastoreTable = false
     var existingSchema = Option.empty[StructType]
-    // Pass a table identifier with database part, so that `tableExists` won't check temp views
-    // unexpectedly.
     if (sparkSession.sessionState.catalog.tableExists(tableIdentWithDB)) {
       // Check if we need to throw an exception or just return.
       mode match {
@@ -172,8 +166,9 @@ case class CreateDataSourceTableAsSelectCommand(
           // TODO: Check that options from the resolved relation match the relation that we are
           // inserting into (i.e. using the same compression).
 
-          EliminateSubqueryAliases(
-            sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
+          // Pass a table identifier with database part, so that `lookupRelation` won't get temp
+          // views unexpectedly.
+          EliminateSubqueryAliases(sessionState.catalog.lookupRelation(tableIdentWithDB)) match {
             case l @ LogicalRelation(_: InsertableRelation | _: HadoopFsRelation, _, _) =>
               // check if the file formats match
               l.relation match {

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index b57b2d2..01ac898 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -183,32 +183,25 @@ case class DropTableCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(tableName)) {
-      if (!ifExists) {
-        val objectName = if (isView) "View" else "Table"
-        throw new AnalysisException(s"$objectName to drop '$tableName' does not exist")
-      }
-    } else {
-      // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
-      // issue an exception.
-      catalog.getTableMetadataOption(tableName).map(_.tableType match {
-        case CatalogTableType.VIEW if !isView =>
-          throw new AnalysisException(
-            "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
-        case o if o != CatalogTableType.VIEW && isView =>
-          throw new AnalysisException(
-            s"Cannot drop a table with DROP VIEW. Please use DROP TABLE 
instead")
-        case _ =>
-      })
-      try {
-        sparkSession.sharedState.cacheManager.uncacheQuery(
-          sparkSession.table(tableName.quotedString))
-      } catch {
-        case NonFatal(e) => log.warn(e.toString, e)
-      }
-      catalog.refreshTable(tableName)
-      catalog.dropTable(tableName, ifExists, purge)
+    // If the command DROP VIEW is to drop a table or DROP TABLE is to drop a view
+    // issue an exception.
+    catalog.getTableMetadataOption(tableName).map(_.tableType match {
+      case CatalogTableType.VIEW if !isView =>
+        throw new AnalysisException(
+          "Cannot drop a view with DROP TABLE. Please use DROP VIEW instead")
+      case o if o != CatalogTableType.VIEW && isView =>
+        throw new AnalysisException(
+          s"Cannot drop a table with DROP VIEW. Please use DROP TABLE instead")
+      case _ =>
+    })
+    try {
+      sparkSession.sharedState.cacheManager.uncacheQuery(
+        sparkSession.table(tableName.quotedString))
+    } catch {
+      case NonFatal(e) => log.warn(e.toString, e)
     }
+    catalog.refreshTable(tableName)
+    catalog.dropTable(tableName, ifExists, purge)
     Seq.empty[Row]
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 94b46c5..0f61629 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -59,16 +59,7 @@ case class CreateTableLikeCommand(
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    if (!catalog.tableExists(sourceTable)) {
-      throw new AnalysisException(
-        s"Source table in CREATE TABLE LIKE does not exist: '$sourceTable'")
-    }
-
-    val sourceTableDesc = if (sourceTable.database.isDefined) {
-      catalog.getTableMetadata(sourceTable)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(sourceTable.table)
-    }
+    val sourceTableDesc = catalog.getTempViewOrPermanentTableMetadata(sourceTable)
 
     // Storage format
     val newStorage =
@@ -602,11 +593,7 @@ case class ShowColumnsCommand(tableName: TableIdentifier) extends RunnableComman
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val catalog = sparkSession.sessionState.catalog
-    val table = if (tableName.database.isDefined) {
-      catalog.getTableMetadata(tableName)
-    } else {
-      catalog.getTempViewOrPermanentTableMetadata(tableName.table)
-    }
+    val table = catalog.getTempViewOrPermanentTableMetadata(tableName)
     table.schema.map { c =>
       Row(c.name)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
index 6fecda2..f252535 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala
@@ -151,11 +151,7 @@ class CatalogImpl(sparkSession: SparkSession) extends Catalog {
   }
 
   private def listColumns(tableIdentifier: TableIdentifier): Dataset[Column] = {
-    val tableMetadata = if (tableIdentifier.database.isDefined) {
-      sessionCatalog.getTableMetadata(tableIdentifier)
-    } else {
-      sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier.table)
-    }
+    val tableMetadata = sessionCatalog.getTempViewOrPermanentTableMetadata(tableIdentifier)
 
     val partitionColumnNames = tableMetadata.partitionColumnNames.toSet
     val bucketColumnNames = tableMetadata.bucketSpec.map(_.bucketColumnNames).getOrElse(Nil).toSet

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
index 7143adf..8ae6868 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
@@ -515,7 +515,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv
           assert(
             intercept[AnalysisException] {
               sparkSession.catalog.createExternalTable("createdJsonTable", jsonFilePath.toString)
-            }.getMessage.contains("Table default.createdJsonTable already exists."),
+            }.getMessage.contains("Table createdJsonTable already exists."),
             "We should complain that createdJsonTable already exists")
         }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b50b34f5/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 38482f6..c927e5d 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -678,8 +678,8 @@ class HiveDDLSuite
           .createTempView(sourceViewName)
         sql(s"CREATE TABLE $targetTabName LIKE $sourceViewName")
 
-        val sourceTable =
-          spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(sourceViewName)
+        val sourceTable = spark.sessionState.catalog.getTempViewOrPermanentTableMetadata(
+          TableIdentifier(sourceViewName))
         val targetTable = spark.sessionState.catalog.getTableMetadata(
           TableIdentifier(targetTabName, Some("default")))
 

