Repository: spark
Updated Branches:
  refs/heads/branch-2.0 fb73663db -> 5cdb7bea5
[SPARK-15093][SQL] create/delete/rename directory for InMemoryCatalog operations if needed

## What changes were proposed in this pull request?

The following operations now perform file system operations:

1. CREATE DATABASE: create a dir
2. DROP DATABASE: delete the dir
3. CREATE TABLE: create a dir
4. DROP TABLE: delete the dir
5. RENAME TABLE: rename the dir
6. CREATE PARTITIONS: create a dir
7. RENAME PARTITIONS: rename the dir
8. DROP PARTITIONS: delete the dir

## How was this patch tested?

New tests in `ExternalCatalogSuite`.

Author: Wenchen Fan <wenc...@databricks.com>

Closes #12871 from cloud-fan/catalog.

(cherry picked from commit beb16ec556c3b7a23fe0ac7bda66f71abd5c61e9)
Signed-off-by: Andrew Or <and...@databricks.com>

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cdb7bea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cdb7bea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cdb7bea

Branch: refs/heads/branch-2.0
Commit: 5cdb7bea56d65e5ae7e3d09b04ce3189e6a60f10
Parents: fb73663
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon May 9 10:47:45 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Mon May 9 10:48:00 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 122 ++++++++++++++++++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |  95 ++++++++++++++-
 .../apache/spark/sql/internal/SharedState.scala |   2 +-
 .../spark/sql/execution/command/DDLSuite.scala  |  57 +++------
 4 files changed, 232 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
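Concretely, the patched `InMemoryCatalog` now touches the file system as sketched below. This is an illustrative snippet, not part of the commit: it assumes the default (local) file system, a writable `file:/tmp/mydb` location, and the same case-class shapes used by the new tests; the numbered comments refer to the operation list above.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog._

// An empty Hadoop Configuration resolves FileSystem.get(...) to the local FS.
val catalog = new InMemoryCatalog(new Configuration)

// 1. CREATE DATABASE creates the directory named by locationUri.
catalog.createDatabase(
  CatalogDatabase("mydb", "demo db", "file:/tmp/mydb", Map.empty),
  ignoreIfExists = false)

// 3. CREATE TABLE on a MANAGED table creates <db locationUri>/<table name>;
//    EXTERNAL tables are skipped because they own their own location.
catalog.createTable("mydb", CatalogTable(
  identifier = TableIdentifier("t", Some("mydb")),
  tableType = CatalogTableType.MANAGED,
  storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
  schema = Seq(CatalogColumn("a", "int"))), ignoreIfExists = false)

// 5. RENAME TABLE renames the directory; 4./2. DROP TABLE/DATABASE delete it.
catalog.renameTable("mydb", "t", "t2")
catalog.dropTable("mydb", "t2", ignoreIfNotExists = false)
catalog.dropDatabase("mydb", ignoreIfNotExists = false, cascade = false)
```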
http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 676a9e1..982b035 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -17,8 +17,14 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.io.IOException
+
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.util.StringUtils
@@ -32,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils
  *
  * All public methods should be synchronized for thread-safety.
  */
-class InMemoryCatalog extends ExternalCatalog {
+class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends ExternalCatalog {
   import CatalogTypes.TablePartitionSpec
 
   private class TableDesc(var table: CatalogTable) {
@@ -104,6 +110,8 @@ class InMemoryCatalog extends ExternalCatalog {
     }
   }
 
+  private val fs = FileSystem.get(hadoopConfig)
+
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -116,6 +124,13 @@ class InMemoryCatalog extends ExternalCatalog {
         throw new AnalysisException(s"Database '${dbDefinition.name}' already exists.")
       }
     } else {
+      try {
+        fs.mkdirs(new Path(dbDefinition.locationUri))
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create database ${dbDefinition.name} as failed " +
+            s"to create its directory ${dbDefinition.locationUri}", e)
+      }
       catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
     }
   }
@@ -135,6 +150,14 @@ class InMemoryCatalog extends ExternalCatalog {
         }
       }
       // Remove the database.
+      val dbDefinition = catalog(db).db
+      try {
+        fs.delete(new Path(dbDefinition.locationUri), true)
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to drop database ${dbDefinition.name} as failed " +
+            s"to delete its directory ${dbDefinition.locationUri}", e)
+      }
       catalog.remove(db)
     } else {
       if (!ignoreIfNotExists) {
@@ -182,6 +205,16 @@ class InMemoryCatalog extends ExternalCatalog {
         throw new AnalysisException(s"Table '$table' already exists in database '$db'")
       }
     } else {
+      if (tableDefinition.tableType == CatalogTableType.MANAGED) {
+        val dir = new Path(catalog(db).db.locationUri, table)
+        try {
+          fs.mkdirs(dir)
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to create table $table as failed " +
+              s"to create its directory $dir", e)
+        }
+      }
       catalog(db).tables.put(table, new TableDesc(tableDefinition))
     }
   }
@@ -192,6 +225,16 @@ class InMemoryCatalog extends ExternalCatalog {
       ignoreIfNotExists: Boolean): Unit = synchronized {
     requireDbExists(db)
     if (tableExists(db, table)) {
+      if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
+        val dir = new Path(catalog(db).db.locationUri, table)
+        try {
+          fs.delete(dir, true)
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to drop table $table as failed " +
+              s"to delete its directory $dir", e)
+        }
+      }
      catalog(db).tables.remove(table)
     } else {
       if (!ignoreIfNotExists) {
@@ -205,6 +248,19 @@ class InMemoryCatalog extends ExternalCatalog {
     requireTableNotExists(db, newName)
     val oldDesc = catalog(db).tables(oldName)
     oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
+
+    if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
+      val oldDir = new Path(catalog(db).db.locationUri, oldName)
+      val newDir = new Path(catalog(db).db.locationUri, newName)
+      try {
+        fs.rename(oldDir, newDir)
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to rename table $oldName to $newName as failed " +
+            s"to rename its directory $oldDir", e)
+      }
+    }
+
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
@@ -277,7 +333,26 @@ class InMemoryCatalog extends ExternalCatalog {
           s"'$db' table '$table':\n$dupSpecsStr")
       }
     }
-    parts.foreach { p => existingParts.put(p.spec, p) }
+
+    val tableDir = new Path(catalog(db).db.locationUri, table)
+    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    // TODO: we should follow hive to roll back if one partition path failed to create.
+    parts.foreach { p =>
+      // If location is set, the partition is using an external partition location and we don't
+      // need to handle its directory.
+      if (p.storage.locationUri.isEmpty) {
+        val partitionPath = partitionColumnNames.flatMap { col =>
+          p.spec.get(col).map(col + "=" + _)
+        }.mkString("/")
+        try {
+          fs.mkdirs(new Path(tableDir, partitionPath))
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to create partition path $partitionPath", e)
+        }
+      }
+      existingParts.put(p.spec, p)
+    }
   }
 
   override def dropPartitions(
@@ -295,7 +370,26 @@
           s"'$db' table '$table':\n$missingSpecsStr")
       }
     }
-    partSpecs.foreach(existingParts.remove)
+
+    val tableDir = new Path(catalog(db).db.locationUri, table)
+    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    // TODO: we should follow hive to roll back if one partition path failed to delete.
+    partSpecs.foreach { p =>
+      // If location is set, the partition is using an external partition location and we don't
+      // need to handle its directory.
+      if (existingParts.contains(p) && existingParts(p).storage.locationUri.isEmpty) {
+        val partitionPath = partitionColumnNames.flatMap { col =>
+          p.get(col).map(col + "=" + _)
+        }.mkString("/")
+        try {
+          fs.delete(new Path(tableDir, partitionPath), true)
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to delete partition path $partitionPath", e)
+        }
+      }
+      existingParts.remove(p)
+    }
   }
 
   override def renamePartitions(
@@ -306,9 +400,31 @@
     require(specs.size == newSpecs.size, "number of old and new partition specs differ")
     requirePartitionsExist(db, table, specs)
     requirePartitionsNotExist(db, table, newSpecs)
+
+    val tableDir = new Path(catalog(db).db.locationUri, table)
+    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    // TODO: we should follow hive to roll back if one partition path failed to rename.
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
       val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
       val existingParts = catalog(db).tables(table).partitions
+
+      // If location is set, the partition is using an external partition location and we don't
+      // need to handle its directory.
+      if (newPart.storage.locationUri.isEmpty) {
+        val oldPath = partitionColumnNames.flatMap { col =>
+          oldSpec.get(col).map(col + "=" + _)
+        }.mkString("/")
+        val newPath = partitionColumnNames.flatMap { col =>
+          newSpec.get(col).map(col + "=" + _)
+        }.mkString("/")
+        try {
+          fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to rename partition path $oldPath", e)
+        }
+      }
+
      existingParts.remove(oldSpec)
      existingParts.put(newSpec, newPart)
    }
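All three partition operations above derive the directory name from the table's partition columns rather than from the spec's map ordering, producing a Hive-style layout. The helper below is a hypothetical extraction of that inlined expression, shown for illustration:

```scala
// Hypothetical helper mirroring the inlined path logic in create/drop/renamePartitions:
// each partition column present in the spec becomes a "col=value" segment, joined by "/".
def partitionPath(partitionColumnNames: Seq[String], spec: Map[String, String]): String =
  partitionColumnNames.flatMap { col =>
    spec.get(col).map(col + "=" + _)
  }.mkString("/")

partitionPath(Seq("a", "b"), Map("a" -> "1", "b" -> "2"))  // "a=1/b=2"
// Order comes from the table's partition columns, not the spec:
partitionPath(Seq("a", "b"), Map("b" -> "4", "a" -> "3"))  // "a=3/b=4"
```

Partitions that carry an explicit `storage.locationUri` are treated as external and bypass this directory handling entirely, which is what the `locationUri.isEmpty` guards above check.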
http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index e347734..651be26 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.io.File
+import java.net.URI
+
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
@@ -510,6 +513,96 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
     assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
   }
 
+  // --------------------------------------------------------------------------
+  // File System operations
+  // --------------------------------------------------------------------------
+
+  private def exists(uri: String, children: String*): Boolean = {
+    val base = new File(new URI(uri))
+    children.foldLeft(base) {
+      case (parent, child) => new File(parent, child)
+    }.exists()
+  }
+
+  test("create/drop database should create/delete the directory") {
+    val catalog = newBasicCatalog()
+    val db = newDb("mydb")
+    catalog.createDatabase(db, ignoreIfExists = false)
+    assert(exists(db.locationUri))
+
+    catalog.dropDatabase("mydb", ignoreIfNotExists = false, cascade = false)
+    assert(!exists(db.locationUri))
+  }
+
+  test("create/drop/rename table should create/delete/rename the directory") {
+    val catalog = newBasicCatalog()
+    val db = catalog.getDatabase("db1")
+    val table = CatalogTable(
+      identifier = TableIdentifier("my_table", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+    )
+
+    catalog.createTable("db1", table, ignoreIfExists = false)
+    assert(exists(db.locationUri, "my_table"))
+
+    catalog.renameTable("db1", "my_table", "your_table")
+    assert(!exists(db.locationUri, "my_table"))
+    assert(exists(db.locationUri, "your_table"))
+
+    catalog.dropTable("db1", "your_table", ignoreIfNotExists = false)
+    assert(!exists(db.locationUri, "your_table"))
+
+    val externalTable = CatalogTable(
+      identifier = TableIdentifier("external_table", Some("db1")),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat(
+        Some(Utils.createTempDir().getAbsolutePath),
+        None, None, None, false, Map.empty),
+      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+    )
+    catalog.createTable("db1", externalTable, ignoreIfExists = false)
+    assert(!exists(db.locationUri, "external_table"))
+  }
+
test("create/drop/rename partitions should create/delete/rename the directory") { + val catalog = newBasicCatalog() + val databaseDir = catalog.getDatabase("db1").locationUri + val table = CatalogTable( + identifier = TableIdentifier("tbl", Some("db1")), + tableType = CatalogTableType.MANAGED, + storage = CatalogStorageFormat(None, None, None, None, false, Map.empty), + schema = Seq( + CatalogColumn("col1", "int"), + CatalogColumn("col2", "string"), + CatalogColumn("a", "int"), + CatalogColumn("b", "string")), + partitionColumnNames = Seq("a", "b") + ) + catalog.createTable("db1", table, ignoreIfExists = false) + + catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists = false) + assert(exists(databaseDir, "tbl", "a=1", "b=2")) + assert(exists(databaseDir, "tbl", "a=3", "b=4")) + + catalog.renamePartitions("db1", "tbl", Seq(part1.spec), Seq(part3.spec)) + assert(!exists(databaseDir, "tbl", "a=1", "b=2")) + assert(exists(databaseDir, "tbl", "a=5", "b=6")) + + catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false) + assert(!exists(databaseDir, "tbl", "a=3", "b=4")) + assert(!exists(databaseDir, "tbl", "a=5", "b=6")) + + val externalPartition = CatalogTablePartition( + Map("a" -> "7", "b" -> "8"), + CatalogStorageFormat( + Some(Utils.createTempDir().getAbsolutePath), + None, None, None, false, Map.empty) + ) + catalog.createPartitions("db1", "tbl", Seq(externalPartition), ignoreIfExists = false) + assert(!exists(databaseDir, "tbl", "a=7", "b=8")) + } } @@ -563,7 +656,7 @@ abstract class CatalogTestUtils { def newFunc(): CatalogFunction = newFunc("funcName") - def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath + def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/") def newDb(name: String): CatalogDatabase = { CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty) http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala index ab4af8d..eaf993a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala @@ -43,7 +43,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) { /** * A catalog that interacts with external systems. */ - lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog + lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog(sparkContext.hadoopConfiguration) /** * A classloader used to load all user-added jar. 
http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f72325b..13074a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.internal.SQLConf
@@ -69,7 +69,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       CatalogDatabase(name, "", sqlContext.conf.warehousePath, Map()),
       ignoreIfExists = false)
   }
 
-  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+  private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = {
     val storage =
       CatalogStorageFormat(
         locationUri = Some(catalog.defaultTablePath(name)),
@@ -78,12 +78,23 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         serde = None,
         compressed = false,
         serdeProperties = Map())
-    catalog.createTable(CatalogTable(
+    CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = Seq(),
-      createTime = 0L), ignoreIfExists = false)
+      schema = Seq(
+        CatalogColumn("col1", "int"),
+        CatalogColumn("col2", "string"),
+        CatalogColumn("a", "int"),
+        CatalogColumn("b", "int"),
+        CatalogColumn("c", "int"),
+        CatalogColumn("d", "int")),
+      partitionColumnNames = Seq("a", "b", "c", "d"),
+      createTime = 0L)
+  }
+
+  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+    catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -327,23 +338,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val tableIdent1 = TableIdentifier("tab1", None)
     createTable(catalog, tableIdent1)
     val expectedTableIdent = tableIdent1.copy(database = Some("default"))
-    val expectedLocation =
-      catalog.getDatabaseMetadata("default").locationUri + "/tab1"
-    val expectedStorage =
-      CatalogStorageFormat(
-        locationUri = Some(expectedLocation),
-        inputFormat = None,
-        outputFormat = None,
-        serde = None,
-        compressed = false,
-        serdeProperties = Map())
-    val expectedTable =
-      CatalogTable(
-        identifier = expectedTableIdent,
-        tableType = CatalogTableType.EXTERNAL,
-        storage = expectedStorage,
-        schema = Seq(),
-        createTime = 0L)
+    val expectedTable = generateTable(catalog, expectedTableIdent)
     assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
   }
 
@@ -352,23 +347,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     createDatabase(catalog, "dbx")
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
     createTable(catalog, tableIdent1)
-    val expectedLocation =
-      catalog.getDatabaseMetadata("dbx").locationUri + "/tab1"
-    val expectedStorage =
-      CatalogStorageFormat(
-        locationUri = Some(expectedLocation),
-        inputFormat = None,
-        outputFormat = None,
-        serde = None,
-        compressed = false,
-        serdeProperties = Map())
-    val expectedTable =
-      CatalogTable(
-        identifier = tableIdent1,
-        tableType = CatalogTableType.EXTERNAL,
-        storage = expectedStorage,
-        schema = Seq(),
-        createTime = 0L)
+    val expectedTable = generateTable(catalog, tableIdent1)
     assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
   }