This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 598fcbe  [SPARK-28265][SQL] Add renameTable to TableCatalog API
598fcbe is described below

commit 598fcbe5ed353e3e432d5d32f656527806c7c612
Author: Edgar Rodriguez <edgar...@gmail.com>
AuthorDate: Wed Aug 14 14:24:13 2019 +0800

    [SPARK-28265][SQL] Add renameTable to TableCatalog API
    
    ## What changes were proposed in this pull request?
    
    This PR adds the `renameTable` call to the `TableCatalog` API, as described 
in the [Table Metadata API 
SPIP](https://docs.google.com/document/d/1zLFiA1VuaWeVxeTDXNg8bL6GP3BVoOZBkewFtEnjEoo/edit#heading=h.m45webtwxf2d).
    
    This PR is related to: https://github.com/apache/spark/pull/24246
    
    ## How was this patch tested?
    
    Added unit tests and contract tests.
    
    Closes #25206 from edgarRd/SPARK-28265-add-rename-table-catalog-api.
    
    Authored-by: Edgar Rodriguez <edgar...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../apache/spark/sql/catalog/v2/TableCatalog.java  | 20 ++++++
 .../spark/sql/catalog/v2/TableCatalogSuite.scala   | 47 ++++++++++++++
 .../spark/sql/catalog/v2/TestTableCatalog.scala    | 13 ++++
 .../datasources/v2/V2SessionCatalog.scala          | 10 +++
 .../datasources/v2/V2SessionCatalogSuite.scala     | 74 +++++++++++++++++++++-
 .../sql/sources/v2/TestInMemoryTableCatalog.scala  | 14 ++++
 6 files changed, 177 insertions(+), 1 deletion(-)

diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
index 681629d..4775b58 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalog/v2/TableCatalog.java
@@ -134,4 +134,24 @@ public interface TableCatalog extends CatalogPlugin {
    * @return true if a table was deleted, false if no table exists for the 
identifier
    */
   boolean dropTable(Identifier ident);
+
+  /**
+   * Renames a table in the catalog.
+   * <p>
+   * If the catalog supports views and contains a view for the old identifier 
and not a table, this
+   * throws {@link NoSuchTableException}. Additionally, if the new identifier 
is a table or a view,
+   * this throws {@link TableAlreadyExistsException}.
+   * <p>
+   * If the catalog does not support table renames between namespaces, it 
throws
+   * {@link UnsupportedOperationException}.
+   *
+   * @param oldIdent the table identifier of the existing table to rename
+   * @param newIdent the new table identifier of the table
+   * @throws NoSuchTableException If the table to rename doesn't exist or is a 
view
+   * @throws TableAlreadyExistsException If the new table name already exists 
or is a view
+   * @throws UnsupportedOperationException If the namespaces of old and new 
identifiers do not
+   *                                       match (optional)
+   */
+  void renameTable(Identifier oldIdent, Identifier newIdent)
+      throws NoSuchTableException, TableAlreadyExistsException;
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
index 089b4c5..e4c1b3c 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TableCatalogSuite.scala
@@ -45,6 +45,7 @@ class TableCatalogSuite extends SparkFunSuite {
 
   private val testNs = Array("`", ".")
   private val testIdent = Identifier.of(testNs, "test_table")
+  private val testIdentNew = Identifier.of(testNs, "test_table_new")
 
   test("Catalogs can load the catalog") {
     val catalog = newCatalog()
@@ -656,6 +657,52 @@ class TableCatalogSuite extends SparkFunSuite {
     assert(!catalog.tableExists(testIdent))
   }
 
+  test("renameTable") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+    assert(!catalog.tableExists(testIdentNew))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+    catalog.renameTable(testIdent, testIdentNew)
+
+    assert(!catalog.tableExists(testIdent))
+    assert(catalog.tableExists(testIdentNew))
+  }
+
+  test("renameTable: fail if table does not exist") {
+    val catalog = newCatalog()
+
+    val exc = intercept[NoSuchTableException] {
+      catalog.renameTable(testIdent, testIdentNew)
+    }
+
+    assert(exc.message.contains(testIdent.quoted))
+    assert(exc.message.contains("not found"))
+  }
+
+  test("renameTable: fail if new table name already exists") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+    assert(!catalog.tableExists(testIdentNew))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+    assert(catalog.tableExists(testIdentNew))
+
+    val exc = intercept[TableAlreadyExistsException] {
+      catalog.renameTable(testIdent, testIdentNew)
+    }
+
+    assert(exc.message.contains(testIdentNew.quoted))
+    assert(exc.message.contains("already exists"))
+  }
+
   test("listNamespaces: list namespaces from metadata") {
     val catalog = newCatalog()
     catalog.createNamespace(Array("ns1"), Map("property" -> "value").asJava)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
index 6fdd6e3..de7c5c9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalog/v2/TestTableCatalog.scala
@@ -93,6 +93,19 @@ class TestTableCatalog extends TableCatalog with 
SupportsNamespaces {
 
   override def dropTable(ident: Identifier): Boolean = 
Option(tables.remove(ident)).isDefined
 
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = 
{
+    if (tables.containsKey(newIdent)) {
+      throw new TableAlreadyExistsException(newIdent)
+    }
+
+    Option(tables.remove(oldIdent)) match {
+      case Some(table) =>
+        tables.put(newIdent, InMemoryTable(table.name, table.schema, 
table.properties))
+      case _ =>
+        throw new NoSuchTableException(oldIdent)
+    }
+  }
+
   private def allNamespaces: Seq[Seq[String]] = {
     (tables.keySet.asScala.map(_.namespace.toSeq) ++ 
namespaces.keySet.asScala).toSeq.distinct
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index a3b8f28..79ea875 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -153,6 +153,16 @@ class V2SessionCatalog(sessionState: SessionState) extends 
TableCatalog {
     }
   }
 
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = 
{
+    if (tableExists(newIdent)) {
+      throw new TableAlreadyExistsException(newIdent)
+    }
+
+    // Load table to make sure the table exists
+    loadTable(oldIdent)
+    catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
+  }
+
   implicit class TableIdentifierHelper(ident: Identifier) {
     def asTableIdentifier: TableIdentifier = {
       ident.namespace match {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
index 3822882..4f14ecc 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalogSuite.scala
@@ -25,6 +25,7 @@ import scala.collection.JavaConverters._
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalog.v2.{Catalogs, Identifier, TableCatalog, 
TableChange}
 import org.apache.spark.sql.catalyst.analysis.{NoSuchTableException, 
TableAlreadyExistsException}
 import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
@@ -45,6 +46,7 @@ class V2SessionCatalogSuite
   override protected def beforeAll(): Unit = {
     super.beforeAll()
     spark.sql("""CREATE DATABASE IF NOT EXISTS db""")
+    spark.sql("""CREATE DATABASE IF NOT EXISTS db2""")
     spark.sql("""CREATE DATABASE IF NOT EXISTS ns""")
     spark.sql("""CREATE DATABASE IF NOT EXISTS ns2""")
   }
@@ -52,6 +54,7 @@ class V2SessionCatalogSuite
   override protected def afterAll(): Unit = {
     spark.sql("""DROP TABLE IF EXISTS db.test_table""")
     spark.sql("""DROP DATABASE IF EXISTS db""")
+    spark.sql("""DROP DATABASE IF EXISTS db2""")
     spark.sql("""DROP DATABASE IF EXISTS ns""")
     spark.sql("""DROP DATABASE IF EXISTS ns2""")
     super.afterAll()
@@ -59,6 +62,7 @@ class V2SessionCatalogSuite
 
   after {
     newCatalog().dropTable(testIdent)
+    newCatalog().dropTable(testIdentNew)
   }
 
   private def newCatalog(): TableCatalog = {
@@ -67,7 +71,9 @@ class V2SessionCatalogSuite
     newCatalog
   }
 
-  private val testIdent = Identifier.of(Array("db"), "test_table")
+  private val testNs = Array("db")
+  private val testIdent = Identifier.of(testNs, "test_table")
+  private val testIdentNew = Identifier.of(testNs, "test_table_new")
 
   test("Catalogs can load the catalog") {
     val catalog = newCatalog()
@@ -680,4 +686,70 @@ class V2SessionCatalogSuite
     assert(!wasDropped)
     assert(!catalog.tableExists(testIdent))
   }
+
+  test("renameTable") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+    assert(!catalog.tableExists(testIdentNew))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+    catalog.renameTable(testIdent, testIdentNew)
+
+    assert(!catalog.tableExists(testIdent))
+    assert(catalog.tableExists(testIdentNew))
+  }
+
+  test("renameTable: fail if table does not exist") {
+    val catalog = newCatalog()
+
+    val exc = intercept[NoSuchTableException] {
+      catalog.renameTable(testIdent, testIdentNew)
+    }
+
+    assert(exc.message.contains(testIdent.quoted))
+    assert(exc.message.contains("not found"))
+  }
+
+  test("renameTable: fail if new table name already exists") {
+    val catalog = newCatalog()
+
+    assert(!catalog.tableExists(testIdent))
+    assert(!catalog.tableExists(testIdentNew))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+    catalog.createTable(testIdentNew, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+    assert(catalog.tableExists(testIdentNew))
+
+    val exc = intercept[TableAlreadyExistsException] {
+      catalog.renameTable(testIdent, testIdentNew)
+    }
+
+    assert(exc.message.contains(testIdentNew.quoted))
+    assert(exc.message.contains("already exists"))
+  }
+
+  test("renameTable: fail if db does not match for old and new table names") {
+    val catalog = newCatalog()
+    val testIdentNewOtherDb = Identifier.of(Array("db2"), "test_table_new")
+
+    assert(!catalog.tableExists(testIdent))
+    assert(!catalog.tableExists(testIdentNewOtherDb))
+
+    catalog.createTable(testIdent, schema, Array.empty, emptyProps)
+
+    assert(catalog.tableExists(testIdent))
+
+    val exc = intercept[AnalysisException] {
+      catalog.renameTable(testIdent, testIdentNewOtherDb)
+    }
+
+    assert(exc.message.contains(testIdent.namespace.quoted))
+    assert(exc.message.contains(testIdentNewOtherDb.namespace.quoted))
+    assert(exc.message.contains("RENAME TABLE source and destination databases 
do not match"))
+  }
 }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
index 7c51a29..b715176 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/v2/TestInMemoryTableCatalog.scala
@@ -104,6 +104,20 @@ class TestInMemoryTableCatalog extends TableCatalog {
     Option(tables.remove(ident)).isDefined
   }
 
+  override def renameTable(oldIdent: Identifier, newIdent: Identifier): Unit = 
{
+    if (tables.containsKey(newIdent)) {
+      throw new TableAlreadyExistsException(newIdent)
+    }
+
+    Option(tables.remove(oldIdent)) match {
+      case Some(table) =>
+        tables.put(newIdent,
+          new InMemoryTable(table.name, table.schema, table.partitioning, 
table.properties))
+      case _ =>
+        throw new NoSuchTableException(oldIdent)
+    }
+  }
+
   def clearTables(): Unit = {
     tables.clear()
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to