[SPARK-13923][SQL] Implement SessionCatalog

## What changes were proposed in this pull request?
As part of the effort to merge `SQLContext` and `HiveContext`, this patch implements an internal catalog called `SessionCatalog` that handles temporary functions and tables and delegates metastore operations to `ExternalCatalog`. Currently, this is still dead code, but in the future it will be part of `SessionState` and will replace `o.a.s.sql.catalyst.analysis.Catalog`. A recent patch, #11573, made Spark parse Hive commands itself, but it still passes the entire query text to Hive. In a future patch, we will use `SessionCatalog` to implement the parsed commands. (A usage sketch of the new API follows the diff at the end of this message.)

## How was this patch tested?

800+ lines of tests in `SessionCatalogSuite`.

Author: Andrew Or <and...@databricks.com>

Closes #11750 from andrewor14/temp-catalog.

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ca9ef86c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ca9ef86c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ca9ef86c

Branch: refs/heads/master
Commit: ca9ef86c84ee84263f437a979017898f4bed0feb
Parents: 92b7057
Author: Andrew Or <and...@databricks.com>
Authored: Wed Mar 16 18:02:43 2016 -0700
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Mar 16 18:02:43 2016 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/TableIdentifier.scala    |  35 -
 .../sql/catalyst/catalog/InMemoryCatalog.scala  |  51 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   | 469 ++++++++++
 .../spark/sql/catalyst/catalog/interface.scala  |  29 +-
 .../apache/spark/sql/catalyst/identifiers.scala |  68 ++
 .../sql/catalyst/catalog/CatalogTestCases.scala | 171 ++--
 .../catalyst/catalog/InMemoryCatalogSuite.scala |   9 +-
 .../catalyst/catalog/SessionCatalogSuite.scala  | 864 +++++++++++++++++++
 .../org/apache/spark/sql/hive/HiveCatalog.scala |   9 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |  26 +-
 .../org/apache/spark/sql/hive/HiveQl.scala      |  14 +-
 .../spark/sql/hive/client/HiveClient.scala      |   2 +-
 .../spark/sql/hive/client/HiveClientImpl.scala  |  15 +-
 .../hive/execution/CreateTableAsSelect.scala    |   4 +-
 .../sql/hive/execution/CreateViewAsSelect.scala |   4 +-
 .../spark/sql/hive/HiveCatalogSuite.scala       |  13 +-
 .../org/apache/spark/sql/hive/HiveQlSuite.scala |  16 +-
 .../sql/hive/MetastoreDataSourcesSuite.scala    |   3 +-
 .../spark/sql/hive/client/VersionsSuite.scala   |   4 +-
 19 files changed, 1604 insertions(+), 202 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala deleted file mode 100644 index 4d4e4de..0000000 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/TableIdentifier.scala +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License.
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.catalyst - -/** - * Identifies a `table` in `database`. If `database` is not defined, the current database is used. - */ -private[sql] case class TableIdentifier(table: String, database: Option[String]) { - def this(table: String) = this(table, None) - - override def toString: String = quotedString - - def quotedString: String = database.map(db => s"`$db`.`$table`").getOrElse(s"`$table`") - - def unquotedString: String = database.map(db => s"$db.$table").getOrElse(table) -} - -private[sql] object TableIdentifier { - def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) -} http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala index f3fa795..7ead1dd 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.catalog import scala.collection.mutable import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} /** @@ -68,19 +69,20 @@ class InMemoryCatalog extends ExternalCatalog { private def requireFunctionExists(db: String, funcName: String): Unit = { if (!existsFunction(db, funcName)) { - throw new AnalysisException(s"Function $funcName does not exist in $db database") + throw new AnalysisException(s"Function '$funcName' does not exist in database '$db'") } } private def requireTableExists(db: String, table: String): Unit = { if (!existsTable(db, table)) { - throw new AnalysisException(s"Table $table does not exist in $db database") + throw new AnalysisException(s"Table '$table' does not exist in database '$db'") } } private def requirePartitionExists(db: String, table: String, spec: TablePartitionSpec): Unit = { if (!existsPartition(db, table, spec)) { - throw new AnalysisException(s"Partition does not exist in database $db table $table: $spec") + throw new AnalysisException( + s"Partition does not exist in database '$db' table '$table': '$spec'") } } @@ -93,7 +95,7 @@ class InMemoryCatalog extends ExternalCatalog { ignoreIfExists: Boolean): Unit = synchronized { if (catalog.contains(dbDefinition.name)) { if (!ignoreIfExists) { - throw new AnalysisException(s"Database ${dbDefinition.name} already exists.") + throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.") } } else { catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition)) @@ -108,17 +110,17 @@ class InMemoryCatalog extends ExternalCatalog { if (!cascade) { // If cascade is false, make sure the database is empty. if (catalog(db).tables.nonEmpty) { - throw new AnalysisException(s"Database $db is not empty. 
One or more tables exist.") + throw new AnalysisException(s"Database '$db' is not empty. One or more tables exist.") } if (catalog(db).functions.nonEmpty) { - throw new AnalysisException(s"Database $db is not empty. One or more functions exist.") + throw new AnalysisException(s"Database '$db' is not empty. One or more functions exist.") } } // Remove the database. catalog.remove(db) } else { if (!ignoreIfNotExists) { - throw new AnalysisException(s"Database $db does not exist") + throw new AnalysisException(s"Database '$db' does not exist") } } } @@ -156,12 +158,13 @@ class InMemoryCatalog extends ExternalCatalog { tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = synchronized { requireDbExists(db) - if (existsTable(db, tableDefinition.name)) { + val table = tableDefinition.name.table + if (existsTable(db, table)) { if (!ignoreIfExists) { - throw new AnalysisException(s"Table ${tableDefinition.name} already exists in $db database") + throw new AnalysisException(s"Table '$table' already exists in database '$db'") } } else { - catalog(db).tables.put(tableDefinition.name, new TableDesc(tableDefinition)) + catalog(db).tables.put(table, new TableDesc(tableDefinition)) } } @@ -174,7 +177,7 @@ class InMemoryCatalog extends ExternalCatalog { catalog(db).tables.remove(table) } else { if (!ignoreIfNotExists) { - throw new AnalysisException(s"Table $table does not exist in $db database") + throw new AnalysisException(s"Table '$table' does not exist in database '$db'") } } } @@ -182,14 +185,14 @@ class InMemoryCatalog extends ExternalCatalog { override def renameTable(db: String, oldName: String, newName: String): Unit = synchronized { requireTableExists(db, oldName) val oldDesc = catalog(db).tables(oldName) - oldDesc.table = oldDesc.table.copy(name = newName) + oldDesc.table = oldDesc.table.copy(name = TableIdentifier(newName, Some(db))) catalog(db).tables.put(newName, oldDesc) catalog(db).tables.remove(oldName) } override def alterTable(db: String, tableDefinition: CatalogTable): Unit = synchronized { - requireTableExists(db, tableDefinition.name) - catalog(db).tables(tableDefinition.name).table = tableDefinition + requireTableExists(db, tableDefinition.name.table) + catalog(db).tables(tableDefinition.name.table).table = tableDefinition } override def getTable(db: String, table: String): CatalogTable = synchronized { @@ -222,8 +225,8 @@ class InMemoryCatalog extends ExternalCatalog { val dupSpecs = parts.collect { case p if existingParts.contains(p.spec) => p.spec } if (dupSpecs.nonEmpty) { val dupSpecsStr = dupSpecs.mkString("\n===\n") - throw new AnalysisException( - s"The following partitions already exist in database $db table $table:\n$dupSpecsStr") + throw new AnalysisException("The following partitions already exist in database " + + s"'$db' table '$table':\n$dupSpecsStr") } } parts.foreach { p => existingParts.put(p.spec, p) } @@ -240,8 +243,8 @@ class InMemoryCatalog extends ExternalCatalog { val missingSpecs = partSpecs.collect { case s if !existingParts.contains(s) => s } if (missingSpecs.nonEmpty) { val missingSpecsStr = missingSpecs.mkString("\n===\n") - throw new AnalysisException( - s"The following partitions do not exist in database $db table $table:\n$missingSpecsStr") + throw new AnalysisException("The following partitions do not exist in database " + + s"'$db' table '$table':\n$missingSpecsStr") } } partSpecs.foreach(existingParts.remove) @@ -292,10 +295,10 @@ class InMemoryCatalog extends ExternalCatalog { override def createFunction(db: String, func: 
CatalogFunction): Unit = synchronized { requireDbExists(db) - if (existsFunction(db, func.name)) { - throw new AnalysisException(s"Function $func already exists in $db database") + if (existsFunction(db, func.name.funcName)) { + throw new AnalysisException(s"Function '${func.name.funcName}' already exists in database '$db'") } else { - catalog(db).functions.put(func.name, func) + catalog(db).functions.put(func.name.funcName, func) } } @@ -306,14 +309,14 @@ class InMemoryCatalog extends ExternalCatalog { override def renameFunction(db: String, oldName: String, newName: String): Unit = synchronized { requireFunctionExists(db, oldName) - val newFunc = getFunction(db, oldName).copy(name = newName) + val newFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db))) catalog(db).functions.remove(oldName) catalog(db).functions.put(newName, newFunc) } override def alterFunction(db: String, funcDefinition: CatalogFunction): Unit = synchronized { - requireFunctionExists(db, funcDefinition.name) - catalog(db).functions.put(funcDefinition.name, funcDefinition) + requireFunctionExists(db, funcDefinition.name.funcName) + catalog(db).functions.put(funcDefinition.name.funcName, funcDefinition) } override def getFunction(db: String, funcName: String): CatalogFunction = synchronized { http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala new file mode 100644 index 0000000..4dec042 --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -0,0 +1,469 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import java.util.concurrent.ConcurrentHashMap + +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, SubqueryAlias} + + +/** + * An internal catalog that is used by a Spark Session. This internal catalog serves as a + * proxy to the underlying metastore (e.g. Hive Metastore) and it also manages temporary + * tables and functions of the Spark Session that it belongs to.
+ */ +class SessionCatalog(externalCatalog: ExternalCatalog) { + import ExternalCatalog._ + + private[this] val tempTables = new ConcurrentHashMap[String, LogicalPlan] + private[this] val tempFunctions = new ConcurrentHashMap[String, CatalogFunction] + + // Note: we track current database here because certain operations do not explicitly + // specify the database (e.g. DROP TABLE my_table). In these cases we must first + // check whether the temporary table or function exists, then, if not, operate on + // the corresponding item in the current database. + private[this] var currentDb = "default" + + // ---------------------------------------------------------------------------- + // Databases + // ---------------------------------------------------------------------------- + // All methods in this category interact directly with the underlying catalog. + // ---------------------------------------------------------------------------- + + def createDatabase(dbDefinition: CatalogDatabase, ignoreIfExists: Boolean): Unit = { + externalCatalog.createDatabase(dbDefinition, ignoreIfExists) + } + + def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = { + externalCatalog.dropDatabase(db, ignoreIfNotExists, cascade) + } + + def alterDatabase(dbDefinition: CatalogDatabase): Unit = { + externalCatalog.alterDatabase(dbDefinition) + } + + def getDatabase(db: String): CatalogDatabase = { + externalCatalog.getDatabase(db) + } + + def databaseExists(db: String): Boolean = { + externalCatalog.databaseExists(db) + } + + def listDatabases(): Seq[String] = { + externalCatalog.listDatabases() + } + + def listDatabases(pattern: String): Seq[String] = { + externalCatalog.listDatabases(pattern) + } + + def getCurrentDatabase: String = currentDb + + def setCurrentDatabase(db: String): Unit = { + if (!databaseExists(db)) { + throw new AnalysisException(s"cannot set current database to non-existent '$db'") + } + currentDb = db + } + + // ---------------------------------------------------------------------------- + // Tables + // ---------------------------------------------------------------------------- + // There are two kinds of tables, temporary tables and metastore tables. + // Temporary tables are isolated across sessions and do not belong to any + // particular database. Metastore tables can be used across multiple + // sessions as their metadata is persisted in the underlying catalog. + // ---------------------------------------------------------------------------- + + // ---------------------------------------------------- + // | Methods that interact with metastore tables only | + // ---------------------------------------------------- + + /** + * Create a metastore table in the database specified in `tableDefinition`. + * If no such database is specified, create it in the current database. + */ + def createTable(tableDefinition: CatalogTable, ignoreIfExists: Boolean): Unit = { + val db = tableDefinition.name.database.getOrElse(currentDb) + val newTableDefinition = tableDefinition.copy( + name = TableIdentifier(tableDefinition.name.table, Some(db))) + externalCatalog.createTable(db, newTableDefinition, ignoreIfExists) + } + + /** + * Alter the metadata of an existing metastore table identified by `tableDefinition`. + * + * If no database is specified in `tableDefinition`, assume the table is in the + * current database. + * + * Note: If the underlying implementation does not support altering a certain field, + * this becomes a no-op. 
+ */ + def alterTable(tableDefinition: CatalogTable): Unit = { + val db = tableDefinition.name.database.getOrElse(currentDb) + val newTableDefinition = tableDefinition.copy( + name = TableIdentifier(tableDefinition.name.table, Some(db))) + externalCatalog.alterTable(db, newTableDefinition) + } + + /** + * Retrieve the metadata of an existing metastore table. + * If no database is specified, assume the table is in the current database. + */ + def getTable(name: TableIdentifier): CatalogTable = { + val db = name.database.getOrElse(currentDb) + externalCatalog.getTable(db, name.table) + } + + // ------------------------------------------------------------- + // | Methods that interact with temporary and metastore tables | + // ------------------------------------------------------------- + + /** + * Create a temporary table. + */ + def createTempTable( + name: String, + tableDefinition: LogicalPlan, + ignoreIfExists: Boolean): Unit = { + if (tempTables.containsKey(name) && !ignoreIfExists) { + throw new AnalysisException(s"Temporary table '$name' already exists.") + } + tempTables.put(name, tableDefinition) + } + + /** + * Rename a table. + * + * If a database is specified in `oldName`, this will rename the table in that database. + * If no database is specified, this will first attempt to rename a temporary table with + * the same name, then, if that does not exist, rename the table in the current database. + * + * This assumes the database specified in `oldName` matches the one specified in `newName`. + */ + def renameTable(oldName: TableIdentifier, newName: TableIdentifier): Unit = { + if (oldName.database != newName.database) { + throw new AnalysisException("rename does not support moving tables across databases") + } + val db = oldName.database.getOrElse(currentDb) + if (oldName.database.isDefined || !tempTables.containsKey(oldName.table)) { + externalCatalog.renameTable(db, oldName.table, newName.table) + } else { + val table = tempTables.remove(oldName.table) + tempTables.put(newName.table, table) + } + } + + /** + * Drop a table. + * + * If a database is specified in `name`, this will drop the table from that database. + * If no database is specified, this will first attempt to drop a temporary table with + * the same name, then, if that does not exist, drop the table from the current database. + */ + def dropTable(name: TableIdentifier, ignoreIfNotExists: Boolean): Unit = { + val db = name.database.getOrElse(currentDb) + if (name.database.isDefined || !tempTables.containsKey(name.table)) { + externalCatalog.dropTable(db, name.table, ignoreIfNotExists) + } else { + tempTables.remove(name.table) + } + } + + /** + * Return a [[LogicalPlan]] that represents the given table. + * + * If a database is specified in `name`, this will return the table from that database. + * If no database is specified, this will first attempt to return a temporary table with + * the same name, then, if that does not exist, return the table from the current database. 
+ */ + def lookupRelation(name: TableIdentifier, alias: Option[String] = None): LogicalPlan = { + val db = name.database.getOrElse(currentDb) + val relation = + if (name.database.isDefined || !tempTables.containsKey(name.table)) { + val metadata = externalCatalog.getTable(db, name.table) + CatalogRelation(db, metadata, alias) + } else { + tempTables.get(name.table) + } + val tableWithQualifiers = SubqueryAlias(name.table, relation) + // If an alias was specified by the lookup, wrap the plan in a subquery so that + // attributes are properly qualified with this alias. + alias.map(a => SubqueryAlias(a, tableWithQualifiers)).getOrElse(tableWithQualifiers) + } + + /** + * List all tables in the specified database, including temporary tables. + */ + def listTables(db: String): Seq[TableIdentifier] = { + val dbTables = externalCatalog.listTables(db).map { t => TableIdentifier(t, Some(db)) } + val _tempTables = tempTables.keys().asScala.map { t => TableIdentifier(t) } + dbTables ++ _tempTables + } + + /** + * List all matching tables in the specified database, including temporary tables. + */ + def listTables(db: String, pattern: String): Seq[TableIdentifier] = { + val dbTables = + externalCatalog.listTables(db, pattern).map { t => TableIdentifier(t, Some(db)) } + val regex = pattern.replaceAll("\\*", ".*").r + val _tempTables = tempTables.keys().asScala + .filter { t => regex.pattern.matcher(t).matches() } + .map { t => TableIdentifier(t) } + dbTables ++ _tempTables + } + + /** + * Return a temporary table exactly as it was stored. + * For testing only. + */ + private[catalog] def getTempTable(name: String): Option[LogicalPlan] = { + Option(tempTables.get(name)) + } + + // ---------------------------------------------------------------------------- + // Partitions + // ---------------------------------------------------------------------------- + // All methods in this category interact directly with the underlying catalog. + // These methods are concerned with only metastore tables. + // ---------------------------------------------------------------------------- + + // TODO: We need to figure out how these methods interact with our data source + // tables. For such tables, we do not store values of partitioning columns in + // the metastore. For now, partition values of a data source table will be + // automatically discovered when we load the table. + + /** + * Create partitions in an existing table, assuming it exists. + * If no database is specified, assume the table is in the current database. + */ + def createPartitions( + tableName: TableIdentifier, + parts: Seq[CatalogTablePartition], + ignoreIfExists: Boolean): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.createPartitions(db, tableName.table, parts, ignoreIfExists) + } + + /** + * Drop partitions from a table, assuming they exist. + * If no database is specified, assume the table is in the current database. + */ + def dropPartitions( + tableName: TableIdentifier, + parts: Seq[TablePartitionSpec], + ignoreIfNotExists: Boolean): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.dropPartitions(db, tableName.table, parts, ignoreIfNotExists) + } + + /** + * Override the specs of one or many existing table partitions, assuming they exist. + * + * This assumes index i of `specs` corresponds to index i of `newSpecs`. + * If no database is specified, assume the table is in the current database. 
+ */ + def renamePartitions( + tableName: TableIdentifier, + specs: Seq[TablePartitionSpec], + newSpecs: Seq[TablePartitionSpec]): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.renamePartitions(db, tableName.table, specs, newSpecs) + } + + /** + * Alter one or many table partitions whose specs match those specified in `parts`, + * assuming the partitions exist. + * + * If no database is specified, assume the table is in the current database. + * + * Note: If the underlying implementation does not support altering a certain field, + * this becomes a no-op. + */ + def alterPartitions(tableName: TableIdentifier, parts: Seq[CatalogTablePartition]): Unit = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.alterPartitions(db, tableName.table, parts) + } + + /** + * Retrieve the metadata of a table partition, assuming it exists. + * If no database is specified, assume the table is in the current database. + */ + def getPartition(tableName: TableIdentifier, spec: TablePartitionSpec): CatalogTablePartition = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.getPartition(db, tableName.table, spec) + } + + /** + * List all partitions in a table, assuming it exists. + * If no database is specified, assume the table is in the current database. + */ + def listPartitions(tableName: TableIdentifier): Seq[CatalogTablePartition] = { + val db = tableName.database.getOrElse(currentDb) + externalCatalog.listPartitions(db, tableName.table) + } + + // ---------------------------------------------------------------------------- + // Functions + // ---------------------------------------------------------------------------- + // There are two kinds of functions, temporary functions and metastore + // functions (permanent UDFs). Temporary functions are isolated across + // sessions. Metastore functions can be used across multiple sessions as + // their metadata is persisted in the underlying catalog. + // ---------------------------------------------------------------------------- + + // ------------------------------------------------------- + // | Methods that interact with metastore functions only | + // ------------------------------------------------------- + + /** + * Create a metastore function in the database specified in `funcDefinition`. + * If no such database is specified, create it in the current database. + */ + def createFunction(funcDefinition: CatalogFunction): Unit = { + val db = funcDefinition.name.database.getOrElse(currentDb) + val newFuncDefinition = funcDefinition.copy( + name = FunctionIdentifier(funcDefinition.name.funcName, Some(db))) + externalCatalog.createFunction(db, newFuncDefinition) + } + + /** + * Drop a metastore function. + * If no database is specified, assume the function is in the current database. + */ + def dropFunction(name: FunctionIdentifier): Unit = { + val db = name.database.getOrElse(currentDb) + externalCatalog.dropFunction(db, name.funcName) + } + + /** + * Alter a metastore function whose name matches the one specified in `funcDefinition`. + * + * If no database is specified in `funcDefinition`, assume the function is in the + * current database. + * + * Note: If the underlying implementation does not support altering a certain field, + * this becomes a no-op.
+ */ + def alterFunction(funcDefinition: CatalogFunction): Unit = { + val db = funcDefinition.name.database.getOrElse(currentDb) + val newFuncDefinition = funcDefinition.copy( + name = FunctionIdentifier(funcDefinition.name.funcName, Some(db))) + externalCatalog.alterFunction(db, newFuncDefinition) + } + + // ---------------------------------------------------------------- + // | Methods that interact with temporary and metastore functions | + // ---------------------------------------------------------------- + + /** + * Create a temporary function. + * This assumes no database is specified in `funcDefinition`. + */ + def createTempFunction(funcDefinition: CatalogFunction, ignoreIfExists: Boolean): Unit = { + require(funcDefinition.name.database.isEmpty, + "attempted to create a temporary function while specifying a database") + val name = funcDefinition.name.funcName + if (tempFunctions.containsKey(name) && !ignoreIfExists) { + throw new AnalysisException(s"Temporary function '$name' already exists.") + } + tempFunctions.put(name, funcDefinition) + } + + /** + * Drop a temporary function. + */ + // TODO: The reason that we distinguish dropFunction and dropTempFunction is that + // Hive has DROP FUNCTION and DROP TEMPORARY FUNCTION. We may want to consolidate + // dropFunction and dropTempFunction. + def dropTempFunction(name: String, ignoreIfNotExists: Boolean): Unit = { + if (!tempFunctions.containsKey(name) && !ignoreIfNotExists) { + throw new AnalysisException( + s"Temporary function '$name' cannot be dropped because it does not exist!") + } + tempFunctions.remove(name) + } + + /** + * Rename a function. + * + * If a database is specified in `oldName`, this will rename the function in that database. + * If no database is specified, this will first attempt to rename a temporary function with + * the same name, then, if that does not exist, rename the function in the current database. + * + * This assumes the database specified in `oldName` matches the one specified in `newName`. + */ + def renameFunction(oldName: FunctionIdentifier, newName: FunctionIdentifier): Unit = { + if (oldName.database != newName.database) { + throw new AnalysisException("rename does not support moving functions across databases") + } + val db = oldName.database.getOrElse(currentDb) + if (oldName.database.isDefined || !tempFunctions.containsKey(oldName.funcName)) { + externalCatalog.renameFunction(db, oldName.funcName, newName.funcName) + } else { + val func = tempFunctions.remove(oldName.funcName) + val newFunc = func.copy(name = func.name.copy(funcName = newName.funcName)) + tempFunctions.put(newName.funcName, newFunc) + } + } + + /** + * Retrieve the metadata of an existing function. + * + * If a database is specified in `name`, this will return the function in that database. + * If no database is specified, this will first attempt to return a temporary function with + * the same name, then, if that does not exist, return the function in the current database. + */ + def getFunction(name: FunctionIdentifier): CatalogFunction = { + val db = name.database.getOrElse(currentDb) + if (name.database.isDefined || !tempFunctions.containsKey(name.funcName)) { + externalCatalog.getFunction(db, name.funcName) + } else { + tempFunctions.get(name.funcName) + } + } + + // TODO: implement lookupFunction that returns something from the registry itself + + /** + * List all matching functions in the specified database, including temporary functions. 
+ */ + def listFunctions(db: String, pattern: String): Seq[FunctionIdentifier] = { + val dbFunctions = + externalCatalog.listFunctions(db, pattern).map { f => FunctionIdentifier(f, Some(db)) } + val regex = pattern.replaceAll("\\*", ".*").r + val _tempFunctions = tempFunctions.keys().asScala + .filter { f => regex.pattern.matcher(f).matches() } + .map { f => FunctionIdentifier(f) } + dbFunctions ++ _tempFunctions + } + + /** + * Return a temporary function. For testing only. + */ + private[catalog] def getTempFunction(name: String): Option[CatalogFunction] = { + Option(tempFunctions.get(name)) + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala index db34af3..c4e4961 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala @@ -20,6 +20,9 @@ package org.apache.spark.sql.catalyst.catalog import javax.annotation.Nullable import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan} /** @@ -167,7 +170,7 @@ abstract class ExternalCatalog { * @param name name of the function * @param className fully qualified class name, e.g. "org.apache.spark.util.MyFunc" */ -case class CatalogFunction(name: String, className: String) +case class CatalogFunction(name: FunctionIdentifier, className: String) /** @@ -211,8 +214,7 @@ case class CatalogTablePartition( * future once we have a better understanding of how we want to handle skewed columns. */ case class CatalogTable( - specifiedDatabase: Option[String], - name: String, + name: TableIdentifier, tableType: CatalogTableType, storage: CatalogStorageFormat, schema: Seq[CatalogColumn], @@ -226,12 +228,12 @@ case class CatalogTable( viewText: Option[String] = None) { /** Return the database this table was specified to belong to, assuming it exists. */ - def database: String = specifiedDatabase.getOrElse { + def database: String = name.database.getOrElse { throw new AnalysisException(s"table $name did not specify database") } /** Return the fully qualified name of this table, assuming the database was specified. */ - def qualifiedName: String = s"$database.$name" + def qualifiedName: String = name.unquotedString /** Syntactic sugar to update a field in `storage`. */ def withNewStorage( @@ -272,3 +274,20 @@ object ExternalCatalog { */ type TablePartitionSpec = Map[String, String] } + + +/** + * A [[LogicalPlan]] that wraps [[CatalogTable]]. 
+ */ +case class CatalogRelation( + db: String, + metadata: CatalogTable, + alias: Option[String] = None) + extends LeafNode { + + // TODO: implement this + override def output: Seq[Attribute] = Seq.empty + + require(metadata.name.database == Some(db), + "provided database does not match the one specified in the table definition") +} http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala new file mode 100644 index 0000000..87f4d1b --- /dev/null +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/identifiers.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst + + +/** + * An identifier that optionally specifies a database. + * + * Format (unquoted): "name" or "db.name" + * Format (quoted): "`name`" or "`db`.`name`" + */ +sealed trait IdentifierWithDatabase { + val name: String + def database: Option[String] + def quotedString: String = database.map(db => s"`$db`.`$name`").getOrElse(s"`$name`") + def unquotedString: String = database.map(db => s"$db.$name").getOrElse(name) + override def toString: String = quotedString +} + + +/** + * Identifies a table in a database. + * If `database` is not defined, the current database is used. + */ +case class TableIdentifier(table: String, database: Option[String]) + extends IdentifierWithDatabase { + + override val name: String = table + + def this(name: String) = this(name, None) + +} + +object TableIdentifier { + def apply(tableName: String): TableIdentifier = new TableIdentifier(tableName) +} + + +/** + * Identifies a function in a database. + * If `database` is not defined, the current database is used.
+ */ +case class FunctionIdentifier(funcName: String, database: Option[String]) + extends IdentifierWithDatabase { + + override val name: String = funcName + + def this(name: String) = this(name, None) +} + +object FunctionIdentifier { + def apply(funcName: String): FunctionIdentifier = new FunctionIdentifier(funcName) +} http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala index b03ba81..a1ea619 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/CatalogTestCases.scala @@ -21,6 +21,8 @@ import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.util.Utils /** @@ -29,23 +31,10 @@ import org.apache.spark.sql.AnalysisException * Implementations of the [[ExternalCatalog]] interface can create test suites by extending this. */ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { - private lazy val storageFormat = CatalogStorageFormat( - locationUri = None, - inputFormat = Some(tableInputFormat), - outputFormat = Some(tableOutputFormat), - serde = None, - serdeProperties = Map.empty) - private lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) - private lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) - private lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) - private val funcClass = "org.apache.spark.myFunc" - - // Things subclasses should override - protected val tableInputFormat: String = "org.apache.park.serde.MyInputFormat" - protected val tableOutputFormat: String = "org.apache.park.serde.MyOutputFormat" - protected def newUriForDatabase(): String = "uri" + protected val utils: CatalogTestUtils + import utils._ + protected def resetState(): Unit = { } - protected def newEmptyCatalog(): ExternalCatalog // Clear all state after each test override def afterEach(): Unit = { @@ -56,62 +45,6 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { } } - /** - * Creates a basic catalog, with the following structure: - * - * default - * db1 - * db2 - * - tbl1 - * - tbl2 - * - part1 - * - part2 - * - func1 - */ - private def newBasicCatalog(): ExternalCatalog = { - val catalog = newEmptyCatalog() - // When testing against a real catalog, the default database may already exist - catalog.createDatabase(newDb("default"), ignoreIfExists = true) - catalog.createDatabase(newDb("db1"), ignoreIfExists = false) - catalog.createDatabase(newDb("db2"), ignoreIfExists = false) - catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false) - catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false) - catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false) - catalog.createFunction("db2", newFunc("func1")) - catalog - } - - private def newFunc(): CatalogFunction = CatalogFunction("funcname", funcClass) - - private def newDb(name: String): CatalogDatabase = { - 
CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) - } - - private def newTable(name: String, db: String): CatalogTable = { - CatalogTable( - specifiedDatabase = Some(db), - name = name, - tableType = CatalogTableType.EXTERNAL_TABLE, - storage = storageFormat, - schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")), - partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))) - } - - private def newFunc(name: String): CatalogFunction = CatalogFunction(name, funcClass) - - /** - * Whether the catalog's table partitions equal the ones given. - * Note: Hive sets some random serde things, so we just compare the specs here. - */ - private def catalogPartitionsEqual( - catalog: ExternalCatalog, - db: String, - table: String, - parts: Seq[CatalogTablePartition]): Boolean = { - catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet - } - - // -------------------------------------------------------------------------- // Databases // -------------------------------------------------------------------------- @@ -277,7 +210,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { } test("get table") { - assert(newBasicCatalog().getTable("db2", "tbl1").name == "tbl1") + assert(newBasicCatalog().getTable("db2", "tbl1").name.table == "tbl1") } test("get table when database/table does not exist") { @@ -409,7 +342,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { test("alter partitions") { val catalog = newBasicCatalog() - try{ + try { // Note: Before altering table partitions in Hive, you *must* set the current database // to the one that contains the table of interest. Otherwise you will end up with the // most helpful error message ever: "Unable to alter partition. alter is not possible." @@ -498,7 +431,8 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { test("get function") { val catalog = newBasicCatalog() - assert(catalog.getFunction("db2", "func1") == newFunc("func1")) + assert(catalog.getFunction("db2", "func1") == + CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass)) intercept[AnalysisException] { catalog.getFunction("db2", "does_not_exist") } @@ -517,7 +451,7 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { assert(catalog.getFunction("db2", "func1").className == funcClass) catalog.renameFunction("db2", "func1", newName) intercept[AnalysisException] { catalog.getFunction("db2", "func1") } - assert(catalog.getFunction("db2", newName).name == newName) + assert(catalog.getFunction("db2", newName).name.funcName == newName) assert(catalog.getFunction("db2", newName).className == funcClass) intercept[AnalysisException] { catalog.renameFunction("db2", "does_not_exist", "me") } } @@ -553,3 +487,88 @@ abstract class CatalogTestCases extends SparkFunSuite with BeforeAndAfterEach { } } + + +/** + * A collection of utility fields and methods for tests related to the [[ExternalCatalog]]. 
+ */ +abstract class CatalogTestUtils { + + // Unimplemented methods + val tableInputFormat: String + val tableOutputFormat: String + def newEmptyCatalog(): ExternalCatalog + + // These fields must be lazy because they rely on fields that are not implemented yet + lazy val storageFormat = CatalogStorageFormat( + locationUri = None, + inputFormat = Some(tableInputFormat), + outputFormat = Some(tableOutputFormat), + serde = None, + serdeProperties = Map.empty) + lazy val part1 = CatalogTablePartition(Map("a" -> "1", "b" -> "2"), storageFormat) + lazy val part2 = CatalogTablePartition(Map("a" -> "3", "b" -> "4"), storageFormat) + lazy val part3 = CatalogTablePartition(Map("a" -> "5", "b" -> "6"), storageFormat) + lazy val funcClass = "org.apache.spark.myFunc" + + /** + * Creates a basic catalog, with the following structure: + * + * default + * db1 + * db2 + * - tbl1 + * - tbl2 + * - part1 + * - part2 + * - func1 + */ + def newBasicCatalog(): ExternalCatalog = { + val catalog = newEmptyCatalog() + // When testing against a real catalog, the default database may already exist + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + catalog.createDatabase(newDb("db1"), ignoreIfExists = false) + catalog.createDatabase(newDb("db2"), ignoreIfExists = false) + catalog.createTable("db2", newTable("tbl1", "db2"), ignoreIfExists = false) + catalog.createTable("db2", newTable("tbl2", "db2"), ignoreIfExists = false) + catalog.createPartitions("db2", "tbl2", Seq(part1, part2), ignoreIfExists = false) + catalog.createFunction("db2", newFunc("func1", Some("db2"))) + catalog + } + + def newFunc(): CatalogFunction = newFunc("funcName") + + def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath + + def newDb(name: String): CatalogDatabase = { + CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) + } + + def newTable(name: String, db: String): CatalogTable = newTable(name, Some(db)) + + def newTable(name: String, database: Option[String] = None): CatalogTable = { + CatalogTable( + name = TableIdentifier(name, database), + tableType = CatalogTableType.EXTERNAL_TABLE, + storage = storageFormat, + schema = Seq(CatalogColumn("col1", "int"), CatalogColumn("col2", "string")), + partitionColumns = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))) + } + + def newFunc(name: String, database: Option[String] = None): CatalogFunction = { + CatalogFunction(FunctionIdentifier(name, database), funcClass) + } + + /** + * Whether the catalog's table partitions equal the ones given. + * Note: Hive sets some random serde things, so we just compare the specs here. 
+ */ + def catalogPartitionsEqual( + catalog: ExternalCatalog, + db: String, + table: String, + parts: Seq[CatalogTablePartition]): Boolean = { + catalog.listPartitions(db, table).map(_.spec).toSet == parts.map(_.spec).toSet + } + +} http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala index 9531758..63a7b2c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalogSuite.scala @@ -17,7 +17,14 @@ package org.apache.spark.sql.catalyst.catalog + /** Test suite for the [[InMemoryCatalog]]. */ class InMemoryCatalogSuite extends CatalogTestCases { + protected override val utils: CatalogTestUtils = new CatalogTestUtils { + override val tableInputFormat: String = "org.apache.park.SequenceFileInputFormat" + override val tableOutputFormat: String = "org.apache.park.SequenceFileOutputFormat" + override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog + } + }
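
----------------------------------------------------------------------

For readers who want the new API's behavior without wading through the 800+ lines of `SessionCatalogSuite`, here is a minimal usage sketch assembled from the signatures in the diff above. It is not part of the commit: the object name `SessionCatalogSketch`, the database/table/column names, and the use of `LocalRelation` as a stand-in temporary-table plan are invented for illustration.

```scala
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.plans.logical.LocalRelation

object SessionCatalogSketch {
  def main(args: Array[String]): Unit = {
    // Quoting behavior from the new IdentifierWithDatabase trait.
    assert(TableIdentifier("t", Some("db1")).quotedString == "`db1`.`t`")
    assert(TableIdentifier("t", Some("db1")).unquotedString == "db1.t")

    // Back the session catalog with the in-memory ExternalCatalog implementation.
    val catalog = new SessionCatalog(new InMemoryCatalog)
    catalog.createDatabase(
      CatalogDatabase("db1", "db1 description", "file:/tmp/db1", Map.empty),
      ignoreIfExists = true)
    catalog.setCurrentDatabase("db1")

    // A metastore table named "t", persisted through the external catalog.
    val storageFormat = CatalogStorageFormat(
      locationUri = None,
      inputFormat = None,
      outputFormat = None,
      serde = None,
      serdeProperties = Map.empty)
    catalog.createTable(
      CatalogTable(
        name = TableIdentifier("t", Some("db1")),
        tableType = CatalogTableType.EXTERNAL_TABLE,
        storage = storageFormat,
        schema = Seq(CatalogColumn("col1", "int")),
        partitionColumns = Seq.empty),
      ignoreIfExists = false)

    // A temporary table, also named "t", that shadows it for unqualified lookups.
    catalog.createTempTable("t", LocalRelation(Nil), ignoreIfExists = false)

    // An unqualified lookup resolves to the temporary table first ...
    val tempPlan = catalog.lookupRelation(TableIdentifier("t"))
    // ... while a database-qualified lookup bypasses temporary tables entirely.
    val metastorePlan = catalog.lookupRelation(TableIdentifier("t", Some("db1")))
    assert(tempPlan != metastorePlan)

    // An unqualified DROP TABLE removes the temporary table; the metastore
    // table in the current database survives.
    catalog.dropTable(TableIdentifier("t"), ignoreIfNotExists = false)
    assert(catalog.listTables("db1") == Seq(TableIdentifier("t", Some("db1"))))
  }
}
```

The unqualified `DROP TABLE` path at the end is why `SessionCatalog` tracks a current database at all: for commands that omit the database, it must consult the session's temporary-table map first and only then fall back to `currentDb` in the external catalog.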