Repository: spark
Updated Branches:
  refs/heads/branch-2.0 fb73663db -> 5cdb7bea5


[SPARK-15093][SQL] create/delete/rename directory for InMemoryCatalog operations if needed

## What changes were proposed in this pull request?

The following operations now perform the corresponding file system operations (a short sketch of the partition-directory handling follows the list):

1. CREATE DATABASE: create a dir
2. DROP DATABASE: delete the dir
3. CREATE TABLE: create a dir
4. DROP TABLE: delete the dir
5. RENAME TABLE: rename the dir
6. CREATE PARTITIONS: create a dir
7. RENAME PARTITIONS: rename the dir
8. DROP PARTITIONS: delete the dir
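
Below is a minimal, self-contained sketch of the partition-directory handling the last three items describe, assuming Hive-style `col=value` naming under the table directory. The object name, warehouse path, and column values are hypothetical; the actual logic lives in `InMemoryCatalog` in the diff below.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object PartitionPathSketch {
  // Build a Hive-style partition subpath such as "a=1/b=2" from a partition spec,
  // following the partition column order declared on the table.
  def partitionPath(partitionColumnNames: Seq[String], spec: Map[String, String]): String =
    partitionColumnNames.flatMap(col => spec.get(col).map(col + "=" + _)).mkString("/")

  def main(args: Array[String]): Unit = {
    val tableDir = new Path("/tmp/warehouse/db1.db", "tbl")   // hypothetical managed table dir
    val partDir = new Path(tableDir, partitionPath(Seq("a", "b"), Map("a" -> "1", "b" -> "2")))
    val fs = FileSystem.get(new Configuration())              // local file system by default
    fs.mkdirs(partDir)        // CREATE PARTITIONS: create a dir like .../tbl/a=1/b=2
    fs.delete(partDir, true)  // DROP PARTITIONS: delete the dir recursively
  }
}
```

RENAME PARTITIONS reuses the same `col=value` construction: the old and new subpaths are computed from the old and new specs and the directory is moved with `fs.rename`. Partitions with an explicit location (external partitions) are left untouched.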

## How was this patch tested?

New tests were added in `ExternalCatalogSuite`.
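
Roughly, each catalog call in those tests is paired with a file-system assertion. A condensed sketch of the kind of check they perform, mirroring the `exists` helper in the diff below (the `DirChecks` wrapper is hypothetical; treat this as illustrative rather than the exact test code):

```scala
import java.io.File
import java.net.URI

object DirChecks {
  // Resolve a location URI plus child path segments to a local file and check existence.
  def exists(uri: String, children: String*): Boolean =
    children.foldLeft(new File(new URI(uri))) { (parent, child) =>
      new File(parent, child)
    }.exists()
}

// In the suite, each catalog call is followed by a directory assertion, e.g.:
//   catalog.createDatabase(db, ignoreIfExists = false)
//   assert(DirChecks.exists(db.locationUri))
//   catalog.dropDatabase("mydb", ignoreIfNotExists = false, cascade = false)
//   assert(!DirChecks.exists(db.locationUri))
//   catalog.createTable("db1", table, ignoreIfExists = false)
//   assert(DirChecks.exists(db.locationUri, "my_table"))
```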

Author: Wenchen Fan <wenc...@databricks.com>

Closes #12871 from cloud-fan/catalog.

(cherry picked from commit beb16ec556c3b7a23fe0ac7bda66f71abd5c61e9)
Signed-off-by: Andrew Or <and...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5cdb7bea
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5cdb7bea
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5cdb7bea

Branch: refs/heads/branch-2.0
Commit: 5cdb7bea56d65e5ae7e3d09b04ce3189e6a60f10
Parents: fb73663
Author: Wenchen Fan <wenc...@databricks.com>
Authored: Mon May 9 10:47:45 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Mon May 9 10:48:00 2016 -0700

----------------------------------------------------------------------
 .../sql/catalyst/catalog/InMemoryCatalog.scala  | 122 ++++++++++++++++++-
 .../catalyst/catalog/ExternalCatalogSuite.scala |  95 ++++++++++++++-
 .../apache/spark/sql/internal/SharedState.scala |   2 +-
 .../spark/sql/execution/command/DDLSuite.scala  |  57 +++------
 4 files changed, 232 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
index 676a9e1..982b035 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/InMemoryCatalog.scala
@@ -17,8 +17,14 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.io.IOException
+
 import scala.collection.mutable
 
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileSystem, Path}
+
+import org.apache.spark.SparkException
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, TableIdentifier}
 import org.apache.spark.sql.catalyst.util.StringUtils
@@ -32,7 +38,7 @@ import org.apache.spark.sql.catalyst.util.StringUtils
  *
  * All public methods should be synchronized for thread-safety.
  */
-class InMemoryCatalog extends ExternalCatalog {
+class InMemoryCatalog(hadoopConfig: Configuration = new Configuration) extends ExternalCatalog {
   import CatalogTypes.TablePartitionSpec
 
   private class TableDesc(var table: CatalogTable) {
@@ -104,6 +110,8 @@ class InMemoryCatalog extends ExternalCatalog {
     }
   }
 
+  private val fs = FileSystem.get(hadoopConfig)
+
   // --------------------------------------------------------------------------
   // Databases
   // --------------------------------------------------------------------------
@@ -116,6 +124,13 @@ class InMemoryCatalog extends ExternalCatalog {
         throw new AnalysisException(s"Database '${dbDefinition.name}' already 
exists.")
       }
     } else {
+      try {
+        fs.mkdirs(new Path(dbDefinition.locationUri))
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to create database 
${dbDefinition.name} as failed " +
+            s"to create its directory ${dbDefinition.locationUri}", e)
+      }
       catalog.put(dbDefinition.name, new DatabaseDesc(dbDefinition))
     }
   }
@@ -135,6 +150,14 @@ class InMemoryCatalog extends ExternalCatalog {
         }
       }
       // Remove the database.
+      val dbDefinition = catalog(db).db
+      try {
+        fs.delete(new Path(dbDefinition.locationUri), true)
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to drop database 
${dbDefinition.name} as failed " +
+            s"to delete its directory ${dbDefinition.locationUri}", e)
+      }
       catalog.remove(db)
     } else {
       if (!ignoreIfNotExists) {
@@ -182,6 +205,16 @@ class InMemoryCatalog extends ExternalCatalog {
         throw new AnalysisException(s"Table '$table' already exists in 
database '$db'")
       }
     } else {
+      if (tableDefinition.tableType == CatalogTableType.MANAGED) {
+        val dir = new Path(catalog(db).db.locationUri, table)
+        try {
+          fs.mkdirs(dir)
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to create table $table as failed 
" +
+              s"to create its directory $dir", e)
+        }
+      }
       catalog(db).tables.put(table, new TableDesc(tableDefinition))
     }
   }
@@ -192,6 +225,16 @@ class InMemoryCatalog extends ExternalCatalog {
       ignoreIfNotExists: Boolean): Unit = synchronized {
     requireDbExists(db)
     if (tableExists(db, table)) {
+      if (getTable(db, table).tableType == CatalogTableType.MANAGED) {
+        val dir = new Path(catalog(db).db.locationUri, table)
+        try {
+          fs.delete(dir, true)
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to drop table $table as failed " 
+
+              s"to delete its directory $dir", e)
+        }
+      }
       catalog(db).tables.remove(table)
     } else {
       if (!ignoreIfNotExists) {
@@ -205,6 +248,19 @@ class InMemoryCatalog extends ExternalCatalog {
     requireTableNotExists(db, newName)
     val oldDesc = catalog(db).tables(oldName)
    oldDesc.table = oldDesc.table.copy(identifier = TableIdentifier(newName, Some(db)))
+
+    if (oldDesc.table.tableType == CatalogTableType.MANAGED) {
+      val oldDir = new Path(catalog(db).db.locationUri, oldName)
+      val newDir = new Path(catalog(db).db.locationUri, newName)
+      try {
+        fs.rename(oldDir, newDir)
+      } catch {
+        case e: IOException =>
+          throw new SparkException(s"Unable to rename table $oldName to 
$newName as failed " +
+            s"to rename its directory $oldDir", e)
+      }
+    }
+
     catalog(db).tables.put(newName, oldDesc)
     catalog(db).tables.remove(oldName)
   }
@@ -277,7 +333,26 @@ class InMemoryCatalog extends ExternalCatalog {
           s"'$db' table '$table':\n$dupSpecsStr")
       }
     }
-    parts.foreach { p => existingParts.put(p.spec, p) }
+
+    val tableDir = new Path(catalog(db).db.locationUri, table)
+    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    // TODO: we should follow hive to roll back if one partition path failed to create.
+    parts.foreach { p =>
+      // If location is set, the partition is using an external partition location and we don't
+      // need to handle its directory.
+      if (p.storage.locationUri.isEmpty) {
+        val partitionPath = partitionColumnNames.flatMap { col =>
+          p.spec.get(col).map(col + "=" + _)
+        }.mkString("/")
+        try {
+          fs.mkdirs(new Path(tableDir, partitionPath))
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to create partition path 
$partitionPath", e)
+        }
+      }
+      existingParts.put(p.spec, p)
+    }
   }
 
   override def dropPartitions(
@@ -295,7 +370,26 @@ class InMemoryCatalog extends ExternalCatalog {
           s"'$db' table '$table':\n$missingSpecsStr")
       }
     }
-    partSpecs.foreach(existingParts.remove)
+
+    val tableDir = new Path(catalog(db).db.locationUri, table)
+    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    // TODO: we should follow hive to roll back if one partition path failed to delete.
+    partSpecs.foreach { p =>
+      // If location is set, the partition is using an external partition location and we don't
+      // need to handle its directory.
+      if (existingParts.contains(p) && existingParts(p).storage.locationUri.isEmpty) {
+        val partitionPath = partitionColumnNames.flatMap { col =>
+          p.get(col).map(col + "=" + _)
+        }.mkString("/")
+        try {
+          fs.delete(new Path(tableDir, partitionPath), true)
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to delete partition path 
$partitionPath", e)
+        }
+      }
+      existingParts.remove(p)
+    }
   }
 
   override def renamePartitions(
@@ -306,9 +400,31 @@ class InMemoryCatalog extends ExternalCatalog {
    require(specs.size == newSpecs.size, "number of old and new partition specs differ")
     requirePartitionsExist(db, table, specs)
     requirePartitionsNotExist(db, table, newSpecs)
+
+    val tableDir = new Path(catalog(db).db.locationUri, table)
+    val partitionColumnNames = getTable(db, table).partitionColumnNames
+    // TODO: we should follow hive to roll back if one partition path failed to rename.
     specs.zip(newSpecs).foreach { case (oldSpec, newSpec) =>
       val newPart = getPartition(db, table, oldSpec).copy(spec = newSpec)
       val existingParts = catalog(db).tables(table).partitions
+
+      // If location is set, the partition is using an external partition location and we don't
+      // need to handle its directory.
+      if (newPart.storage.locationUri.isEmpty) {
+        val oldPath = partitionColumnNames.flatMap { col =>
+          oldSpec.get(col).map(col + "=" + _)
+        }.mkString("/")
+        val newPath = partitionColumnNames.flatMap { col =>
+          newSpec.get(col).map(col + "=" + _)
+        }.mkString("/")
+        try {
+          fs.rename(new Path(tableDir, oldPath), new Path(tableDir, newPath))
+        } catch {
+          case e: IOException =>
+            throw new SparkException(s"Unable to rename partition path 
$oldPath", e)
+        }
+      }
+
       existingParts.remove(oldSpec)
       existingParts.put(newSpec, newPart)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
index e347734..651be26 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.catalyst.catalog
 
+import java.io.File
+import java.net.URI
+
 import org.scalatest.BeforeAndAfterEach
 
 import org.apache.spark.SparkFunSuite
@@ -510,6 +513,96 @@ abstract class ExternalCatalogSuite extends SparkFunSuite with BeforeAndAfterEac
    assert(catalog.listFunctions("db2", "func*").toSet == Set("func1", "func2"))
   }
 
+  // --------------------------------------------------------------------------
+  // File System operations
+  // --------------------------------------------------------------------------
+
+  private def exists(uri: String, children: String*): Boolean = {
+    val base = new File(new URI(uri))
+    children.foldLeft(base) {
+      case (parent, child) => new File(parent, child)
+    }.exists()
+  }
+
+  test("create/drop database should create/delete the directory") {
+    val catalog = newBasicCatalog()
+    val db = newDb("mydb")
+    catalog.createDatabase(db, ignoreIfExists = false)
+    assert(exists(db.locationUri))
+
+    catalog.dropDatabase("mydb", ignoreIfNotExists = false, cascade = false)
+    assert(!exists(db.locationUri))
+  }
+
+  test("create/drop/rename table should create/delete/rename the directory") {
+    val catalog = newBasicCatalog()
+    val db = catalog.getDatabase("db1")
+    val table = CatalogTable(
+      identifier = TableIdentifier("my_table", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+    )
+
+    catalog.createTable("db1", table, ignoreIfExists = false)
+    assert(exists(db.locationUri, "my_table"))
+
+    catalog.renameTable("db1", "my_table", "your_table")
+    assert(!exists(db.locationUri, "my_table"))
+    assert(exists(db.locationUri, "your_table"))
+
+    catalog.dropTable("db1", "your_table", ignoreIfNotExists = false)
+    assert(!exists(db.locationUri, "your_table"))
+
+    val externalTable = CatalogTable(
+      identifier = TableIdentifier("external_table", Some("db1")),
+      tableType = CatalogTableType.EXTERNAL,
+      storage = CatalogStorageFormat(
+        Some(Utils.createTempDir().getAbsolutePath),
+        None, None, None, false, Map.empty),
+      schema = Seq(CatalogColumn("a", "int"), CatalogColumn("b", "string"))
+    )
+    catalog.createTable("db1", externalTable, ignoreIfExists = false)
+    assert(!exists(db.locationUri, "external_table"))
+  }
+
+  test("create/drop/rename partitions should create/delete/rename the 
directory") {
+    val catalog = newBasicCatalog()
+    val databaseDir = catalog.getDatabase("db1").locationUri
+    val table = CatalogTable(
+      identifier = TableIdentifier("tbl", Some("db1")),
+      tableType = CatalogTableType.MANAGED,
+      storage = CatalogStorageFormat(None, None, None, None, false, Map.empty),
+      schema = Seq(
+        CatalogColumn("col1", "int"),
+        CatalogColumn("col2", "string"),
+        CatalogColumn("a", "int"),
+        CatalogColumn("b", "string")),
+      partitionColumnNames = Seq("a", "b")
+    )
+    catalog.createTable("db1", table, ignoreIfExists = false)
+
+    catalog.createPartitions("db1", "tbl", Seq(part1, part2), ignoreIfExists = 
false)
+    assert(exists(databaseDir, "tbl", "a=1", "b=2"))
+    assert(exists(databaseDir, "tbl", "a=3", "b=4"))
+
+    catalog.renamePartitions("db1", "tbl", Seq(part1.spec), Seq(part3.spec))
+    assert(!exists(databaseDir, "tbl", "a=1", "b=2"))
+    assert(exists(databaseDir, "tbl", "a=5", "b=6"))
+
+    catalog.dropPartitions("db1", "tbl", Seq(part2.spec, part3.spec), ignoreIfNotExists = false)
+    assert(!exists(databaseDir, "tbl", "a=3", "b=4"))
+    assert(!exists(databaseDir, "tbl", "a=5", "b=6"))
+
+    val externalPartition = CatalogTablePartition(
+      Map("a" -> "7", "b" -> "8"),
+      CatalogStorageFormat(
+        Some(Utils.createTempDir().getAbsolutePath),
+        None, None, None, false, Map.empty)
+    )
+    catalog.createPartitions("db1", "tbl", Seq(externalPartition), 
ignoreIfExists = false)
+    assert(!exists(databaseDir, "tbl", "a=7", "b=8"))
+  }
 }
 
 
@@ -563,7 +656,7 @@ abstract class CatalogTestUtils {
 
   def newFunc(): CatalogFunction = newFunc("funcName")
 
-  def newUriForDatabase(): String = Utils.createTempDir().getAbsolutePath
+  def newUriForDatabase(): String = Utils.createTempDir().toURI.toString.stripSuffix("/")
 
   def newDb(name: String): CatalogDatabase = {
    CatalogDatabase(name, name + " description", newUriForDatabase(), Map.empty)

http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
index ab4af8d..eaf993a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SharedState.scala
@@ -43,7 +43,7 @@ private[sql] class SharedState(val sparkContext: SparkContext) {
   /**
    * A catalog that interacts with external systems.
    */
-  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog
+  lazy val externalCatalog: ExternalCatalog = new InMemoryCatalog(sparkContext.hadoopConfiguration)
 
   /**
    * A classloader used to load all user-added jar.

http://git-wip-us.apache.org/repos/asf/spark/blob/5cdb7bea/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index f72325b..13074a6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -25,7 +25,7 @@ import org.scalatest.BeforeAndAfterEach
 import org.apache.spark.sql.{AnalysisException, QueryTest, Row}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogDatabase, CatalogStorageFormat}
-import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumn, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.catalyst.catalog.{CatalogTablePartition, SessionCatalog}
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.internal.SQLConf
@@ -69,7 +69,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
       CatalogDatabase(name, "", sqlContext.conf.warehousePath, Map()), 
ignoreIfExists = false)
   }
 
-  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+  private def generateTable(catalog: SessionCatalog, name: TableIdentifier): CatalogTable = {
     val storage =
       CatalogStorageFormat(
         locationUri = Some(catalog.defaultTablePath(name)),
@@ -78,12 +78,23 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
         serde = None,
         compressed = false,
         serdeProperties = Map())
-    catalog.createTable(CatalogTable(
+    CatalogTable(
       identifier = name,
       tableType = CatalogTableType.EXTERNAL,
       storage = storage,
-      schema = Seq(),
-      createTime = 0L), ignoreIfExists = false)
+      schema = Seq(
+        CatalogColumn("col1", "int"),
+        CatalogColumn("col2", "string"),
+        CatalogColumn("a", "int"),
+        CatalogColumn("b", "int"),
+        CatalogColumn("c", "int"),
+        CatalogColumn("d", "int")),
+      partitionColumnNames = Seq("a", "b", "c", "d"),
+      createTime = 0L)
+  }
+
+  private def createTable(catalog: SessionCatalog, name: TableIdentifier): Unit = {
+    catalog.createTable(generateTable(catalog, name), ignoreIfExists = false)
   }
 
   private def createTablePartition(
@@ -327,23 +338,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     val tableIdent1 = TableIdentifier("tab1", None)
     createTable(catalog, tableIdent1)
     val expectedTableIdent = tableIdent1.copy(database = Some("default"))
-    val expectedLocation =
-      catalog.getDatabaseMetadata("default").locationUri + "/tab1"
-    val expectedStorage =
-      CatalogStorageFormat(
-        locationUri = Some(expectedLocation),
-        inputFormat = None,
-        outputFormat = None,
-        serde = None,
-        compressed = false,
-        serdeProperties = Map())
-    val expectedTable =
-      CatalogTable(
-        identifier = expectedTableIdent,
-        tableType = CatalogTableType.EXTERNAL,
-        storage = expectedStorage,
-        schema = Seq(),
-        createTime = 0L)
+    val expectedTable = generateTable(catalog, expectedTableIdent)
     assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
   }
 
@@ -352,23 +347,7 @@ class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach {
     createDatabase(catalog, "dbx")
     val tableIdent1 = TableIdentifier("tab1", Some("dbx"))
     createTable(catalog, tableIdent1)
-    val expectedLocation =
-      catalog.getDatabaseMetadata("dbx").locationUri + "/tab1"
-    val expectedStorage =
-      CatalogStorageFormat(
-        locationUri = Some(expectedLocation),
-        inputFormat = None,
-        outputFormat = None,
-        serde = None,
-        compressed = false,
-        serdeProperties = Map())
-    val expectedTable =
-      CatalogTable(
-        identifier = tableIdent1,
-        tableType = CatalogTableType.EXTERNAL,
-        storage = expectedStorage,
-        schema = Seq(),
-        createTime = 0L)
+    val expectedTable = generateTable(catalog, tableIdent1)
     assert(catalog.getTableMetadata(tableIdent1) === expectedTable)
   }
 

