This is an automated email from the ASF dual-hosted git repository.

shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/master by this push: new 271ef7b7dfa [FLINK-29677][table] Prevent dropping the current catalog 271ef7b7dfa is described below commit 271ef7b7dfac195b43abced8b81c38998855dabf Author: Jane Chan <qingyue....@gmail.com> AuthorDate: Tue Oct 18 18:14:34 2022 +0800 [FLINK-29677][table] Prevent dropping the current catalog This closes #21097 --- .../src/test/resources/sql/catalog_database.q | 14 +++ .../src/test/resources/sql/catalog_database.q | 107 +++++++++++++++++---- .../apache/flink/table/catalog/CatalogManager.java | 3 + .../flink/table/planner/catalog/CatalogITCase.java | 19 +++- .../flink/table/api/TableEnvironmentTest.scala | 6 ++ 5 files changed, 125 insertions(+), 24 deletions(-) diff --git a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q index a2fbae0d300..3704ce2f52c 100644 --- a/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-client/src/test/resources/sql/catalog_database.q @@ -88,6 +88,11 @@ drop catalog default_catalog; [INFO] Execute statement succeed. !info +drop catalog c1; +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. +!error + # ========================================================================== # test database # ========================================================================== @@ -183,6 +188,15 @@ drop database `default`; [INFO] Execute statement succeed. !info +drop catalog `mod`; +[ERROR] Could not execute SQL statement. Reason: +org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. +!error + +use catalog `c1`; +[INFO] Execute statement succeed. +!info + drop catalog `mod`; [INFO] Execute statement succeed. 
!info diff --git a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q index fe96c4e1918..d79da350830 100644 --- a/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q +++ b/flink-table/flink-sql-gateway/src/test/resources/sql/catalog_database.q @@ -262,6 +262,21 @@ drop database `default`; 1 row in set !ok +drop catalog `mod`; +!output +org.apache.flink.table.catalog.exceptions.CatalogException: Cannot drop a catalog which is currently in use. +!error + +use catalog `c1`; +!output ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok + drop catalog `mod`; !output +--------+ @@ -283,39 +298,71 @@ org.apache.flink.table.catalog.exceptions.CatalogException: A catalog with name create table MyTable1 (a int, b string) with ('connector' = 'values'); !output -org.apache.flink.table.api.ValidationException: Catalog mod does not exist. -!error ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok create table MyTable2 (a int, b string) with ('connector' = 'values'); !output -org.apache.flink.table.api.ValidationException: Catalog mod does not exist. -!error ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok # hive catalog is case-insensitive show tables; !output -org.apache.flink.table.api.ValidationException: Catalog mod does not exist -!error ++------------+ +| table name | ++------------+ +| MyTable1 | +| MyTable2 | ++------------+ +2 rows in set +!ok show views; !output -org.apache.flink.table.api.ValidationException: Catalog mod does not exist -!error +Empty set +!ok create view MyView1 as select 1 + 1; !output -org.apache.flink.table.api.ValidationException: Catalog mod does not exist. -!error ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok create view MyView2 as select 1 + 1; !output -org.apache.flink.table.api.ValidationException: Catalog mod does not exist. 
-!error ++--------+ +| result | ++--------+ +| OK | ++--------+ +1 row in set +!ok show views; !output -org.apache.flink.table.api.ValidationException: Catalog mod does not exist -!error ++-----------+ +| view name | ++-----------+ +| MyView1 | +| MyView2 | ++-----------+ +2 rows in set +!ok # test create with full qualified name create table c1.db1.MyTable3 (a int, b string) with ('connector' = 'values'); @@ -458,12 +505,16 @@ show tables; +------------+ | table name | +------------+ +| MyTable1 | +| MyTable2 | | MyTable5 | | MyTable6 | +| MyView1 | +| MyView2 | | MyView5 | | MyView6 | +------------+ -4 rows in set +8 rows in set !ok show views; @@ -471,10 +522,12 @@ show views; +-----------+ | view name | +-----------+ +| MyView1 | +| MyView2 | | MyView5 | | MyView6 | +-----------+ -2 rows in set +4 rows in set !ok drop table db1.MyTable3; @@ -563,10 +616,14 @@ show tables; +------------+ | table name | +------------+ +| MyTable1 | +| MyTable2 | | MyTable5 | +| MyView1 | +| MyView2 | | MyView5 | +------------+ -2 rows in set +6 rows in set !ok show views; @@ -574,9 +631,11 @@ show views; +-----------+ | view name | +-----------+ +| MyView1 | +| MyView2 | | MyView5 | +-----------+ -1 row in set +3 rows in set !ok # ========================================================================== @@ -598,11 +657,15 @@ show tables; +------------+ | table name | +------------+ +| MyTable1 | +| MyTable2 | | MyTable5 | | MyTable7 | +| MyView1 | +| MyView2 | | MyView5 | +------------+ -3 rows in set +7 rows in set !ok reset; @@ -630,10 +693,14 @@ show tables; +------------+ | table name | +------------+ +| MyTable1 | +| MyTable2 | | MyTable7 | +| MyView1 | +| MyView2 | | MyView5 | +------------+ -2 rows in set +6 rows in set !ok # ========================================================================== diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java index ade4c3b155e..66b6dc2b4d8 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/CatalogManager.java @@ -226,6 +226,9 @@ public final class CatalogManager { "Catalog name cannot be null or empty."); if (catalogs.containsKey(catalogName)) { + if (currentCatalogName.equals(catalogName)) { + throw new CatalogException("Cannot drop a catalog which is currently in use."); + } Catalog catalog = catalogs.remove(catalogName); catalog.close(); } else if (!ignoreIfNotExists) { diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java index 119aa4b51fc..5e95788fc95 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/catalog/CatalogITCase.java @@ -22,6 +22,7 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.EnvironmentSettings; import org.apache.flink.table.api.Schema; import org.apache.flink.table.api.TableEnvironment; +import org.apache.flink.table.api.ValidationException; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.CatalogDatabaseImpl; @@ -30,6 +31,7 @@ import org.apache.flink.table.catalog.CatalogTable; import org.apache.flink.table.catalog.GenericInMemoryCatalog; import org.apache.flink.table.catalog.GenericInMemoryCatalogFactoryOptions; import org.apache.flink.table.catalog.ObjectPath; +import 
org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.utils.CatalogManagerMocks; import org.apache.flink.testutils.ClassLoaderUtils; import org.apache.flink.util.TemporaryClassLoaderContext; @@ -43,6 +45,7 @@ import java.util.ArrayList; import java.util.HashMap; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** IT Case for catalog ddl. */ public class CatalogITCase { @@ -69,15 +72,23 @@ public class CatalogITCase { String name = "c1"; TableEnvironment tableEnv = getTableEnvironment(); - String ddl = + String createDdl = String.format( "create catalog %s with('type'='%s')", name, GenericInMemoryCatalogFactoryOptions.IDENTIFIER); - tableEnv.executeSql(ddl); + tableEnv.executeSql(createDdl); assertThat(tableEnv.getCatalog(name)).isPresent(); - ddl = String.format("drop catalog %s", name); - tableEnv.executeSql(ddl); + String dropDdl = String.format("drop catalog %s", name); + tableEnv.executeSql(String.format("use catalog %s", name)); + assertThatThrownBy(() -> tableEnv.executeSql(dropDdl)) + .isInstanceOf(ValidationException.class) + .hasRootCauseExactlyInstanceOf(CatalogException.class) + .hasRootCauseMessage("Cannot drop a catalog which is currently in use."); + assertThat(tableEnv.getCatalog(name)).isPresent(); + + tableEnv.executeSql("use catalog default_catalog"); + tableEnv.executeSql(dropDdl); assertThat(tableEnv.getCatalog(name)).isNotPresent(); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala index 791a365831d..e10ad0c93f1 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/api/TableEnvironmentTest.scala @@ -770,6 +770,12 @@ class 
TableEnvironmentTest { assertEquals(ResultKind.SUCCESS, tableResult2.getResultKind) assertEquals("my_catalog", tableEnv.getCurrentCatalog) + assertThatThrownBy(() => tableEnv.executeSql("DROP CATALOG my_catalog")) + .isInstanceOf(classOf[ValidationException]) + .hasRootCauseMessage("Cannot drop a catalog which is currently in use.") + + tableEnv.executeSql("USE CATALOG default_catalog") + val tableResult3 = tableEnv.executeSql("DROP CATALOG my_catalog") assertEquals(ResultKind.SUCCESS, tableResult3.getResultKind) assertFalse(tableEnv.getCatalog("my_catalog").isPresent)