Repository: spark Updated Branches: refs/heads/master 92b70576e -> ca9ef86c8
http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala new file mode 100644 index 0000000..e1973ee --- /dev/null +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalogSuite.scala @@ -0,0 +1,864 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.catalog + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} +import org.apache.spark.sql.catalyst.plans.logical.{Range, SubqueryAlias} + + +/** + * Tests for [[SessionCatalog]] that assume that [[InMemoryCatalog]] is correctly implemented. + * + * Note: many of the methods here are very similar to the ones in [[CatalogTestCases]]. 
+ * This is because [[SessionCatalog]] and [[ExternalCatalog]] share many similar method + * signatures but do not extend a common parent. This is largely by design but + * unfortunately leads to very similar test code in two places. + */ +class SessionCatalogSuite extends SparkFunSuite { + private val utils = new CatalogTestUtils { + override val tableInputFormat: String = "com.fruit.eyephone.CameraInputFormat" + override val tableOutputFormat: String = "com.fruit.eyephone.CameraOutputFormat" + override def newEmptyCatalog(): ExternalCatalog = new InMemoryCatalog + } + + import utils._ + + // -------------------------------------------------------------------------- + // Databases + // -------------------------------------------------------------------------- + + test("basic create and list databases") { + val catalog = new SessionCatalog(newEmptyCatalog()) + catalog.createDatabase(newDb("default"), ignoreIfExists = true) + assert(catalog.databaseExists("default")) + assert(!catalog.databaseExists("testing")) + assert(!catalog.databaseExists("testing2")) + catalog.createDatabase(newDb("testing"), ignoreIfExists = false) + assert(catalog.databaseExists("testing")) + assert(catalog.listDatabases().toSet == Set("default", "testing")) + catalog.createDatabase(newDb("testing2"), ignoreIfExists = false) + assert(catalog.listDatabases().toSet == Set("default", "testing", "testing2")) + assert(catalog.databaseExists("testing2")) + assert(!catalog.databaseExists("does_not_exist")) + } + + test("get database when a database exists") { + val catalog = new SessionCatalog(newBasicCatalog()) + val db1 = catalog.getDatabase("db1") + assert(db1.name == "db1") + assert(db1.description.contains("db1")) + } + + test("get database should throw exception when the database does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.getDatabase("db_that_does_not_exist") + } + } + + test("list databases without pattern") { + val 
catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.listDatabases().toSet == Set("default", "db1", "db2")) + } + + test("list databases with pattern") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.listDatabases("db").toSet == Set.empty) + assert(catalog.listDatabases("db*").toSet == Set("db1", "db2")) + assert(catalog.listDatabases("*1").toSet == Set("db1")) + assert(catalog.listDatabases("db2").toSet == Set("db2")) + } + + test("drop database") { + val catalog = new SessionCatalog(newBasicCatalog()) + catalog.dropDatabase("db1", ignoreIfNotExists = false, cascade = false) + assert(catalog.listDatabases().toSet == Set("default", "db2")) + } + + test("drop database when the database is not empty") { + // Throw exception if there are functions left + val externalCatalog1 = newBasicCatalog() + val sessionCatalog1 = new SessionCatalog(externalCatalog1) + externalCatalog1.dropTable("db2", "tbl1", ignoreIfNotExists = false) + externalCatalog1.dropTable("db2", "tbl2", ignoreIfNotExists = false) + intercept[AnalysisException] { + sessionCatalog1.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + } + + // Throw exception if there are tables left + val externalCatalog2 = newBasicCatalog() + val sessionCatalog2 = new SessionCatalog(externalCatalog2) + externalCatalog2.dropFunction("db2", "func1") + intercept[AnalysisException] { + sessionCatalog2.dropDatabase("db2", ignoreIfNotExists = false, cascade = false) + } + + // When cascade is true, SessionCatalog should drop the database and its contents + val externalCatalog3 = newBasicCatalog() + val sessionCatalog3 = new SessionCatalog(externalCatalog3) + sessionCatalog3.dropDatabase("db2", ignoreIfNotExists = false, cascade = true) + assert(sessionCatalog3.listDatabases().toSet == Set("default", "db1")) + } + + test("drop database when the database does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.dropDatabase("db_that_does_not_exist", 
ignoreIfNotExists = false, cascade = false) + } + catalog.dropDatabase("db_that_does_not_exist", ignoreIfNotExists = true, cascade = false) + } + + test("alter database") { + val catalog = new SessionCatalog(newBasicCatalog()) + val db1 = catalog.getDatabase("db1") + // Note: alter properties here because Hive does not support altering other fields + catalog.alterDatabase(db1.copy(properties = Map("k" -> "v3", "good" -> "true"))) + val newDb1 = catalog.getDatabase("db1") + assert(db1.properties.isEmpty) + assert(newDb1.properties.size == 2) + assert(newDb1.properties.get("k") == Some("v3")) + assert(newDb1.properties.get("good") == Some("true")) + } + + test("alter database should throw exception when the database does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.alterDatabase(newDb("does_not_exist")) + } + } + + test("get/set current database") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.getCurrentDatabase == "default") + catalog.setCurrentDatabase("db2") + assert(catalog.getCurrentDatabase == "db2") + intercept[AnalysisException] { + catalog.setCurrentDatabase("deebo") + } + catalog.createDatabase(newDb("deebo"), ignoreIfExists = false) + catalog.setCurrentDatabase("deebo") + assert(catalog.getCurrentDatabase == "deebo") + } + + // -------------------------------------------------------------------------- + // Tables + // -------------------------------------------------------------------------- + + test("create table") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + assert(externalCatalog.listTables("db1").isEmpty) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + sessionCatalog.createTable(newTable("tbl3", "db1"), ignoreIfExists = false) + sessionCatalog.createTable(newTable("tbl3", "db2"), ignoreIfExists = false) + assert(externalCatalog.listTables("db1").toSet == Set("tbl3")) + 
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3")) + // Create table without explicitly specifying database + sessionCatalog.setCurrentDatabase("db1") + sessionCatalog.createTable(newTable("tbl4"), ignoreIfExists = false) + assert(externalCatalog.listTables("db1").toSet == Set("tbl3", "tbl4")) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2", "tbl3")) + } + + test("create table when database does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + // Creating table in non-existent database should always fail + intercept[AnalysisException] { + catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = false) + } + intercept[AnalysisException] { + catalog.createTable(newTable("tbl1", "does_not_exist"), ignoreIfExists = true) + } + // Table already exists + intercept[AnalysisException] { + catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) + } + catalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = true) + } + + test("create temp table") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempTable1 = Range(1, 10, 1, 10, Seq()) + val tempTable2 = Range(1, 20, 2, 10, Seq()) + catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false) + catalog.createTempTable("tbl2", tempTable2, ignoreIfExists = false) + assert(catalog.getTempTable("tbl1") == Some(tempTable1)) + assert(catalog.getTempTable("tbl2") == Some(tempTable2)) + assert(catalog.getTempTable("tbl3") == None) + // Temporary table already exists + intercept[AnalysisException] { + catalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false) + } + // Temporary table already exists but we override it + catalog.createTempTable("tbl1", tempTable2, ignoreIfExists = true) + assert(catalog.getTempTable("tbl1") == Some(tempTable2)) + } + + test("drop table") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + 
assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false) + assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) + // Drop table without explicitly specifying database + sessionCatalog.setCurrentDatabase("db2") + sessionCatalog.dropTable(TableIdentifier("tbl2"), ignoreIfNotExists = false) + assert(externalCatalog.listTables("db2").isEmpty) + } + + test("drop table when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + // Should always throw exception when the database does not exist + intercept[AnalysisException] { + catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = false) + } + intercept[AnalysisException] { + catalog.dropTable(TableIdentifier("tbl1", Some("unknown_db")), ignoreIfNotExists = true) + } + // Table does not exist + intercept[AnalysisException] { + catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = false) + } + catalog.dropTable(TableIdentifier("unknown_table", Some("db2")), ignoreIfNotExists = true) + } + + test("drop temp table") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + val tempTable = Range(1, 10, 2, 10, Seq()) + sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + sessionCatalog.setCurrentDatabase("db2") + assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If database is not specified, temp table should be dropped first + sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) + assert(sessionCatalog.getTempTable("tbl1") == None) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If temp table does not exist, the table in the current database should be dropped + 
sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) + assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) + // If database is specified, temp tables are never dropped + sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + sessionCatalog.createTable(newTable("tbl1", "db2"), ignoreIfExists = false) + sessionCatalog.dropTable(TableIdentifier("tbl1", Some("db2")), ignoreIfNotExists = false) + assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(externalCatalog.listTables("db2").toSet == Set("tbl2")) + } + + test("rename table") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + sessionCatalog.renameTable( + TableIdentifier("tbl1", Some("db2")), TableIdentifier("tblone", Some("db2"))) + assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbl2")) + sessionCatalog.renameTable( + TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbltwo", Some("db2"))) + assert(externalCatalog.listTables("db2").toSet == Set("tblone", "tbltwo")) + // Rename table without explicitly specifying database + sessionCatalog.setCurrentDatabase("db2") + sessionCatalog.renameTable(TableIdentifier("tbltwo"), TableIdentifier("table_two")) + assert(externalCatalog.listTables("db2").toSet == Set("tblone", "table_two")) + // Renaming "db2.tblone" to "db1.tblones" should fail because databases don't match + intercept[AnalysisException] { + sessionCatalog.renameTable( + TableIdentifier("tblone", Some("db2")), TableIdentifier("tblones", Some("db1"))) + } + } + + test("rename table when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.renameTable( + TableIdentifier("tbl1", Some("unknown_db")), TableIdentifier("tbl2", Some("unknown_db"))) + } + intercept[AnalysisException] { + catalog.renameTable( + 
TableIdentifier("unknown_table", Some("db2")), TableIdentifier("tbl2", Some("db2"))) + } + } + + test("rename temp table") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + val tempTable = Range(1, 10, 2, 10, Seq()) + sessionCatalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + sessionCatalog.setCurrentDatabase("db2") + assert(sessionCatalog.getTempTable("tbl1") == Some(tempTable)) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If database is not specified, temp table should be renamed first + sessionCatalog.renameTable(TableIdentifier("tbl1"), TableIdentifier("tbl3")) + assert(sessionCatalog.getTempTable("tbl1") == None) + assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable)) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl2")) + // If database is specified, temp tables are never renamed + sessionCatalog.renameTable( + TableIdentifier("tbl2", Some("db2")), TableIdentifier("tbl4", Some("db2"))) + assert(sessionCatalog.getTempTable("tbl3") == Some(tempTable)) + assert(sessionCatalog.getTempTable("tbl4") == None) + assert(externalCatalog.listTables("db2").toSet == Set("tbl1", "tbl4")) + } + + test("alter table") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + val tbl1 = externalCatalog.getTable("db2", "tbl1") + sessionCatalog.alterTable(tbl1.copy(properties = Map("toh" -> "frem"))) + val newTbl1 = externalCatalog.getTable("db2", "tbl1") + assert(!tbl1.properties.contains("toh")) + assert(newTbl1.properties.size == tbl1.properties.size + 1) + assert(newTbl1.properties.get("toh") == Some("frem")) + // Alter table without explicitly specifying database + sessionCatalog.setCurrentDatabase("db2") + sessionCatalog.alterTable(tbl1.copy(name = TableIdentifier("tbl1"))) + val newestTbl1 = externalCatalog.getTable("db2", "tbl1") + assert(newestTbl1 == tbl1) + } + + test("alter table when 
database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.alterTable(newTable("tbl1", "unknown_db")) + } + intercept[AnalysisException] { + catalog.alterTable(newTable("unknown_table", "db2")) + } + } + + test("get table") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + assert(sessionCatalog.getTable(TableIdentifier("tbl1", Some("db2"))) + == externalCatalog.getTable("db2", "tbl1")) + // Get table without explicitly specifying database + sessionCatalog.setCurrentDatabase("db2") + assert(sessionCatalog.getTable(TableIdentifier("tbl1")) + == externalCatalog.getTable("db2", "tbl1")) + } + + test("get table when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.getTable(TableIdentifier("tbl1", Some("unknown_db"))) + } + intercept[AnalysisException] { + catalog.getTable(TableIdentifier("unknown_table", Some("db2"))) + } + } + + test("lookup table relation") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + val tempTable1 = Range(1, 10, 1, 10, Seq()) + val metastoreTable1 = externalCatalog.getTable("db2", "tbl1") + sessionCatalog.createTempTable("tbl1", tempTable1, ignoreIfExists = false) + sessionCatalog.setCurrentDatabase("db2") + // If we explicitly specify the database, we'll look up the relation in that database + assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1", Some("db2"))) + == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1))) + // Otherwise, we'll first look up a temporary table with the same name + assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) + == SubqueryAlias("tbl1", tempTable1)) + // Then, if that does not exist, look up the relation in the current database + sessionCatalog.dropTable(TableIdentifier("tbl1"), ignoreIfNotExists = false) + 
assert(sessionCatalog.lookupRelation(TableIdentifier("tbl1")) + == SubqueryAlias("tbl1", CatalogRelation("db2", metastoreTable1))) + } + + test("lookup table relation with alias") { + val catalog = new SessionCatalog(newBasicCatalog()) + val alias = "monster" + val tableMetadata = catalog.getTable(TableIdentifier("tbl1", Some("db2"))) + val relation = SubqueryAlias("tbl1", CatalogRelation("db2", tableMetadata)) + val relationWithAlias = + SubqueryAlias(alias, + SubqueryAlias("tbl1", + CatalogRelation("db2", tableMetadata, Some(alias)))) + assert(catalog.lookupRelation( + TableIdentifier("tbl1", Some("db2")), alias = None) == relation) + assert(catalog.lookupRelation( + TableIdentifier("tbl1", Some("db2")), alias = Some(alias)) == relationWithAlias) + } + + test("list tables without pattern") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempTable = Range(1, 10, 2, 10, Seq()) + catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false) + assert(catalog.listTables("db1").toSet == + Set(TableIdentifier("tbl1"), TableIdentifier("tbl4"))) + assert(catalog.listTables("db2").toSet == + Set(TableIdentifier("tbl1"), + TableIdentifier("tbl4"), + TableIdentifier("tbl1", Some("db2")), + TableIdentifier("tbl2", Some("db2")))) + intercept[AnalysisException] { + catalog.listTables("unknown_db") + } + } + + test("list tables with pattern") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempTable = Range(1, 10, 2, 10, Seq()) + catalog.createTempTable("tbl1", tempTable, ignoreIfExists = false) + catalog.createTempTable("tbl4", tempTable, ignoreIfExists = false) + assert(catalog.listTables("db1", "*").toSet == catalog.listTables("db1").toSet) + assert(catalog.listTables("db2", "*").toSet == catalog.listTables("db2").toSet) + assert(catalog.listTables("db2", "tbl*").toSet == + Set(TableIdentifier("tbl1"), + TableIdentifier("tbl4"), + TableIdentifier("tbl1", Some("db2")), + 
TableIdentifier("tbl2", Some("db2")))) + assert(catalog.listTables("db2", "*1").toSet == + Set(TableIdentifier("tbl1"), TableIdentifier("tbl1", Some("db2")))) + intercept[AnalysisException] { + // Stay on the pattern overload that this test covers + catalog.listTables("unknown_db", "*") + } + } + + // -------------------------------------------------------------------------- + // Partitions + // -------------------------------------------------------------------------- + + test("basic create and list partitions") { + val externalCatalog = newEmptyCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false) + sessionCatalog.createTable(newTable("tbl", "mydb"), ignoreIfExists = false) + sessionCatalog.createPartitions( + TableIdentifier("tbl", Some("mydb")), Seq(part1, part2), ignoreIfExists = false) + assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2))) + // Create partitions without explicitly specifying database + sessionCatalog.setCurrentDatabase("mydb") + sessionCatalog.createPartitions(TableIdentifier("tbl"), Seq(part3), ignoreIfExists = false) + assert(catalogPartitionsEqual(externalCatalog, "mydb", "tbl", Seq(part1, part2, part3))) + } + + test("create partitions when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfExists = false) + } + intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfExists = false) + } + } + + test("create partitions that already exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = false) + } + catalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1), ignoreIfExists = true) + } + + test("drop 
partitions") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2))) + sessionCatalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1.spec), ignoreIfNotExists = false) + assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part2))) + // Drop partitions without explicitly specifying database + sessionCatalog.setCurrentDatabase("db2") + sessionCatalog.dropPartitions( + TableIdentifier("tbl2"), Seq(part2.spec), ignoreIfNotExists = false) + assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) + // Drop multiple partitions at once + sessionCatalog.createPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1, part2), ignoreIfExists = false) + assert(catalogPartitionsEqual(externalCatalog, "db2", "tbl2", Seq(part1, part2))) + sessionCatalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), ignoreIfNotExists = false) + assert(externalCatalog.listPartitions("db2", "tbl2").isEmpty) + } + + test("drop partitions when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl1", Some("does_not_exist")), Seq(), ignoreIfNotExists = false) + } + intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("does_not_exist", Some("db2")), Seq(), ignoreIfNotExists = false) + } + } + + test("drop partitions that do not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = false) + } + catalog.dropPartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part3.spec), ignoreIfNotExists = true) + } + + test("get partition") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.getPartition( + 
TableIdentifier("tbl2", Some("db2")), part1.spec).spec == part1.spec) + assert(catalog.getPartition( + TableIdentifier("tbl2", Some("db2")), part2.spec).spec == part2.spec) + // Get partition without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.getPartition(TableIdentifier("tbl2"), part1.spec).spec == part1.spec) + assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec == part2.spec) + // Get non-existent partition + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2"), part3.spec) + } + } + + test("get partition when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl1", Some("does_not_exist")), part1.spec) + } + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("does_not_exist", Some("db2")), part1.spec) + } + } + + test("rename partitions") { + val catalog = new SessionCatalog(newBasicCatalog()) + val newPart1 = part1.copy(spec = Map("a" -> "100", "b" -> "101")) + val newPart2 = part2.copy(spec = Map("a" -> "200", "b" -> "201")) + val newSpecs = Seq(newPart1.spec, newPart2.spec) + catalog.renamePartitions( + TableIdentifier("tbl2", Some("db2")), Seq(part1.spec, part2.spec), newSpecs) + assert(catalog.getPartition( + TableIdentifier("tbl2", Some("db2")), newPart1.spec).spec === newPart1.spec) + assert(catalog.getPartition( + TableIdentifier("tbl2", Some("db2")), newPart2.spec).spec === newPart2.spec) + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) + } + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) + } + // Rename partitions without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.renamePartitions(TableIdentifier("tbl2"), newSpecs, Seq(part1.spec, part2.spec)) + assert(catalog.getPartition(TableIdentifier("tbl2"), 
part1.spec).spec === part1.spec) + assert(catalog.getPartition(TableIdentifier("tbl2"), part2.spec).spec === part2.spec) + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2"), newPart1.spec) + } + intercept[AnalysisException] { + catalog.getPartition(TableIdentifier("tbl2"), newPart2.spec) + } + } + + test("rename partitions when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.renamePartitions( + TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1.spec), Seq(part2.spec)) + } + intercept[AnalysisException] { + catalog.renamePartitions( + TableIdentifier("does_not_exist", Some("db2")), Seq(part1.spec), Seq(part2.spec)) + } + } + + test("alter partitions") { + val catalog = new SessionCatalog(newBasicCatalog()) + val newLocation = newUriForDatabase() + // Alter but keep spec the same + val oldPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) + val oldPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) + catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq( + oldPart1.copy(storage = storageFormat.copy(locationUri = Some(newLocation))), + oldPart2.copy(storage = storageFormat.copy(locationUri = Some(newLocation))))) + val newPart1 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part1.spec) + val newPart2 = catalog.getPartition(TableIdentifier("tbl2", Some("db2")), part2.spec) + assert(newPart1.storage.locationUri == Some(newLocation)) + assert(newPart2.storage.locationUri == Some(newLocation)) + assert(oldPart1.storage.locationUri != Some(newLocation)) + assert(oldPart2.storage.locationUri != Some(newLocation)) + // Alter partitions without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.alterPartitions(TableIdentifier("tbl2"), Seq(oldPart1, oldPart2)) + val newerPart1 = catalog.getPartition(TableIdentifier("tbl2"), part1.spec) + val newerPart2 = 
catalog.getPartition(TableIdentifier("tbl2"), part2.spec) + assert(oldPart1.storage.locationUri == newerPart1.storage.locationUri) + assert(oldPart2.storage.locationUri == newerPart2.storage.locationUri) + // Alter but change spec, should fail because new partition specs do not exist yet + val badPart1 = part1.copy(spec = Map("a" -> "v1", "b" -> "v2")) + val badPart2 = part2.copy(spec = Map("a" -> "v3", "b" -> "v4")) + intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("tbl2", Some("db2")), Seq(badPart1, badPart2)) + } + } + + test("alter partitions when database/table does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("tbl1", Some("does_not_exist")), Seq(part1)) + } + intercept[AnalysisException] { + catalog.alterPartitions(TableIdentifier("does_not_exist", Some("db2")), Seq(part1)) + } + } + + test("list partitions") { + val catalog = new SessionCatalog(newBasicCatalog()) + assert(catalog.listPartitions(TableIdentifier("tbl2", Some("db2"))).toSet == Set(part1, part2)) + // List partitions without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.listPartitions(TableIdentifier("tbl2")).toSet == Set(part1, part2)) + } + + // -------------------------------------------------------------------------- + // Functions + // -------------------------------------------------------------------------- + + test("basic create and list functions") { + val externalCatalog = newEmptyCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + sessionCatalog.createDatabase(newDb("mydb"), ignoreIfExists = false) + sessionCatalog.createFunction(newFunc("myfunc", Some("mydb"))) + assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc")) + // Create function without explicitly specifying database + sessionCatalog.setCurrentDatabase("mydb") + sessionCatalog.createFunction(newFunc("myfunc2")) + 
assert(externalCatalog.listFunctions("mydb", "*").toSet == Set("myfunc", "myfunc2")) + } + + test("create function when database does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.createFunction(newFunc("func5", Some("does_not_exist"))) + } + } + + test("create function that already exists") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.createFunction(newFunc("func1", Some("db2"))) + } + } + + test("create temp function") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempFunc1 = newFunc("temp1") + val tempFunc2 = newFunc("temp2") + catalog.createTempFunction(tempFunc1, ignoreIfExists = false) + catalog.createTempFunction(tempFunc2, ignoreIfExists = false) + assert(catalog.getTempFunction("temp1") == Some(tempFunc1)) + assert(catalog.getTempFunction("temp2") == Some(tempFunc2)) + assert(catalog.getTempFunction("temp3") == None) + // Temporary function already exists + intercept[AnalysisException] { + catalog.createTempFunction(tempFunc1, ignoreIfExists = false) + } + // Temporary function is overridden + val tempFunc3 = tempFunc1.copy(className = "something else") + catalog.createTempFunction(tempFunc3, ignoreIfExists = true) + assert(catalog.getTempFunction("temp1") == Some(tempFunc3)) + } + + test("drop function") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) + sessionCatalog.dropFunction(FunctionIdentifier("func1", Some("db2"))) + assert(externalCatalog.listFunctions("db2", "*").isEmpty) + // Drop function without explicitly specifying database + sessionCatalog.setCurrentDatabase("db2") + sessionCatalog.createFunction(newFunc("func2", Some("db2"))) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func2")) + sessionCatalog.dropFunction(FunctionIdentifier("func2")) + 
assert(externalCatalog.listFunctions("db2", "*").isEmpty) + } + + test("drop function when database/function does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.dropFunction(FunctionIdentifier("something", Some("does_not_exist"))) + } + intercept[AnalysisException] { + catalog.dropFunction(FunctionIdentifier("does_not_exist")) + } + } + + test("drop temp function") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempFunc = newFunc("func1") + catalog.createTempFunction(tempFunc, ignoreIfExists = false) + assert(catalog.getTempFunction("func1") == Some(tempFunc)) + catalog.dropTempFunction("func1", ignoreIfNotExists = false) + assert(catalog.getTempFunction("func1") == None) + intercept[AnalysisException] { + catalog.dropTempFunction("func1", ignoreIfNotExists = false) + } + catalog.dropTempFunction("func1", ignoreIfNotExists = true) + } + + test("get function") { + val catalog = new SessionCatalog(newBasicCatalog()) + val expected = CatalogFunction(FunctionIdentifier("func1", Some("db2")), funcClass) + assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == expected) + // Get function without explicitly specifying database + catalog.setCurrentDatabase("db2") + assert(catalog.getFunction(FunctionIdentifier("func1")) == expected) + } + + test("get function when database/function does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.getFunction(FunctionIdentifier("func1", Some("does_not_exist"))) + } + intercept[AnalysisException] { + catalog.getFunction(FunctionIdentifier("does_not_exist", Some("db2"))) + } + } + + test("get temp function") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + val metastoreFunc = externalCatalog.getFunction("db2", "func1") + val tempFunc = newFunc("func1").copy(className = "something weird") + 
sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false) + sessionCatalog.setCurrentDatabase("db2") + // If a database is specified, we'll always return the function in that database + assert(sessionCatalog.getFunction(FunctionIdentifier("func1", Some("db2"))) == metastoreFunc) + // If no database is specified, we'll first return temporary functions + assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == tempFunc) + // Then, if no such temporary function exist, check the current database + sessionCatalog.dropTempFunction("func1", ignoreIfNotExists = false) + assert(sessionCatalog.getFunction(FunctionIdentifier("func1")) == metastoreFunc) + } + + test("rename function") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + val newName = "funcky" + assert(sessionCatalog.getFunction( + FunctionIdentifier("func1", Some("db2"))) == newFunc("func1", Some("db2"))) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) + sessionCatalog.renameFunction( + FunctionIdentifier("func1", Some("db2")), FunctionIdentifier(newName, Some("db2"))) + assert(sessionCatalog.getFunction( + FunctionIdentifier(newName, Some("db2"))) == newFunc(newName, Some("db2"))) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set(newName)) + // Rename function without explicitly specifying database + sessionCatalog.setCurrentDatabase("db2") + sessionCatalog.renameFunction(FunctionIdentifier(newName), FunctionIdentifier("func1")) + assert(sessionCatalog.getFunction( + FunctionIdentifier("func1")) == newFunc("func1", Some("db2"))) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1")) + // Renaming "db2.func1" to "db1.func2" should fail because databases don't match + intercept[AnalysisException] { + sessionCatalog.renameFunction( + FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func2", Some("db1"))) + } + } + + test("rename function when database/function does not 
exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.renameFunction( + FunctionIdentifier("func1", Some("does_not_exist")), + FunctionIdentifier("func5", Some("does_not_exist"))) + } + intercept[AnalysisException] { + catalog.renameFunction( + FunctionIdentifier("does_not_exist", Some("db2")), + FunctionIdentifier("x", Some("db2"))) + } + } + + test("rename temp function") { + val externalCatalog = newBasicCatalog() + val sessionCatalog = new SessionCatalog(externalCatalog) + val tempFunc = newFunc("func1").copy(className = "something weird") + sessionCatalog.createTempFunction(tempFunc, ignoreIfExists = false) + sessionCatalog.setCurrentDatabase("db2") + // If a database is specified, we'll always rename the function in that database + sessionCatalog.renameFunction( + FunctionIdentifier("func1", Some("db2")), FunctionIdentifier("func3", Some("db2"))) + assert(sessionCatalog.getTempFunction("func1") == Some(tempFunc)) + assert(sessionCatalog.getTempFunction("func3") == None) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3")) + // If no database is specified, we'll first rename temporary functions + sessionCatalog.createFunction(newFunc("func1", Some("db2"))) + sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func4")) + assert(sessionCatalog.getTempFunction("func4") == + Some(tempFunc.copy(name = FunctionIdentifier("func4")))) + assert(sessionCatalog.getTempFunction("func1") == None) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func1", "func3")) + // Then, if no such temporary function exist, rename the function in the current database + sessionCatalog.renameFunction(FunctionIdentifier("func1"), FunctionIdentifier("func5")) + assert(sessionCatalog.getTempFunction("func5") == None) + assert(externalCatalog.listFunctions("db2", "*").toSet == Set("func3", "func5")) + } + + test("alter function") { + val catalog = new 
SessionCatalog(newBasicCatalog()) + assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == funcClass) + catalog.alterFunction(newFunc("func1", Some("db2")).copy(className = "muhaha")) + assert(catalog.getFunction(FunctionIdentifier("func1", Some("db2"))).className == "muhaha") + // Alter function without explicitly specifying database + catalog.setCurrentDatabase("db2") + catalog.alterFunction(newFunc("func1").copy(className = "derpy")) + assert(catalog.getFunction(FunctionIdentifier("func1")).className == "derpy") + } + + test("alter function when database/function does not exist") { + val catalog = new SessionCatalog(newBasicCatalog()) + intercept[AnalysisException] { + catalog.alterFunction(newFunc("func5", Some("does_not_exist"))) + } + intercept[AnalysisException] { + catalog.alterFunction(newFunc("funcky", Some("db2"))) + } + } + + test("list functions") { + val catalog = new SessionCatalog(newBasicCatalog()) + val tempFunc1 = newFunc("func1").copy(className = "march") + val tempFunc2 = newFunc("yes_me").copy(className = "april") + catalog.createFunction(newFunc("func2", Some("db2"))) + catalog.createFunction(newFunc("not_me", Some("db2"))) + catalog.createTempFunction(tempFunc1, ignoreIfExists = false) + catalog.createTempFunction(tempFunc2, ignoreIfExists = false) + assert(catalog.listFunctions("db1", "*").toSet == + Set(FunctionIdentifier("func1"), + FunctionIdentifier("yes_me"))) + assert(catalog.listFunctions("db2", "*").toSet == + Set(FunctionIdentifier("func1"), + FunctionIdentifier("yes_me"), + FunctionIdentifier("func1", Some("db2")), + FunctionIdentifier("func2", Some("db2")), + FunctionIdentifier("not_me", Some("db2")))) + assert(catalog.listFunctions("db2", "func*").toSet == + Set(FunctionIdentifier("func1"), + FunctionIdentifier("func1", Some("db2")), + FunctionIdentifier("func2", Some("db2")))) + } + +} 
http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala index 5185e9a..439501f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveCatalog.scala @@ -24,6 +24,7 @@ import org.apache.thrift.TException import org.apache.spark.Logging import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.analysis.NoSuchItemException import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.hive.client.HiveClient @@ -73,10 +74,10 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit } private def requireDbMatches(db: String, table: CatalogTable): Unit = { - if (table.specifiedDatabase != Some(db)) { + if (table.name.database != Some(db)) { throw new AnalysisException( s"Provided database $db does not much the one specified in the " + - s"table definition (${table.specifiedDatabase.getOrElse("n/a")})") + s"table definition (${table.name.database.getOrElse("n/a")})") } } @@ -160,7 +161,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit } override def renameTable(db: String, oldName: String, newName: String): Unit = withClient { - val newTable = client.getTable(db, oldName).copy(name = newName) + val newTable = client.getTable(db, oldName).copy(name = TableIdentifier(newName, Some(db))) client.alterTable(oldName, newTable) } @@ -173,7 +174,7 @@ private[spark] class HiveCatalog(client: HiveClient) extends ExternalCatalog wit */ override def alterTable(db: String, tableDefinition: CatalogTable): Unit = withClient { requireDbMatches(db, tableDefinition) - 
requireTableExists(db, tableDefinition.name) + requireTableExists(db, tableDefinition.name.table) client.alterTable(tableDefinition) } http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala index c70510b..b6c7869 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala @@ -118,8 +118,8 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte private def getQualifiedTableName(t: CatalogTable): QualifiedTableName = { QualifiedTableName( - t.specifiedDatabase.getOrElse(client.currentDatabase).toLowerCase, - t.name.toLowerCase) + t.name.database.getOrElse(client.currentDatabase).toLowerCase, + t.name.table.toLowerCase) } /** A cache of Spark SQL data source tables that have been accessed. 
*/ @@ -293,8 +293,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte def newSparkSQLSpecificMetastoreTable(): CatalogTable = { CatalogTable( - specifiedDatabase = Option(dbName), - name = tblName, + name = TableIdentifier(tblName, Option(dbName)), tableType = tableType, schema = Nil, storage = CatalogStorageFormat( @@ -314,8 +313,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte assert(relation.partitionSchema.isEmpty) CatalogTable( - specifiedDatabase = Option(dbName), - name = tblName, + name = TableIdentifier(tblName, Option(dbName)), tableType = tableType, storage = CatalogStorageFormat( locationUri = Some(relation.location.paths.map(_.toUri.toString).head), @@ -432,7 +430,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte alias match { // because hive use things like `_c0` to build the expanded text // currently we cannot support view from "create view v1(c1) as ..." - case None => SubqueryAlias(table.name, hive.parseSql(viewText)) + case None => SubqueryAlias(table.name.table, hive.parseSql(viewText)) case Some(aliasText) => SubqueryAlias(aliasText, hive.parseSql(viewText)) } } else { @@ -618,9 +616,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateViewAsSelect( - table.copy( - specifiedDatabase = Some(dbName), - name = tblName), + table.copy(name = TableIdentifier(tblName, Some(dbName))), child, allowExisting, replace) @@ -642,7 +638,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte if (hive.convertCTAS && table.storage.serde.isEmpty) { // Do the conversion when spark.sql.hive.convertCTAS is true and the query // does not specify any storage format (file format and storage handler). 
- if (table.specifiedDatabase.isDefined) { + if (table.name.database.isDefined) { throw new AnalysisException( "Cannot specify database name in a CTAS statement " + "when spark.sql.hive.convertCTAS is set to true.") @@ -650,7 +646,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val mode = if (allowExisting) SaveMode.Ignore else SaveMode.ErrorIfExists CreateTableUsingAsSelect( - TableIdentifier(desc.name), + TableIdentifier(desc.name.table), conf.defaultDataSourceName, temporary = false, Array.empty[String], @@ -671,9 +667,7 @@ private[hive] class HiveMetastoreCatalog(val client: HiveClient, hive: HiveConte val QualifiedTableName(dbName, tblName) = getQualifiedTableName(table) execution.CreateTableAsSelect( - desc.copy( - specifiedDatabase = Some(dbName), - name = tblName), + desc.copy(name = TableIdentifier(tblName, Some(dbName))), child, allowExisting) } @@ -824,7 +818,7 @@ private[hive] case class MetastoreRelation( // We start by constructing an API table as Hive performs several important transformations // internally when converting an API table to a QL table. 
val tTable = new org.apache.hadoop.hive.metastore.api.Table() - tTable.setTableName(table.name) + tTable.setTableName(table.name.table) tTable.setDbName(table.database) val tableParameters = new java.util.HashMap[String, String]() http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala index 739fbaf..00fc8af 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveQl.scala @@ -60,7 +60,7 @@ private[hive] case class CreateTableAsSelect( override def output: Seq[Attribute] = Seq.empty[Attribute] override lazy val resolved: Boolean = - tableDesc.specifiedDatabase.isDefined && + tableDesc.name.database.isDefined && tableDesc.schema.nonEmpty && tableDesc.storage.serde.isDefined && tableDesc.storage.inputFormat.isDefined && @@ -185,13 +185,10 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging properties: Map[String, String], allowExist: Boolean, replace: Boolean): CreateViewAsSelect = { - val TableIdentifier(viewName, dbName) = extractTableIdent(viewNameParts) - + val tableIdentifier = extractTableIdent(viewNameParts) val originalText = query.source - val tableDesc = CatalogTable( - specifiedDatabase = dbName, - name = viewName, + name = tableIdentifier, tableType = CatalogTableType.VIRTUAL_VIEW, schema = schema, storage = CatalogStorageFormat( @@ -356,12 +353,11 @@ private[hive] class HiveQl(conf: ParserConf) extends SparkQl(conf) with Logging "TOK_TABLELOCATION", "TOK_TABLEPROPERTIES"), children) - val TableIdentifier(tblName, dbName) = extractTableIdent(tableNameParts) + val tableIdentifier = extractTableIdent(tableNameParts) // TODO add bucket support var tableDesc: CatalogTable = 
CatalogTable( - specifiedDatabase = dbName, - name = tblName, + name = tableIdentifier, tableType = if (externalTable.isDefined) { CatalogTableType.EXTERNAL_TABLE http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala index b32aff2..d214e52 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala @@ -91,7 +91,7 @@ private[hive] trait HiveClient { def dropTable(dbName: String, tableName: String, ignoreIfNotExists: Boolean): Unit /** Alter a table whose name matches the one specified in `table`, assuming it exists. */ - final def alterTable(table: CatalogTable): Unit = alterTable(table.name, table) + final def alterTable(table: CatalogTable): Unit = alterTable(table.name.table, table) /** Updates the given table with new metadata, optionally renaming the table. 
*/ def alterTable(tableName: String, table: CatalogTable): Unit http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala index c108750..3040ec9 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala @@ -35,6 +35,7 @@ import org.apache.hadoop.hive.ql.session.SessionState import org.apache.hadoop.security.UserGroupInformation import org.apache.spark.{Logging, SparkConf, SparkException} +import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.{NoSuchDatabaseException, NoSuchPartitionException} import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.Expression @@ -298,8 +299,7 @@ private[hive] class HiveClientImpl( logDebug(s"Looking up $dbName.$tableName") Option(client.getTable(dbName, tableName, false)).map { h => CatalogTable( - specifiedDatabase = Option(h.getDbName), - name = h.getTableName, + name = TableIdentifier(h.getTableName, Option(h.getDbName)), tableType = h.getTableType match { case HiveTableType.EXTERNAL_TABLE => CatalogTableType.EXTERNAL_TABLE case HiveTableType.MANAGED_TABLE => CatalogTableType.MANAGED_TABLE @@ -545,13 +545,13 @@ private[hive] class HiveClientImpl( } override def renameFunction(db: String, oldName: String, newName: String): Unit = withHiveState { - val catalogFunc = getFunction(db, oldName).copy(name = newName) + val catalogFunc = getFunction(db, oldName).copy(name = FunctionIdentifier(newName, Some(db))) val hiveFunc = toHiveFunction(catalogFunc, db) client.alterFunction(db, oldName, hiveFunc) } 
override def alterFunction(db: String, func: CatalogFunction): Unit = withHiveState { - client.alterFunction(db, func.name, toHiveFunction(func, db)) + client.alterFunction(db, func.name.funcName, toHiveFunction(func, db)) } override def getFunctionOption( @@ -612,7 +612,7 @@ private[hive] class HiveClientImpl( private def toHiveFunction(f: CatalogFunction, db: String): HiveFunction = { new HiveFunction( - f.name, + f.name.funcName, db, f.className, null, @@ -623,7 +623,8 @@ private[hive] class HiveClientImpl( } private def fromHiveFunction(hf: HiveFunction): CatalogFunction = { - new CatalogFunction(hf.getFunctionName, hf.getClassName) + val name = FunctionIdentifier(hf.getFunctionName, Option(hf.getDbName)) + new CatalogFunction(name, hf.getClassName) } private def toHiveColumn(c: CatalogColumn): FieldSchema = { @@ -639,7 +640,7 @@ private[hive] class HiveClientImpl( } private def toHiveTable(table: CatalogTable): HiveTable = { - val hiveTable = new HiveTable(table.database, table.name) + val hiveTable = new HiveTable(table.database, table.name.table) hiveTable.setTableType(table.tableType match { case CatalogTableType.EXTERNAL_TABLE => HiveTableType.EXTERNAL_TABLE case CatalogTableType.MANAGED_TABLE => HiveTableType.MANAGED_TABLE http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala index 91425d1..391e297 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateTableAsSelect.scala @@ -38,7 +38,7 @@ case class CreateTableAsSelect( allowExisting: Boolean) extends RunnableCommand { - val tableIdentifier = 
TableIdentifier(tableDesc.name, Some(tableDesc.database)) + private val tableIdentifier = tableDesc.name override def children: Seq[LogicalPlan] = Seq(query) @@ -93,6 +93,6 @@ case class CreateTableAsSelect( } override def argString: String = { - s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name}, InsertIntoHiveTable]" + s"[Database:${tableDesc.database}}, TableName: ${tableDesc.name.table}, InsertIntoHiveTable]" } } http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala index 6c2b88e..8a1cf2c 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/CreateViewAsSelect.scala @@ -44,7 +44,7 @@ private[hive] case class CreateViewAsSelect( assert(tableDesc.schema == Nil || tableDesc.schema.length == childSchema.length) assert(tableDesc.viewText.isDefined) - val tableIdentifier = TableIdentifier(tableDesc.name, Some(tableDesc.database)) + private val tableIdentifier = tableDesc.name override def run(sqlContext: SQLContext): Seq[Row] = { val hiveContext = sqlContext.asInstanceOf[HiveContext] @@ -116,7 +116,7 @@ private[hive] case class CreateViewAsSelect( } val viewText = tableDesc.viewText.get - val viewName = quote(tableDesc.name) + val viewName = quote(tableDesc.name.table) s"SELECT $viewOutput FROM ($viewText) $viewName" } http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala index 2809f94..0dc4fea 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveCatalogSuite.scala @@ -36,15 +36,12 @@ class HiveCatalogSuite extends CatalogTestCases { sparkConf = new SparkConf()).createClient() } - protected override val tableInputFormat: String = - "org.apache.hadoop.mapred.SequenceFileInputFormat" - protected override val tableOutputFormat: String = - "org.apache.hadoop.mapred.SequenceFileOutputFormat" - - protected override def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath + protected override val utils: CatalogTestUtils = new CatalogTestUtils { + override val tableInputFormat: String = "org.apache.hadoop.mapred.SequenceFileInputFormat" + override val tableOutputFormat: String = "org.apache.hadoop.mapred.SequenceFileOutputFormat" + override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client) + } protected override def resetState(): Unit = client.reset() - protected override def newEmptyCatalog(): ExternalCatalog = new HiveCatalog(client) - } http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala index 626550f..1c775db 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveQlSuite.scala @@ -54,8 +54,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val (desc, exists) = extractTableDesc(s1) assert(exists) - assert(desc.specifiedDatabase == Some("mydb")) - assert(desc.name == "page_view") + assert(desc.name.database == Some("mydb")) + assert(desc.name.table 
== "page_view") assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == @@ -100,8 +100,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val (desc, exists) = extractTableDesc(s2) assert(exists) - assert(desc.specifiedDatabase == Some("mydb")) - assert(desc.name == "page_view") + assert(desc.name.database == Some("mydb")) + assert(desc.name.table == "page_view") assert(desc.tableType == CatalogTableType.EXTERNAL_TABLE) assert(desc.storage.locationUri == Some("/user/external/page_view")) assert(desc.schema == @@ -127,8 +127,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { val s3 = """CREATE TABLE page_view AS SELECT * FROM src""" val (desc, exists) = extractTableDesc(s3) assert(exists == false) - assert(desc.specifiedDatabase == None) - assert(desc.name == "page_view") + assert(desc.name.database == None) + assert(desc.name.table == "page_view") assert(desc.tableType == CatalogTableType.MANAGED_TABLE) assert(desc.storage.locationUri == None) assert(desc.schema == Seq.empty[CatalogColumn]) @@ -162,8 +162,8 @@ class HiveQlSuite extends SparkFunSuite with BeforeAndAfterAll { | ORDER BY key, value""".stripMargin val (desc, exists) = extractTableDesc(s5) assert(exists == false) - assert(desc.specifiedDatabase == None) - assert(desc.name == "ctas2") + assert(desc.name.database == None) + assert(desc.name.table == "ctas2") assert(desc.tableType == CatalogTableType.MANAGED_TABLE) assert(desc.storage.locationUri == None) assert(desc.schema == Seq.empty[CatalogColumn]) http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala index 81420fe..a80c35c 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala @@ -719,8 +719,7 @@ class MetastoreDataSourcesSuite extends QueryTest with SQLTestUtils with TestHiv withTable(tableName) { val schema = StructType(StructField("int", IntegerType, true) :: Nil) val hiveTable = CatalogTable( - specifiedDatabase = Some("default"), - name = tableName, + name = TableIdentifier(tableName, Some("default")), tableType = CatalogTableType.MANAGED_TABLE, schema = Seq.empty, storage = CatalogStorageFormat( http://git-wip-us.apache.org/repos/asf/spark/blob/ca9ef86c/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala index 6292f6c..3d54da1 100644 --- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala +++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/client/VersionsSuite.scala @@ -22,6 +22,7 @@ import java.io.File import org.apache.hadoop.util.VersionInfo import org.apache.spark.{Logging, SparkConf, SparkFunSuite} +import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.catalyst.catalog._ import org.apache.spark.sql.catalyst.expressions.{AttributeReference, EqualTo, Literal, NamedExpression} import org.apache.spark.sql.catalyst.util.quietly @@ -129,8 +130,7 @@ class VersionsSuite extends SparkFunSuite with Logging { test(s"$version: createTable") { val table = CatalogTable( - specifiedDatabase = Option("default"), - name = "src", + name = TableIdentifier("src", Some("default")), tableType = CatalogTableType.MANAGED_TABLE, schema = 
Seq(CatalogColumn("key", "int")), storage = CatalogStorageFormat( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org