This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.3 by this push:
     new 8bcec69b [FLINK-30608] support rename table
8bcec69b is described below

commit 8bcec69b5f9d0bd6a270889e8e7947270c0e3c44
Author: tsreaper <tsreape...@gmail.com>
AuthorDate: Mon Mar 6 14:45:57 2023 +0800

    [FLINK-30608] support rename table
    
    This closes #572.
---
 docs/content/docs/how-to/altering-tables.md        | 30 +++++++++++++
 .../flink/table/store/connector/FlinkCatalog.java  | 11 ++++-
 .../store/connector/FileSystemCatalogITCase.java   | 52 +++++++++++++++++++++-
 .../flink/table/store/file/catalog/Catalog.java    | 13 ++++++
 .../store/file/catalog/FileSystemCatalog.java      | 20 +++++++++
 .../apache/flink/table/store/hive/HiveCatalog.java | 27 +++++++++++
 .../flink/table/store/hive/HiveCatalogITCase.java  | 35 +++++++++++++++
 .../flink/table/store/spark/SparkCatalog.java      | 11 ++++-
 .../flink/table/store/spark/SparkReadTestBase.java | 15 +++++++
 .../store/spark/SparkSchemaEvolutionITCase.java    | 34 ++++++++++++--
 10 files changed, 239 insertions(+), 9 deletions(-)

diff --git a/docs/content/docs/how-to/altering-tables.md 
b/docs/content/docs/how-to/altering-tables.md
index 48048872..090860c0 100644
--- a/docs/content/docs/how-to/altering-tables.md
+++ b/docs/content/docs/how-to/altering-tables.md
@@ -54,6 +54,36 @@ ALTER TABLE my_table SET TBLPROPERTIES (
 
 {{< /tabs >}}
 
+## Rename Table Name
+
+The following SQL rename the table name to new name.
+
+{{< tabs "rename-table-name" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table RENAME TO my_table_new;
+```
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+ALTER TABLE my_table RENAME TO my_table_new;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+{{< hint info >}}
+
+If you use object storage, such as S3 or OSS, please use this syntax 
carefully, because the renaming of object storage is not atomic, and only 
partial files may be moved in case of failure.
+
+{{< /hint >}}
+
 ## Removing Table Properties
 
 The following SQL removes `write-buffer-size` table property.
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 55e5f30e..9769e868 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -314,8 +314,15 @@ public class FlinkCatalog extends AbstractCatalog {
     @Override
     public final void renameTable(
             ObjectPath tablePath, String newTableName, boolean 
ignoreIfNotExists)
-            throws CatalogException {
-        throw new UnsupportedOperationException();
+            throws CatalogException, TableNotExistException, 
TableAlreadyExistException {
+        ObjectPath toTable = new ObjectPath(tablePath.getDatabaseName(), 
newTableName);
+        try {
+            catalog.renameTable(tablePath, toTable, ignoreIfNotExists);
+        } catch (Catalog.TableNotExistException e) {
+            throw new TableNotExistException(getName(), tablePath);
+        } catch (Catalog.TableAlreadyExistException e) {
+            throw new TableAlreadyExistException(getName(), toTable);
+        }
     }
 
     @Override
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
index 69ee10c2..932511e9 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -18,25 +18,37 @@
 
 package org.apache.flink.table.store.connector;
 
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.utils.BlockingIterator;
 import org.apache.flink.table.store.kafka.KafkaTableTestBase;
 import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
 
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.UUID;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
 
 /** ITCase for {@link FlinkCatalog}. */
 public class FileSystemCatalogITCase extends KafkaTableTestBase {
 
+    private String path;
+    private static final String DB_NAME = "default";
+
     @Before
     public void before() throws IOException {
-        String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+        path = TEMPORARY_FOLDER.newFolder().toURI().toString();
         tEnv.executeSql(
                 String.format(
                         "CREATE CATALOG fs WITH ('type'='table-store', 
'warehouse'='%s')", path));
@@ -50,6 +62,34 @@ public class FileSystemCatalogITCase extends 
KafkaTableTestBase {
         innerTestWriteRead();
     }
 
+    @Test
+    public void testRenameTable() throws Exception {
+        tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
+        tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
+        tEnv.executeSql("INSERT INTO t1 VALUES(1),(2)").await();
+        // the source table do not exist.
+        assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t3 RENAME TO 
t4"))
+                .hasMessage("Table `fs`.`default`.`t3` doesn't exist or is a 
temporary table.");
+
+        // the target table has existed.
+        assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO 
t2"))
+                .hasMessage("Could not execute ALTER TABLE fs.default.t1 
RENAME TO fs.default.t2");
+
+        tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
+        Assert.assertEquals(Arrays.asList(Row.of("t2"), Row.of("t3")), 
collect("SHOW TABLES"));
+
+        ObjectPath objectPath = new ObjectPath(DB_NAME, "t3");
+        Catalog catalog =
+                ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+        Path tablePath = catalog.getTableLocation(objectPath);
+        Assert.assertEquals(tablePath.toString(), path + DB_NAME + ".db" + 
File.separator + "t3");
+
+        BlockingIterator<Row, Row> iterator =
+                BlockingIterator.of(tEnv.from("t3").execute().collect());
+        List<Row> result = iterator.collectAndClose(2);
+        assertThat(result).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
+    }
+
     @Test
     public void testLogWriteRead() throws Exception {
         String topic = UUID.randomUUID().toString();
@@ -107,4 +147,14 @@ public class FileSystemCatalogITCase extends 
KafkaTableTestBase {
         List<Row> result = iterator.collectAndClose(2);
         assertThat(result).containsExactlyInAnyOrder(Row.of("1", "2", "3"), 
Row.of("4", "5", "6"));
     }
+
+    private List<Row> collect(String sql) throws Exception {
+        List<Row> result = new ArrayList<>();
+        try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+            while (it.hasNext()) {
+                result.add(it.next());
+            }
+        }
+        return result;
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 919f6067..3789187a 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -150,6 +150,19 @@ public interface Catalog extends AutoCloseable {
     void createTable(ObjectPath tablePath, UpdateSchema tableSchema, boolean 
ignoreIfExists)
             throws TableAlreadyExistException, DatabaseNotExistException;
 
+    /**
+     * Rename a table.
+     *
+     * @param fromTable the name of the table which need to rename
+     * @param toTable the new table
+     * @param ignoreIfNotExists Flag to specify behavior when the table does 
not exist: if set to
+     *     false, throw an exception, if set to true, do nothing.
+     * @throws TableNotExistException if the from table does not exist
+     * @throws TableAlreadyExistException if the to table already exists
+     */
+    void renameTable(ObjectPath fromTable, ObjectPath toTable, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException;
+
     /**
      * Modify an existing table from {@link SchemaChange}s.
      *
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 6dae077e..c5a9a80a 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -163,6 +163,26 @@ public class FileSystemCatalog extends AbstractCatalog {
         uncheck(() -> new SchemaManager(path).commitNewVersion(table));
     }
 
+    @Override
+    public void renameTable(ObjectPath fromTable, ObjectPath toTable, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException {
+        Path fromPath = getTableLocation(fromTable);
+        if (!tableExists(fromPath)) {
+            if (ignoreIfNotExists) {
+                return;
+            }
+
+            throw new TableNotExistException(fromTable);
+        }
+
+        Path toPath = getTableLocation(toTable);
+        if (tableExists(toPath)) {
+            throw new TableAlreadyExistException(toTable);
+        }
+
+        uncheck(() -> fs.rename(fromPath, toPath));
+    }
+
     @Override
     public void alterTable(
             ObjectPath tablePath, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
diff --git 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 90269ee4..e9db107c 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -239,6 +239,33 @@ public class HiveCatalog extends AbstractCatalog {
         }
     }
 
+    @Override
+    public void renameTable(ObjectPath fromTable, ObjectPath toTable, boolean 
ignoreIfNotExists)
+            throws TableNotExistException, TableAlreadyExistException {
+        if (!tableStoreTableExists(fromTable)) {
+            if (ignoreIfNotExists) {
+                return;
+            } else {
+                throw new TableNotExistException(fromTable);
+            }
+        }
+
+        if (tableExists(toTable)) {
+            throw new TableAlreadyExistException(toTable);
+        }
+
+        try {
+            String fromDB = fromTable.getDatabaseName();
+            String fromTableName = fromTable.getObjectName();
+            Table table = client.getTable(fromDB, fromTableName);
+            table.setDbName(toTable.getDatabaseName());
+            table.setTableName(toTable.getObjectName());
+            client.alter_table(fromDB, fromTableName, table);
+        } catch (TException e) {
+            throw new RuntimeException("Failed to rename table " + 
fromTable.getFullName(), e);
+        }
+    }
+
     @Override
     public void alterTable(
             ObjectPath tablePath, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
diff --git 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index 3ac047b4..0cde2fe7 100644
--- 
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++ 
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -23,7 +23,9 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
 import org.apache.flink.table.store.connector.FlinkCatalog;
+import org.apache.flink.table.store.file.catalog.Catalog;
 import org.apache.flink.table.store.file.catalog.CatalogLock;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
@@ -42,6 +44,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
@@ -276,6 +279,38 @@ public class HiveCatalogITCase {
         }
     }
 
+    @Test
+    public void testRenameTable() throws Exception {
+        tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
+        tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
+        tEnv.executeSql("INSERT INTO t1 SELECT 1");
+        // the source table do not exist.
+        assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t3 RENAME TO 
t4"))
+                .hasMessage(
+                        "Table `my_hive`.`test_db`.`t3` doesn't exist or is a 
temporary table.");
+
+        // the target table has existed.
+        assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO 
t2"))
+                .hasMessage(
+                        "Could not execute ALTER TABLE my_hive.test_db.t1 
RENAME TO my_hive.test_db.t2");
+
+        tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
+        List<String> tables = hiveShell.executeQuery("SHOW TABLES");
+        Assert.assertTrue(tables.contains("t3"));
+        Assert.assertFalse(tables.contains("t1"));
+
+        ObjectPath objectPath = new ObjectPath("test_db", "t3");
+        Catalog catalog =
+                ((FlinkCatalog) 
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+        org.apache.flink.core.fs.Path tablePath = 
catalog.getTableLocation(objectPath);
+        Assert.assertEquals(tablePath.toString(), path + "test_db.db" + 
File.separator + "t3");
+
+        // TODO: the hiverunner (4.0) has a bug ,it can not rename the table 
path correctly ,
+        // we should upgrade it to the 6.0 later ,and  update the test case 
for query.
+        assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM t3"))
+                .hasMessageContaining("SQL validation failed. There is no 
table stored in");
+    }
+
     @Test
     public void testHiveLock() throws InterruptedException {
         tEnv.executeSql("CREATE TABLE t (a INT)");
diff --git 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 8c925f4f..98d4e49b 100644
--- 
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++ 
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -357,7 +357,14 @@ public class SparkCatalog implements TableCatalog, 
SupportsNamespaces {
     }
 
     @Override
-    public void renameTable(Identifier oldIdent, Identifier newIdent) {
-        throw new UnsupportedOperationException();
+    public void renameTable(Identifier oldIdent, Identifier newIdent)
+            throws NoSuchTableException, TableAlreadyExistsException {
+        try {
+            catalog.renameTable(objectPath(oldIdent), objectPath(newIdent), 
false);
+        } catch (Catalog.TableNotExistException e) {
+            throw new NoSuchTableException(oldIdent);
+        } catch (Catalog.TableAlreadyExistException e) {
+            throw new TableAlreadyExistsException(newIdent);
+        }
     }
 }
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
index 15ec0c6f..99e1af9d 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
@@ -40,7 +40,9 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
 
 import java.io.File;
 import java.io.IOException;
@@ -71,7 +73,11 @@ public abstract class SparkReadTestBase {
         spark = SparkSession.builder().master("local[2]").getOrCreate();
         spark.conf().set("spark.sql.catalog.tablestore", 
SparkCatalog.class.getName());
         spark.conf().set("spark.sql.catalog.tablestore.warehouse", 
warehousePath.toString());
+        spark.sql("USE tablestore");
+    }
 
+    @BeforeEach
+    public void beforeEach() throws Exception {
         // flink sink
         tablePath1 = new Path(warehousePath, "default.db/t1");
         SimpleTableTestHelper testHelper1 = new 
SimpleTableTestHelper(tablePath1, rowType1());
@@ -129,6 +135,15 @@ public abstract class SparkReadTestBase {
         testHelper2.commit();
     }
 
+    @AfterEach
+    public void afterEach() {
+        List<Row> tables = spark.sql("show tables").collectAsList();
+        tables.forEach(
+                table -> {
+                    spark.sql("DROP TABLE " + table.getString(0) + "." + 
table.getString(1));
+                });
+    }
+
     protected static SimpleTableTestHelper createTestHelper(Path tablePath) 
throws Exception {
         RowType rowType =
                 new RowType(
diff --git 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index 22a096dd..d75956a5 100644
--- 
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++ 
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -105,6 +105,36 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
                         "java.lang.IllegalArgumentException: ADD COLUMN cannot 
specify NOT NULL.");
     }
 
+    @Test
+    public void testRenameTable() {
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE t3 RENAME TO t4"))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("Table or view not found: t3");
+
+        assertThatThrownBy(() -> spark.sql("ALTER TABLE t1 RENAME TO t2"))
+                .isInstanceOf(AnalysisException.class)
+                .hasMessageContaining("Table default.t2 already exists");
+
+        spark.sql("ALTER TABLE t1 RENAME TO t3");
+        List<Row> tables = spark.sql("SHOW TABLES").collectAsList();
+        assertThat(tables.stream().map(Row::toString))
+                .containsExactlyInAnyOrder("[default,t2,false]", 
"[default,t3,false]");
+
+        List<Row> afterRename =
+                spark.sql("SHOW CREATE TABLE 
tablestore.default.t3").collectAsList();
+        assertThat(afterRename.toString())
+                .isEqualTo(
+                        "[[CREATE TABLE t3 (\n"
+                                + "  `a` INT NOT NULL,\n"
+                                + "  `b` BIGINT,\n"
+                                + "  `c` STRING)\n"
+                                + buildTableProperties("default.db/t3")
+                                + "]]");
+
+        List<Row> data = spark.sql("SELECT * FROM t3").collectAsList();
+        assertThat(data.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+    }
+
     @Test
     public void testRenameColumn() throws Exception {
         Path tablePath = new Path(warehousePath, 
"default.db/testRenameColumn");
@@ -149,7 +179,6 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
 
     @Test
     public void testRenamePartitionKey() {
-        spark.sql("USE tablestore");
         spark.sql(
                 "CREATE TABLE default.testRenamePartitionKey (\n"
                         + "a BIGINT,\n"
@@ -255,7 +284,6 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
 
     @Test
     public void testDropPartitionKey() {
-        spark.sql("USE tablestore");
         spark.sql(
                 "CREATE TABLE default.testDropPartitionKey (\n"
                         + "a BIGINT,\n"
@@ -295,7 +323,6 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
 
     @Test
     public void testDropPrimaryKey() {
-        spark.sql("USE tablestore");
         spark.sql(
                 "CREATE TABLE default.testDropPrimaryKey (\n"
                         + "a BIGINT,\n"
@@ -385,7 +412,6 @@ public class SparkSchemaEvolutionITCase extends 
SparkReadTestBase {
 
     @Test
     public void testAlterPrimaryKeyNullability() {
-        spark.sql("USE tablestore");
         spark.sql(
                 "CREATE TABLE default.testAlterPkNullability (\n"
                         + "a BIGINT,\n"

Reply via email to