[SPARK-13923][SQL] Implement SessionCatalog

## What changes were proposed in this pull request?

As part of the effort to merge `SQLContext` and `HiveContext`, this patch 
implements an internal catalog called `SessionCatalog` that handles temporary 
functions and tables and delegates metastore operations to `ExternalCatalog`. 
Currently, this is still dead code, but in the future it will be part of 
`SessionState` and will replace `o.a.s.sql.catalyst.analysis.Catalog`.
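
For illustration, here is a minimal sketch (not part of this patch) of how the new catalog is meant to delegate persistent metadata operations, using `InMemoryCatalog` as the backing `ExternalCatalog`; the database name, description, and location URI below are placeholders:

```scala
import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, InMemoryCatalog, SessionCatalog}

// SessionCatalog keeps session-local state (temporary tables/functions, current database)
// and forwards persistent metadata operations to the ExternalCatalog it wraps.
val external = new InMemoryCatalog
val sessionCatalog = new SessionCatalog(external)

sessionCatalog.createDatabase(
  CatalogDatabase("db2", "example database", "file:/tmp/db2", Map.empty),
  ignoreIfExists = false)
sessionCatalog.setCurrentDatabase("db2")

assert(sessionCatalog.getCurrentDatabase == "db2")
assert(external.databaseExists("db2"))  // the write went through to the external catalog
```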

A recent patch, #11573, made Spark parse Hive commands itself, but it still
passes the entire query text to Hive. In a future patch, we will use
`SessionCatalog` to implement the parsed commands.
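
As a rough sketch of the intended resolution order (temporary tables shadow metastore tables unless a database is given), reusing `sessionCatalog` from the sketch above and with `somePlan` standing in for an arbitrary `LogicalPlan`:

```scala
import org.apache.spark.sql.catalyst.TableIdentifier

// Register a temporary table under the name "tbl1".
sessionCatalog.createTempTable("tbl1", somePlan, ignoreIfExists = false)

// No database specified: the temporary table shadows any metastore table named "tbl1".
val tempPlan = sessionCatalog.lookupRelation(TableIdentifier("tbl1"))

// Database specified: temporary tables are bypassed and the metadata is read from the
// metastore (this assumes a table db2.tbl1 actually exists there).
val metastorePlan = sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2")))
```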

## How was this patch tested?

800+ lines of tests in `SessionCatalogSuite`.
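
For a flavor of what those tests exercise, here is a sketch of the temporary-function path (not copied from the suite; `getTempFunction` is `private[catalog]`, so this only works from test code in the same package, and the class name is a placeholder):

```scala
import org.apache.spark.sql.catalyst.FunctionIdentifier
import org.apache.spark.sql.catalyst.catalog.CatalogFunction

val tempFunc = CatalogFunction(FunctionIdentifier("my_temp_func"), "com.example.MyFunc")
sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false)
assert(sessionCatalog.getTempFunction("my_temp_func") == Some(tempFunc))

sessionCatalog.dropTempFunction("my_temp_func", ignoreIfNotExists = false)
assert(sessionCatalog.getTempFunction("my_temp_func").isEmpty)
```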

Author: Andrew Or <and...@databricks.com>

Closes #11750 from andrewor14/temp-catalog.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca9ef86c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca9ef86c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca9ef86c

Branch: refs/heads/master
Commit: ca9ef86c84ee84263f437a979017898f4bed0feb
Parents: 92b7057
Author: Andrew Or <and...@databricks.com>
Authored: Wed Mar 16 18:02:43 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 16 18:02:43 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/TableIdentifier.scala    |  35 -
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  51 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   | 469 ++++++++++
 .../spark/sql/catalyst/catalog/interface.scala  |  29 +-
 .../apache/spark/sql/catalyst/identifiers.scala |  68 ++
 .../sql/catalyst/catalog/CatalogTestCases.scala | 171 ++--
 .../catalyst/catalog/InMemoryCatalogSuite.scala |   9 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  | 864 +++++++++++++++++++
 .../org/apache/spark/sql/hive/HiveCatalog.scala |   9 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  26 +-
 .../org/apache/spark/sql/hive/HiveQl.scala      |  14 +-
 .../spark/sql/hive/client/HiveClient.scala      |   2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  15 +-
 .../hive/execution/CreateTableAsSelect.scala    |   4 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |   4 +-
 .../spark/sql/hive/HiveCatalogSuite.scala       |  13 +-
 .../org/apache/spark/sql/hive/HiveQlSuite.scala |  16 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   3 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 +-
 19 files changed, 1604 insertions(+), 202 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
deleted file mode 100644
index 4d4e4de..0000000
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.catalyst
-
-/**
- * Identifies a `table` in `database`.  If `database` is not defined, the 
current database is used.
- */
-private[sql] case class TableIdentifier(table: String, database: 
Option[String]) {
-  def this(table: String) = this(table, None)
-
-  override def toString: String = quotedString
-
-  def quotedString: String = database.map(db => 
s"`$db`.`$table`").getOrElse(s"`$table`")
-
-  def unquotedString: String = database.map(db => 
s"$db.$table").getOrElse(table)
-}
-
-private[sql] object TableIdentifier {
-  def apply(tableName: String): TableIdentifier = new 
TableIdentifier(tableName)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index f3fa795..7ead1dd 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog
 import scala.collection.mutable
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 
 
 /**
@@ -68,19 +69,20 @@ class InMemoryCatalog extends ExternalCatalog {
 
   private def requireFunctionExists(db: String, funcName: String): Unit = {
     if (!existsFunction(db, funcName)) {
-      throw new AnalysisException(s"Function $funcName does not exist in $db 
database")
+      throw new AnalysisException(s"Function '$funcName' does not exist in 
database '$db'")
     }
   }
 
   private def requireTableExists(db: String, table: String): Unit = {
     if (!existsTable(db, table)) {
-      throw new AnalysisException(s"Table $table does not exist in $db 
database")
+      throw new AnalysisException(s"Table '$table' does not exist in database 
'$db'")
     }
   }
 
   private def requirePartitionExists(db: String, table: String, spec: 
TablePartitionSpec): Unit = {
     if (!existsPartition(db, table, spec)) {
-      throw new AnalysisException(s"Partition does not exist in database $db 
table $table: $spec")
+      throw new AnalysisException(
+        s"Partition does not exist in database '$db' table '$table': '$spec'")
     }
   }
 
@@ -93,7 +95,7 @@ class InMemoryCatalog extends ExternalCatalog {
       ignoreIfExists: Boolean): Unit = synchronized {
     if (catalog.contains(dbDefinition.name)) {
       if (!ignoreIfExists) {
-        throw new AnalysisException(s"Database ${dbDefinition.name} already 
exists.")
+        throw new AnalysisException(s"Database '${dbDefinition.name}' already 
exists.")
       }
     } else {
       catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
@@ -108,17 +110,17 @@ class InMemoryCatalog extends ExternalCatalog {
       if (!cascade) {
         // If cascade is false, make sure the database is empty.
         if (catalog(db).tables.nonEmpty) {
-          throw new AnalysisException(s"Database $db is not empty. One or more 
tables exist.")
+          throw new AnalysisException(s"Database '$db' is not empty. One or 
more tables exist.")
         }
         if (catalog(db).functions.nonEmpty) {
-          throw new AnalysisException(s"Database $db is not empty. One or more 
functions exist.")
+          throw new AnalysisException(s"Database '$db' is not empty. One or 
more functions exist.")
         }
       }
       // Remove the database.
       catalog.remove(db)
     } else {
       if (!ignoreIfNotExists) {
-        throw new AnalysisException(s"Database $db does not exist")
+        throw new AnalysisException(s"Database '$db' does not exist")
       }
     }
   }
@@ -156,12 +158,13 @@ class InMemoryCatalog extends ExternalCatalog {
       tableDefinition: CatalogTable,
       ignoreIfExists: Boolean): Unit = synchronized {
     requireDbExists(db)
-    if (existsTable(db, tableDefinition.name)) {
+    val table = tableDefinition.name.table
+    if (existsTable(db, table)) {
       if (!ignoreIfExists) {
-        throw new AnalysisException(s"Table ${tableDefinition.name} already 
exists in $db database")
+        throw new AnalysisException(s"Table '$table' already exists in 
database '$db'")
       }
     } else {
-      catalog(db).tables.put(tableDefinition.name, new 
TableDesc(tableDefinition))
+      catalog(db).tables.put(table, new TableDesc(tableDefinition))
     }
   }
 
@@ -174,7 +177,7 @@ class InMemoryCatalog extends ExternalCatalog {
       catalog(db).tables.remove(table)
     } else {
       if (!ignoreIfNotExists) {
-        throw new AnalysisException(s"Table $table does not exist in $db 
database")
+        throw new AnalysisException(s"Table '$table' does not exist in 
database '$db'")
       }
     }
   }
@@ -182,14 +185,14 @@ class InMemoryCatalog extends ExternalCatalog {
   override def renameTable(db: String, oldName: String, newName: String): Unit 
= synchronized {
     requireTableExists(db, oldName)
     val oldDesc = catalog(db).tables(oldName)
-    oldDesc.table = oldDesc.table.copy(name = newName)
+    oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, 
Some(db)))
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
 
   override def alterTable(db: String, tableDefinition: CatalogTable): Unit = 
synchronized {
-    requireTableExists(db, tableDefinition.name)
-    catalog(db).tables(tableDefinition.name).table = tableDefinition
+    requireTableExists(db, tableDefinition.name.table)
+    catalog(db).tables(tableDefinition.name.table).table = tableDefinition
   }
 
   override def getTable(db: String, table: String): CatalogTable = 
synchronized {
@@ -222,8 +225,8 @@ class InMemoryCatalog extends ExternalCatalog {
       val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) 
=> p.spec }
       if (dupSpecs.nonEmpty) {
         val dupSpecsStr = dupSpecs.mkString("\n===\n")
-        throw new AnalysisException(
-          s"The following partitions already exist in database $db table 
$table:\n$dupSpecsStr")
+        throw new AnalysisException("The following partitions already exist in 
database " +
+          s"'$db' table '$table':\n$dupSpecsStr")
       }
     }
     parts.foreach { p => existingParts.put(p.spec, p) }
@@ -240,8 +243,8 @@ class InMemoryCatalog extends ExternalCatalog {
       val missingSpecs = partSpecs.collect { case s if 
!existingParts.contains(s) => s }
       if (missingSpecs.nonEmpty) {
         val missingSpecsStr = missingSpecs.mkString("\n===\n")
-        throw new AnalysisException(
-          s"The following partitions do not exist in database $db table 
$table:\n$missingSpecsStr")
+        throw new AnalysisException("The following partitions do not exist in 
database " +
+          s"'$db' table '$table':\n$missingSpecsStr")
       }
     }
     partSpecs.foreach(existingParts.remove)
@@ -292,10 +295,10 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def createFunction(db: String, func: CatalogFunction): Unit = 
synchronized {
     requireDbExists(db)
-    if (existsFunction(db, func.name)) {
-      throw new AnalysisException(s"Function $func already exists in $db 
database")
+    if (existsFunction(db, func.name.funcName)) {
+      throw new AnalysisException(s"Function '$func' already exists in '$db' 
database")
     } else {
-      catalog(db).functions.put(func.name, func)
+      catalog(db).functions.put(func.name.funcName, func)
     }
   }
 
@@ -306,14 +309,14 @@ class InMemoryCatalog extends ExternalCatalog {
 
   override def renameFunction(db: String, oldName: String, newName: String): 
Unit = synchronized {
     requireFunctionExists(db, oldName)
-    val newFunc = getFunction(db, oldName).copy(name = newName)
+    val newFunc = getFunction(db, oldName).copy(name = 
FunctionIdentifier(newName, Some(db)))
     catalog(db).functions.remove(oldName)
     catalog(db).functions.put(newName, newFunc)
   }
 
   override def alterFunction(db: String, funcDefinition: CatalogFunction): 
Unit = synchronized {
-    requireFunctionExists(db, funcDefinition.name)
-    catalog(db).functions.put(funcDefinition.name, funcDefinition)
+    requireFunctionExists(db, funcDefinition.name.funcName)
+    catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition)
   }
 
   override def getFunction(db: String, funcName: String): CatalogFunction = 
synchronized {

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
new file mode 100644
index 0000000..4dec042
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -0,0 +1,469 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.catalog
+
+import java.util.concurrent.ConcurrentHashMap
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias}
+
+
+/**
+ * An internal catalog that is used by a Spark Session. This internal catalog 
serves as a
+ * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages 
temporary
+ * tables and functions of the Spark Session that it belongs to.
+ */
+class SessionCatalog(externalCatalog: ExternalCatalog) {
+  import ExternalCatalog._
+
+  private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan]
+  private[this] val tempFunctions = new ConcurrentHashMap[String, 
CatalogFunction]
+
+  // Note: we track current database here because certain operations do not 
explicitly
+  // specify the database (e.g. DROP TABLE my_table). In these cases we must 
first
+  // check whether the temporary table or function exists, then, if not, 
operate on
+  // the corresponding item in the current database.
+  private[this] var currentDb = "default"
+
+  // 
----------------------------------------------------------------------------
+  // Databases
+  // 
----------------------------------------------------------------------------
+  // All methods in this category interact directly with the underlying 
catalog.
+  // 
----------------------------------------------------------------------------
+
+  def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): 
Unit = {
+    externalCatalog.createDatabase(dbDefinition, ignoreIfExists)
+  }
+
+  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): 
Unit = {
+    externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade)
+  }
+
+  def alterDatabase(dbDefinition: CatalogDatabase): Unit = {
+    externalCatalog.alterDatabase(dbDefinition)
+  }
+
+  def getDatabase(db: String): CatalogDatabase = {
+    externalCatalog.getDatabase(db)
+  }
+
+  def databaseExists(db: String): Boolean = {
+    externalCatalog.databaseExists(db)
+  }
+
+  def listDatabases(): Seq[String] = {
+    externalCatalog.listDatabases()
+  }
+
+  def listDatabases(pattern: String): Seq[String] = {
+    externalCatalog.listDatabases(pattern)
+  }
+
+  def getCurrentDatabase: String = currentDb
+
+  def setCurrentDatabase(db: String): Unit = {
+    if (!databaseExists(db)) {
+      throw new AnalysisException(s"cannot set current database to 
non-existent '$db'")
+    }
+    currentDb = db
+  }
+
+  // 
----------------------------------------------------------------------------
+  // Tables
+  // 
----------------------------------------------------------------------------
+  // There are two kinds of tables, temporary tables and metastore tables.
+  // Temporary tables are isolated across sessions and do not belong to any
+  // particular database. Metastore tables can be used across multiple
+  // sessions as their metadata is persisted in the underlying catalog.
+  // 
----------------------------------------------------------------------------
+
+  // ----------------------------------------------------
+  // | Methods that interact with metastore tables only |
+  // ----------------------------------------------------
+
+  /**
+   * Create a metastore table in the database specified in `tableDefinition`.
+   * If no such database is specified, create it in the current database.
+   */
+  def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): 
Unit = {
+    val db = tableDefinition.name.database.getOrElse(currentDb)
+    val newTableDefinition = tableDefinition.copy(
+      name = TableIdentifier(tableDefinition.name.table, Some(db)))
+    externalCatalog.createTable(db, newTableDefinition, ignoreIfExists)
+  }
+
+  /**
+   * Alter the metadata of an existing metastore table identified by 
`tableDefinition`.
+   *
+   * If no database is specified in `tableDefinition`, assume the table is in 
the
+   * current database.
+   *
+   * Note: If the underlying implementation does not support altering a 
certain field,
+   * this becomes a no-op.
+   */
+  def alterTable(tableDefinition: CatalogTable): Unit = {
+    val db = tableDefinition.name.database.getOrElse(currentDb)
+    val newTableDefinition = tableDefinition.copy(
+      name = TableIdentifier(tableDefinition.name.table, Some(db)))
+    externalCatalog.alterTable(db, newTableDefinition)
+  }
+
+  /**
+   * Retrieve the metadata of an existing metastore table.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def getTable(name: TableIdentifier): CatalogTable = {
+    val db = name.database.getOrElse(currentDb)
+    externalCatalog.getTable(db, name.table)
+  }
+
+  // -------------------------------------------------------------
+  // | Methods that interact with temporary and metastore tables |
+  // -------------------------------------------------------------
+
+  /**
+   * Create a temporary table.
+   */
+  def createTempTable(
+      name: String,
+      tableDefinition: LogicalPlan,
+      ignoreIfExists: Boolean): Unit = {
+    if (tempTables.containsKey(name) && !ignoreIfExists) {
+      throw new AnalysisException(s"Temporary table '$name' already exists.")
+    }
+    tempTables.put(name, tableDefinition)
+  }
+
+  /**
+   * Rename a table.
+   *
+   * If a database is specified in `oldName`, this will rename the table in 
that database.
+   * If no database is specified, this will first attempt to rename a 
temporary table with
+   * the same name, then, if that does not exist, rename the table in the 
current database.
+   *
+   * This assumes the database specified in `oldName` matches the one 
specified in `newName`.
+   */
+  def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = {
+    if (oldName.database != newName.database) {
+      throw new AnalysisException("rename does not support moving tables 
across databases")
+    }
+    val db = oldName.database.getOrElse(currentDb)
+    if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) {
+      externalCatalog.renameTable(db, oldName.table, newName.table)
+    } else {
+      val table = tempTables.remove(oldName.table)
+      tempTables.put(newName.table, table)
+    }
+  }
+
+  /**
+   * Drop a table.
+   *
+   * If a database is specified in `name`, this will drop the table from that 
database.
+   * If no database is specified, this will first attempt to drop a temporary 
table with
+   * the same name, then, if that does not exist, drop the table from the 
current database.
+   */
+  def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = {
+    val db = name.database.getOrElse(currentDb)
+    if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+      externalCatalog.dropTable(db, name.table, ignoreIfNotExists)
+    } else {
+      tempTables.remove(name.table)
+    }
+  }
+
+  /**
+   * Return a [[LogicalPlan]] that represents the given table.
+   *
+   * If a database is specified in `name`, this will return the table from 
that database.
+   * If no database is specified, this will first attempt to return a 
temporary table with
+   * the same name, then, if that does not exist, return the table from the 
current database.
+   */
+  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): 
LogicalPlan = {
+    val db = name.database.getOrElse(currentDb)
+    val relation =
+      if (name.database.isDefined || !tempTables.containsKey(name.table)) {
+        val metadata = externalCatalog.getTable(db, name.table)
+        CatalogRelation(db, metadata, alias)
+      } else {
+        tempTables.get(name.table)
+      }
+    val tableWithQualifiers = SubqueryAlias(name.table, relation)
+    // If an alias was specified by the lookup, wrap the plan in a subquery so 
that
+    // attributes are properly qualified with this alias.
+    alias.map(a => SubqueryAlias(a, 
tableWithQualifiers)).getOrElse(tableWithQualifiers)
+  }
+
+  /**
+   * List all tables in the specified database, including temporary tables.
+   */
+  def listTables(db: String): Seq[TableIdentifier] = {
+    val dbTables = externalCatalog.listTables(db).map { t => 
TableIdentifier(t, Some(db)) }
+    val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) }
+    dbTables ++ _tempTables
+  }
+
+  /**
+   * List all matching tables in the specified database, including temporary 
tables.
+   */
+  def listTables(db: String, pattern: String): Seq[TableIdentifier] = {
+    val dbTables =
+      externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, 
Some(db)) }
+    val regex = pattern.replaceAll("\\*", ".*").r
+    val _tempTables = tempTables.keys().asScala
+      .filter { t => regex.pattern.matcher(t).matches() }
+      .map { t => TableIdentifier(t) }
+    dbTables ++ _tempTables
+  }
+
+  /**
+   * Return a temporary table exactly as it was stored.
+   * For testing only.
+   */
+  private[catalog] def getTempTable(name: String): Option[LogicalPlan] = {
+    Option(tempTables.get(name))
+  }
+
+  // 
----------------------------------------------------------------------------
+  // Partitions
+  // 
----------------------------------------------------------------------------
+  // All methods in this category interact directly with the underlying 
catalog.
+  // These methods are concerned with only metastore tables.
+  // 
----------------------------------------------------------------------------
+
+  // TODO: We need to figure out how these methods interact with our data 
source
+  // tables. For such tables, we do not store values of partitioning columns in
+  // the metastore. For now, partition values of a data source table will be
+  // automatically discovered when we load the table.
+
+  /**
+   * Create partitions in an existing table, assuming it exists.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def createPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[CatalogTablePartition],
+      ignoreIfExists: Boolean): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.createPartitions(db, tableName.table, parts, 
ignoreIfExists)
+  }
+
+  /**
+   * Drop partitions from a table, assuming they exist.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def dropPartitions(
+      tableName: TableIdentifier,
+      parts: Seq[TablePartitionSpec],
+      ignoreIfNotExists: Boolean): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.dropPartitions(db, tableName.table, parts, 
ignoreIfNotExists)
+  }
+
+  /**
+   * Override the specs of one or many existing table partitions, assuming 
they exist.
+   *
+   * This assumes index i of `specs` corresponds to index i of `newSpecs`.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def renamePartitions(
+      tableName: TableIdentifier,
+      specs: Seq[TablePartitionSpec],
+      newSpecs: Seq[TablePartitionSpec]): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs)
+  }
+
+  /**
+   * Alter one or many table partitions whose specs match those specified in `parts`,
+   * assuming the partitions exist.
+   *
+   * If no database is specified, assume the table is in the current database.
+   *
+   * Note: If the underlying implementation does not support altering a 
certain field,
+   * this becomes a no-op.
+   */
+  def alterPartitions(tableName: TableIdentifier, parts: 
Seq[CatalogTablePartition]): Unit = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.alterPartitions(db, tableName.table, parts)
+  }
+
+  /**
+   * Retrieve the metadata of a table partition, assuming it exists.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): 
CatalogTablePartition = {
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.getPartition(db, tableName.table, spec)
+  }
+
+  /**
+   * List all partitions in a table, assuming it exists.
+   * If no database is specified, assume the table is in the current database.
+   */
+  def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = 
{
+    val db = tableName.database.getOrElse(currentDb)
+    externalCatalog.listPartitions(db, tableName.table)
+  }
+
+  // 
----------------------------------------------------------------------------
+  // Functions
+  // 
----------------------------------------------------------------------------
+  // There are two kinds of functions, temporary functions and metastore
+  // functions (permanent UDFs). Temporary functions are isolated across
+  // sessions. Metastore functions can be used across multiple sessions as
+  // their metadata is persisted in the underlying catalog.
+  // 
----------------------------------------------------------------------------
+
+  // -------------------------------------------------------
+  // | Methods that interact with metastore functions only |
+  // -------------------------------------------------------
+
+  /**
+   * Create a metastore function in the database specified in `funcDefinition`.
+   * If no such database is specified, create it in the current database.
+   */
+  def createFunction(funcDefinition: CatalogFunction): Unit = {
+    val db = funcDefinition.name.database.getOrElse(currentDb)
+    val newFuncDefinition = funcDefinition.copy(
+      name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+    externalCatalog.createFunction(db, newFuncDefinition)
+  }
+
+  /**
+   * Drop a metastore function.
+   * If no database is specified, assume the function is in the current 
database.
+   */
+  def dropFunction(name: FunctionIdentifier): Unit = {
+    val db = name.database.getOrElse(currentDb)
+    externalCatalog.dropFunction(db, name.funcName)
+  }
+
+  /**
+   * Alter a metastore function whose name matches the one specified in `funcDefinition`.
+   *
+   * If no database is specified in `funcDefinition`, assume the function is 
in the
+   * current database.
+   *
+   * Note: If the underlying implementation does not support altering a 
certain field,
+   * this becomes a no-op.
+   */
+  def alterFunction(funcDefinition: CatalogFunction): Unit = {
+    val db = funcDefinition.name.database.getOrElse(currentDb)
+    val newFuncDefinition = funcDefinition.copy(
+      name = FunctionIdentifier(funcDefinition.name.funcName, Some(db)))
+    externalCatalog.alterFunction(db, newFuncDefinition)
+  }
+
+  // ----------------------------------------------------------------
+  // | Methods that interact with temporary and metastore functions |
+  // ----------------------------------------------------------------
+
+  /**
+   * Create a temporary function.
+   * This assumes no database is specified in `funcDefinition`.
+   */
+  def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: 
Boolean): Unit = {
+    require(funcDefinition.name.database.isEmpty,
+      "attempted to create a temporary function while specifying a database")
+    val name = funcDefinition.name.funcName
+    if (tempFunctions.containsKey(name) && !ignoreIfExists) {
+      throw new AnalysisException(s"Temporary function '$name' already 
exists.")
+    }
+    tempFunctions.put(name, funcDefinition)
+  }
+
+  /**
+   * Drop a temporary function.
+   */
+  // TODO: The reason that we distinguish dropFunction and dropTempFunction is 
that
+  // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to 
consolidate
+  // dropFunction and dropTempFunction.
+  def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = {
+    if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) {
+      throw new AnalysisException(
+        s"Temporary function '$name' cannot be dropped because it does not 
exist!")
+    }
+    tempFunctions.remove(name)
+  }
+
+  /**
+   * Rename a function.
+   *
+   * If a database is specified in `oldName`, this will rename the function in 
that database.
+   * If no database is specified, this will first attempt to rename a 
temporary function with
+   * the same name, then, if that does not exist, rename the function in the 
current database.
+   *
+   * This assumes the database specified in `oldName` matches the one 
specified in `newName`.
+   */
+  def renameFunction(oldName: FunctionIdentifier, newName: 
FunctionIdentifier): Unit = {
+    if (oldName.database != newName.database) {
+      throw new AnalysisException("rename does not support moving functions 
across databases")
+    }
+    val db = oldName.database.getOrElse(currentDb)
+    if (oldName.database.isDefined || 
!tempFunctions.containsKey(oldName.funcName)) {
+      externalCatalog.renameFunction(db, oldName.funcName, newName.funcName)
+    } else {
+      val func = tempFunctions.remove(oldName.funcName)
+      val newFunc = func.copy(name = func.name.copy(funcName = 
newName.funcName))
+      tempFunctions.put(newName.funcName, newFunc)
+    }
+  }
+
+  /**
+   * Retrieve the metadata of an existing function.
+   *
+   * If a database is specified in `name`, this will return the function in 
that database.
+   * If no database is specified, this will first attempt to return a 
temporary function with
+   * the same name, then, if that does not exist, return the function in the 
current database.
+   */
+  def getFunction(name: FunctionIdentifier): CatalogFunction = {
+    val db = name.database.getOrElse(currentDb)
+    if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) {
+      externalCatalog.getFunction(db, name.funcName)
+    } else {
+      tempFunctions.get(name.funcName)
+    }
+  }
+
+  // TODO: implement lookupFunction that returns something from the registry 
itself
+
+  /**
+   * List all matching functions in the specified database, including 
temporary functions.
+   */
+  def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = {
+    val dbFunctions =
+      externalCatalog.listFunctions(db, pattern).map { f => 
FunctionIdentifier(f, Some(db)) }
+    val regex = pattern.replaceAll("\\*", ".*").r
+    val _tempFunctions = tempFunctions.keys().asScala
+      .filter { f => regex.pattern.matcher(f).matches() }
+      .map { f => FunctionIdentifier(f) }
+    dbFunctions ++ _tempFunctions
+  }
+
+  /**
+   * Return a temporary function. For testing only.
+   */
+  private[catalog] def getTempFunction(name: String): Option[CatalogFunction] 
= {
+    Option(tempFunctions.get(name))
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index db34af3..c4e4961 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.catalog
 import javax.annotation.Nullable
 
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan}
 
 
 /**
@@ -167,7 +170,7 @@ abstract class ExternalCatalog {
  * @param name name of the function
  * @param className fully qualified class name, e.g. 
"org.apache.spark.util.MyFunc"
  */
-case class CatalogFunction(name: String, className: String)
+case class CatalogFunction(name: FunctionIdentifier, className: String)
 
 
 /**
@@ -211,8 +214,7 @@ case class CatalogTablePartition(
  * future once we have a better understanding of how we want to handle skewed 
columns.
  */
 case class CatalogTable(
-    specifiedDatabase: Option[String],
-    name: String,
+    name: TableIdentifier,
     tableType: CatalogTableType,
     storage: CatalogStorageFormat,
     schema: Seq[CatalogColumn],
@@ -226,12 +228,12 @@ case class CatalogTable(
     viewText: Option[String] = None) {
 
   /** Return the database this table was specified to belong to, assuming it 
exists. */
-  def database: String = specifiedDatabase.getOrElse {
+  def database: String = name.database.getOrElse {
     throw new AnalysisException(s"table $name did not specify database")
   }
 
   /** Return the fully qualified name of this table, assuming the database was 
specified. */
-  def qualifiedName: String = s"$database.$name"
+  def qualifiedName: String = name.unquotedString
 
   /** Syntactic sugar to update a field in `storage`. */
   def withNewStorage(
@@ -272,3 +274,20 @@ object ExternalCatalog {
    */
   type TablePartitionSpec = Map[String, String]
 }
+
+
+/**
+ * A [[LogicalPlan]] that wraps [[CatalogTable]].
+ */
+case class CatalogRelation(
+    db: String,
+    metadata: CatalogTable,
+    alias: Option[String] = None)
+  extends LeafNode {
+
+  // TODO: implement this
+  override def output: Seq[Attribute] = Seq.empty
+
+  require(metadata.name.database == Some(db),
+    "provided database does not much the one specified in the table 
definition")
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
new file mode 100644
index 0000000..87f4d1b
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst
+
+
+/**
+ * An identifier that optionally specifies a database.
+ *
+ * Format (unquoted): "name" or "db.name"
+ * Format (quoted): "`name`" or "`db`.`name`"
+ */
+sealed trait IdentifierWithDatabase {
+  val name: String
+  def database: Option[String]
+  def quotedString: String = database.map(db => 
s"`$db`.`$name`").getOrElse(s"`$name`")
+  def unquotedString: String = database.map(db => s"$db.$name").getOrElse(name)
+  override def toString: String = quotedString
+}
+
+
+/**
+ * Identifies a table in a database.
+ * If `database` is not defined, the current database is used.
+ */
+case class TableIdentifier(table: String, database: Option[String])
+  extends IdentifierWithDatabase {
+
+  override val name: String = table
+
+  def this(name: String) = this(name, None)
+
+}
+
+object TableIdentifier {
+  def apply(tableName: String): TableIdentifier = new 
TableIdentifier(tableName)
+}
+
+
+/**
+ * Identifies a function in a database.
+ * If `database` is not defined, the current database is used.
+ */
+case class FunctionIdentifier(funcName: String, database: Option[String])
+  extends IdentifierWithDatabase {
+
+  override val name: String = funcName
+
+  def this(name: String) = this(name, None)
+}
+
+object FunctionIdentifier {
+  def apply(funcName: String): FunctionIdentifier = new 
FunctionIdentifier(funcName)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
index b03ba81..a1ea619 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala
@@ -21,6 +21,8 @@ import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
+import org.apache.spark.util.Utils
 
 
 /**
@@ -29,23 +31,10 @@ import org.apache.spark.sql.AnalysisException
  * Implementations of the [[ExternalCatalog]] interface can create test suites 
by extending this.
  */
 abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach {
-  private lazy val storageFormat = CatalogStorageFormat(
-    locationUri = None,
-    inputFormat = Some(tableInputFormat),
-    outputFormat = Some(tableOutputFormat),
-    serde = None,
-    serdeProperties = Map.empty)
-  private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), 
storageFormat)
-  private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), 
storageFormat)
-  private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), 
storageFormat)
-  private val funcClass = "org.apache.spark.myFunc"
-
-  // Things subclasses should override
-  protected val tableInputFormat: String = 
"org.apache.park.serde.MyInputFormat"
-  protected val tableOutputFormat: String = 
"org.apache.park.serde.MyOutputFormat"
-  protected def newUriForDatabase(): String = "uri"
+  protected val utils: CatalogTestUtils
+  import utils._
+
   protected def resetState(): Unit = { }
-  protected def newEmptyCatalog(): ExternalCatalog
 
   // Clear all state after each test
   override def afterEach(): Unit = {
@@ -56,62 +45,6 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
     }
   }
 
-  /**
-   * Creates a basic catalog, with the following structure:
-   *
-   * default
-   * db1
-   * db2
-   *   - tbl1
-   *   - tbl2
-   *     - part1
-   *     - part2
-   *   - func1
-   */
-  private def newBasicCatalog(): ExternalCatalog = {
-    val catalog = newEmptyCatalog()
-    // When testing against a real catalog, the default database may already 
exist
-    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
-    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
-    catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
-    catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
-    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists 
= false)
-    catalog.createFunction("db2", newFunc("func1"))
-    catalog
-  }
-
-  private def newFunc(): CatalogFunction = CatalogFunction("funcname", 
funcClass)
-
-  private def newDb(name: String): CatalogDatabase = {
-    CatalogDatabase(name, name + " description", newUriForDatabase(), 
Map.empty)
-  }
-
-  private def newTable(name: String, db: String): CatalogTable = {
-    CatalogTable(
-      specifiedDatabase = Some(db),
-      name = name,
-      tableType = CatalogTableType.EXTERNAL_TABLE,
-      storage = storageFormat,
-      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", 
"string")),
-      partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", 
"string")))
-  }
-
-  private def newFunc(name: String): CatalogFunction = CatalogFunction(name, 
funcClass)
-
-  /**
-   * Whether the catalog's table partitions equal the ones given.
-   * Note: Hive sets some random serde things, so we just compare the specs 
here.
-   */
-  private def catalogPartitionsEqual(
-      catalog: ExternalCatalog,
-      db: String,
-      table: String,
-      parts: Seq[CatalogTablePartition]): Boolean = {
-    catalog.listPartitions(db, table).map(_.spec).toSet == 
parts.map(_.spec).toSet
-  }
-
-
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -277,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
   }
 
   test("get table") {
-    assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1")
+    assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1")
   }
 
   test("get table when database/table does not exist") {
@@ -409,7 +342,7 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
 
   test("alter partitions") {
     val catalog = newBasicCatalog()
-    try{
+    try {
       // Note: Before altering table partitions in Hive, you *must* set the 
current database
       // to the one that contains the table of interest. Otherwise you will 
end up with the
       // most helpful error message ever: "Unable to alter partition. alter is 
not possible."
@@ -498,7 +431,8 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
 
   test("get function") {
     val catalog = newBasicCatalog()
-    assert(catalog.getFunction("db2", "func1") == newFunc("func1"))
+    assert(catalog.getFunction("db2", "func1") ==
+      CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass))
     intercept[AnalysisException] {
       catalog.getFunction("db2", "does_not_exist")
     }
@@ -517,7 +451,7 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
     assert(catalog.getFunction("db2", "func1").className == funcClass)
     catalog.renameFunction("db2", "func1", newName)
     intercept[AnalysisException] { catalog.getFunction("db2", "func1") }
-    assert(catalog.getFunction("db2", newName).name == newName)
+    assert(catalog.getFunction("db2", newName).name.funcName == newName)
     assert(catalog.getFunction("db2", newName).className == funcClass)
     intercept[AnalysisException] { catalog.renameFunction("db2", 
"does_not_exist", "me") }
   }
@@ -553,3 +487,88 @@ abstract class CatalogTestCases extends SparkFunSuite with 
BeforeAndAfterEach {
   }
 
 }
+
+
+/**
+ * A collection of utility fields and methods for tests related to the 
[[ExternalCatalog]].
+ */
+abstract class CatalogTestUtils {
+
+  // Unimplemented methods
+  val tableInputFormat: String
+  val tableOutputFormat: String
+  def newEmptyCatalog(): ExternalCatalog
+
+  // These fields must be lazy because they rely on fields that are not 
implemented yet
+  lazy val storageFormat = CatalogStorageFormat(
+    locationUri = None,
+    inputFormat = Some(tableInputFormat),
+    outputFormat = Some(tableOutputFormat),
+    serde = None,
+    serdeProperties = Map.empty)
+  lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), 
storageFormat)
+  lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), 
storageFormat)
+  lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), 
storageFormat)
+  lazy val funcClass = "org.apache.spark.myFunc"
+
+  /**
+   * Creates a basic catalog, with the following structure:
+   *
+   * default
+   * db1
+   * db2
+   *   - tbl1
+   *   - tbl2
+   *     - part1
+   *     - part2
+   *   - func1
+   */
+  def newBasicCatalog(): ExternalCatalog = {
+    val catalog = newEmptyCatalog()
+    // When testing against a real catalog, the default database may already 
exist
+    catalog.createDatabase(newDb("default"), ignoreIfExists = true)
+    catalog.createDatabase(newDb("db1"), ignoreIfExists = false)
+    catalog.createDatabase(newDb("db2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false)
+    catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false)
+    catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists 
= false)
+    catalog.createFunction("db2", newFunc("func1", Some("db2")))
+    catalog
+  }
+
+  def newFunc(): CatalogFunction = newFunc("funcName")
+
+  def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+
+  def newDb(name: String): CatalogDatabase = {
+    CatalogDatabase(name, name + " description", newUriForDatabase(), 
Map.empty)
+  }
+
+  def newTable(name: String, db: String): CatalogTable = newTable(name, 
Some(db))
+
+  def newTable(name: String, database: Option[String] = None): CatalogTable = {
+    CatalogTable(
+      name = TableIdentifier(name, database),
+      tableType = CatalogTableType.EXTERNAL_TABLE,
+      storage = storageFormat,
+      schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", 
"string")),
+      partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", 
"string")))
+  }
+
+  def newFunc(name: String, database: Option[String] = None): CatalogFunction 
= {
+    CatalogFunction(FunctionIdentifier(name, database), funcClass)
+  }
+
+  /**
+   * Whether the catalog's table partitions equal the ones given.
+   * Note: Hive sets some random serde things, so we just compare the specs 
here.
+   */
+  def catalogPartitionsEqual(
+      catalog: ExternalCatalog,
+      db: String,
+      table: String,
+      parts: Seq[CatalogTablePartition]): Boolean = {
+    catalog.listPartitions(db, table).map(_.spec).toSet == 
parts.map(_.spec).toSet
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
index 9531758..63a7b2c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala
@@ -17,7 +17,14 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+
 /** Test suite for the [[InMemoryCatalog]]. */
 class InMemoryCatalogSuite extends CatalogTestCases {
-  override protected def newEmptyCatalog(): ExternalCatalog = new 
InMemoryCatalog
+
+  protected override val utils: CatalogTestUtils = new CatalogTestUtils {
+    override val tableInputFormat: String = 
"org.apache.park.SequenceFileInputFormat"
+    override val tableOutputFormat: String = 
"org.apache.park.SequenceFileOutputFormat"
+    override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog
+  }
+
 }

